diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 23d9c8afa..c325023fa 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -1,4 +1,5 @@ from uuid import UUID +from pathlib import Path from typing import Union, BinaryIO, List, Optional from cognee.modules.pipelines import Task @@ -8,7 +9,7 @@ from cognee.tasks.ingestion import ingest_data, resolve_data_directories async def add( - data: Union[BinaryIO, list[BinaryIO], str, list[str]], + data: Union[BinaryIO, list[BinaryIO], str, list[str], Path, list[Path]], dataset_name: str = "main_dataset", user: User = None, node_set: Optional[List[str]] = None, @@ -17,7 +18,7 @@ async def add( dataset_id: Optional[UUID] = None, preferred_loaders: Optional[List[str]] = None, loader_config: Optional[dict] = None, - + incremental_loading: bool = True, ): """ Add data to Cognee for knowledge graph processing using a plugin-based loader system. @@ -36,8 +37,9 @@ async def add( Supported Input Types: - **Text strings**: Direct text content (str) - any string not starting with "/" or "file://" - - **File paths**: Local file paths as strings in these formats: - * Absolute paths: "/path/to/document.pdf" + - **File paths**: Local file paths in these formats: + * Path objects: pathlib.Path("/path/to/document.pdf") - **Recommended for explicit file path handling** + * Absolute paths as strings: "/path/to/document.pdf" * File URLs: "file:///path/to/document.pdf" or "file://relative/path.txt" * S3 paths: "s3://bucket-name/path/to/file.pdf" - **Binary file objects**: File handles/streams (BinaryIO) @@ -102,6 +104,8 @@ async def add( If not provided, uses default loader priority. loader_config: Optional configuration for specific loaders. Dictionary mapping loader names to their configuration options (e.g., {"pypdf_loader": {"strict": False}}). + incremental_loading: Whether to skip processing of documents already processed by the pipeline. + Defaults to True for efficiency. Set to False to reprocess all data. Returns: PipelineRunInfo: Information about the ingestion pipeline execution including: diff --git a/cognee/tasks/ingestion/resolve_data_directories.py b/cognee/tasks/ingestion/resolve_data_directories.py index 0f2f2a85f..cd326b6c0 100644 --- a/cognee/tasks/ingestion/resolve_data_directories.py +++ b/cognee/tasks/ingestion/resolve_data_directories.py @@ -1,11 +1,13 @@ import os from urllib.parse import urlparse +from pathlib import Path from typing import List, Union, BinaryIO from cognee.infrastructure.files.storage.s3_config import get_s3_config async def resolve_data_directories( - data: Union[BinaryIO, List[BinaryIO], str, List[str]], include_subdirectories: bool = True + data: Union[BinaryIO, List[BinaryIO], str, List[str], Path, List[Path]], + include_subdirectories: bool = True, ): """ Resolves directories by replacing them with their contained files. @@ -33,7 +35,26 @@ async def resolve_data_directories( ) for item in data: - if isinstance(item, str): # Check if the item is a path + if isinstance(item, Path): # Path objects explicitly indicate file paths + # Convert Path to string for processing + item_str = str(item) + if item.is_dir(): # If it's a directory + if include_subdirectories: + # Recursively add all files in the directory and subdirectories + for root, _, files in os.walk(item_str): + resolved_data.extend([Path(os.path.join(root, f)) for f in files]) + else: + # Add all files (not subdirectories) in the directory + resolved_data.extend( + [ + Path(os.path.join(item_str, f)) + for f in os.listdir(item_str) + if os.path.isfile(os.path.join(item_str, f)) + ] + ) + else: # If it's a file, add it directly + resolved_data.append(item) + elif isinstance(item, str): # Check if the item is a path or text content # S3 if urlparse(item).scheme == "s3": if fs is not None: diff --git a/cognee/tasks/ingestion/save_data_item_to_storage.py b/cognee/tasks/ingestion/save_data_item_to_storage.py index 814e908b1..50ed022a6 100644 --- a/cognee/tasks/ingestion/save_data_item_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_to_storage.py @@ -1,5 +1,6 @@ import os from urllib.parse import urlparse +from pathlib import Path from typing import Union, BinaryIO, Any from cognee.modules.ingestion.exceptions import IngestionError @@ -16,7 +17,7 @@ class SaveDataSettings(BaseSettings): settings = SaveDataSettings() -async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str: +async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Path, Any]) -> str: if "llama_index" in str(type(data_item)): # Dynamic import is used because the llama_index module is optional. from .transform_data import get_data_from_llama_index @@ -27,6 +28,18 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str if hasattr(data_item, "file"): return await save_data_to_file(data_item.file, filename=data_item.filename) + # data is a Path object - explicitly indicates a file path + if isinstance(data_item, Path): + if settings.accept_local_file_path: + # Convert Path to file URL for consistency + normalized_path = str(data_item.resolve()) + # Use forward slashes in file URLs for consistency + url_path = normalized_path.replace(os.sep, "/") + file_path = "file://" + url_path + return file_path + else: + raise IngestionError(message="Local files are not accepted.") + if isinstance(data_item, str): parsed_url = urlparse(data_item) diff --git a/infrastructure/loaders/LoaderInterface.py b/infrastructure/loaders/LoaderInterface.py index 66d8ede40..7a6075268 100644 --- a/infrastructure/loaders/LoaderInterface.py +++ b/infrastructure/loaders/LoaderInterface.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod -from typing import List +from typing import List, Union +from pathlib import Path from .models.LoaderResult import LoaderResult @@ -45,12 +46,12 @@ class LoaderInterface(ABC): pass @abstractmethod - def can_handle(self, file_path: str, mime_type: str = None) -> bool: + def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool: """ Check if this loader can handle the given file. Args: - file_path: Path to the file to be processed + file_path: Path to the file to be processed (Path type recommended for explicit file path handling) mime_type: Optional MIME type of the file Returns: @@ -59,12 +60,12 @@ class LoaderInterface(ABC): pass @abstractmethod - async def load(self, file_path: str, **kwargs) -> LoaderResult: + async def load(self, file_path: Union[str, Path], **kwargs) -> LoaderResult: """ Load and process the file, returning standardized result. Args: - file_path: Path to the file to be processed + file_path: Path to the file to be processed (Path type recommended for explicit file path handling) **kwargs: Additional loader-specific configuration Returns: diff --git a/infrastructure/loaders/core/text_loader.py b/infrastructure/loaders/core/text_loader.py index c85ea9781..63e65e720 100644 --- a/infrastructure/loaders/core/text_loader.py +++ b/infrastructure/loaders/core/text_loader.py @@ -1,5 +1,6 @@ import os -from typing import List +from typing import List, Union +from pathlib import Path from ..LoaderInterface import LoaderInterface from ..models.LoaderResult import LoaderResult, ContentType @@ -36,19 +37,22 @@ class TextLoader(LoaderInterface): """Unique identifier for this loader.""" return "text_loader" - def can_handle(self, file_path: str, mime_type: str = None) -> bool: + def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool: """ Check if this loader can handle the given file. Args: - file_path: Path to the file + file_path: Path to the file (Path type recommended for explicit file path handling) mime_type: Optional MIME type Returns: True if file can be handled, False otherwise """ + # Convert to Path for consistent handling + path_obj = Path(file_path) if isinstance(file_path, str) else file_path + # Check by extension - ext = os.path.splitext(file_path)[1].lower() + ext = path_obj.suffix.lower() if ext in self.supported_extensions: return True @@ -60,7 +64,7 @@ class TextLoader(LoaderInterface): # This is useful when other loaders fail try: # Quick check if file appears to be text - with open(file_path, "rb") as f: + with open(path_obj, "rb") as f: sample = f.read(512) # Simple heuristic: if most bytes are printable, consider it text if sample: @@ -78,12 +82,14 @@ class TextLoader(LoaderInterface): return False - async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult: + async def load( + self, file_path: Union[str, Path], encoding: str = "utf-8", **kwargs + ) -> LoaderResult: """ Load and process the text file. Args: - file_path: Path to the file to load + file_path: Path to the file to load (Path type recommended for explicit file path handling) encoding: Text encoding to use (default: utf-8) **kwargs: Additional configuration (unused) @@ -95,25 +101,28 @@ class TextLoader(LoaderInterface): UnicodeDecodeError: If file cannot be decoded with specified encoding OSError: If file cannot be read """ - if not os.path.exists(file_path): - raise FileNotFoundError(f"File not found: {file_path}") + # Convert to Path for consistent handling + path_obj = Path(file_path) if isinstance(file_path, str) else file_path + + if not path_obj.exists(): + raise FileNotFoundError(f"File not found: {path_obj}") try: - with open(file_path, "r", encoding=encoding) as f: + with open(path_obj, "r", encoding=encoding) as f: content = f.read() except UnicodeDecodeError: # Try with fallback encoding if encoding == "utf-8": - return await self.load(file_path, encoding="latin-1", **kwargs) + return await self.load(path_obj, encoding="latin-1", **kwargs) else: raise # Extract basic metadata - file_stat = os.stat(file_path) + file_stat = path_obj.stat() metadata = { - "name": os.path.basename(file_path), + "name": path_obj.name, "size": file_stat.st_size, - "extension": os.path.splitext(file_path)[1], + "extension": path_obj.suffix, "encoding": encoding, "loader": self.loader_name, "lines": len(content.splitlines()) if content else 0, @@ -124,5 +133,5 @@ class TextLoader(LoaderInterface): content=content, metadata=metadata, content_type=ContentType.TEXT, - source_info={"file_path": file_path, "encoding": encoding}, + source_info={"file_path": str(path_obj), "encoding": encoding}, ) diff --git a/tests/unit/infrastructure/loaders/test_text_loader.py b/tests/unit/infrastructure/loaders/test_text_loader.py index c0b6e8ccb..9267ef808 100644 --- a/tests/unit/infrastructure/loaders/test_text_loader.py +++ b/tests/unit/infrastructure/loaders/test_text_loader.py @@ -155,3 +155,43 @@ class TestTextLoader: """Test that TextLoader has no external dependencies.""" assert text_loader.get_dependencies() == [] assert text_loader.validate_dependencies() is True + + def test_can_handle_path_object(self, text_loader): + """Test that can_handle works with Path objects.""" + path_obj = Path("test.txt") + assert text_loader.can_handle(path_obj) + + path_obj = Path("test.pdf") + assert not text_loader.can_handle(path_obj) + + # Test case insensitive + path_obj = Path("test.TXT") + assert text_loader.can_handle(path_obj) + + def test_can_handle_path_object_with_mime_type(self, text_loader): + """Test that can_handle works with Path objects and MIME type.""" + path_obj = Path("test.unknown") + assert text_loader.can_handle(path_obj, mime_type="text/plain") + assert not text_loader.can_handle(path_obj, mime_type="application/pdf") + + @pytest.mark.asyncio + async def test_load_path_object(self, text_loader, temp_text_file): + """Test loading a file using a Path object.""" + path_obj = Path(temp_text_file) + result = await text_loader.load(path_obj) + + assert isinstance(result.content, str) + assert "This is a test file." in result.content + assert result.content_type == ContentType.TEXT + assert result.metadata["loader"] == "text_loader" + assert result.metadata["name"] == path_obj.name + assert result.metadata["lines"] == 2 + assert result.metadata["encoding"] == "utf-8" + assert result.source_info["file_path"] == str(path_obj) + + @pytest.mark.asyncio + async def test_load_path_object_nonexistent(self, text_loader): + """Test loading a nonexistent file using a Path object.""" + path_obj = Path("/nonexistent/file.txt") + with pytest.raises(FileNotFoundError): + await text_loader.load(path_obj) diff --git a/tests/unit/tasks/ingestion/test_path_support.py b/tests/unit/tasks/ingestion/test_path_support.py new file mode 100644 index 000000000..03c86dc8f --- /dev/null +++ b/tests/unit/tasks/ingestion/test_path_support.py @@ -0,0 +1,129 @@ +import pytest +import tempfile +import os +from pathlib import Path + +from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage +from cognee.tasks.ingestion.resolve_data_directories import resolve_data_directories + + +class TestPathSupport: + """Test Path type support in ingestion functions.""" + + @pytest.fixture + def temp_text_file(self): + """Create a temporary text file for testing.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write("This is a test file for Path support.\n") + temp_path = f.name + + yield temp_path + + # Cleanup + if os.path.exists(temp_path): + os.unlink(temp_path) + + @pytest.fixture + def temp_directory(self): + """Create a temporary directory with test files.""" + import tempfile + + temp_dir = tempfile.mkdtemp() + + # Create some test files + for i in range(3): + with open(os.path.join(temp_dir, f"test_{i}.txt"), "w") as f: + f.write(f"Test file {i} content.\n") + + yield temp_dir + + # Cleanup + import shutil + + shutil.rmtree(temp_dir, ignore_errors=True) + + @pytest.mark.asyncio + async def test_save_data_item_path_object(self, temp_text_file): + """Test save_data_item_to_storage with Path object.""" + path_obj = Path(temp_text_file) + result = await save_data_item_to_storage(path_obj) + + # Should return a file:// URL + assert result.startswith("file://") + assert str(path_obj.resolve()) in result + + @pytest.mark.asyncio + async def test_save_data_item_string_vs_path(self, temp_text_file): + """Test that Path object vs string path are handled consistently.""" + path_obj = Path(temp_text_file) + string_path = str(path_obj.resolve()) + + # Both should work and produce similar results + result_path = await save_data_item_to_storage(path_obj) + result_string = await save_data_item_to_storage(string_path) + + # Both should be file:// URLs pointing to the same file + assert result_path.startswith("file://") + assert result_string.startswith("file://") + + # Extract the actual file paths from the URLs + path_from_path_obj = result_path.replace("file://", "") + path_from_string = result_string.replace("file://", "") + + # They should resolve to the same absolute path + assert os.path.normpath(path_from_path_obj) == os.path.normpath(path_from_string) + + @pytest.mark.asyncio + async def test_save_data_item_text_content(self): + """Test that plain text strings are handled as content, not paths.""" + text_content = "This is plain text content, not a file path." + result = await save_data_item_to_storage(text_content) + + # Should create a file and return file:// URL since this is text content + assert result.startswith("file://") + + @pytest.mark.asyncio + async def test_resolve_data_directories_path_object(self, temp_directory): + """Test resolve_data_directories with Path object.""" + path_obj = Path(temp_directory) + result = await resolve_data_directories([path_obj]) + + # Should return a list of Path objects for the files in the directory + assert len(result) == 3 # We created 3 test files + assert all(isinstance(item, Path) for item in result) + assert all(item.suffix == ".txt" for item in result) + + @pytest.mark.asyncio + async def test_resolve_data_directories_mixed_types(self, temp_directory, temp_text_file): + """Test resolve_data_directories with mixed Path and string types.""" + path_obj = Path(temp_text_file) + string_path = str(temp_text_file) + directory_path = Path(temp_directory) + + # Mix of types + mixed_data = [path_obj, string_path, directory_path] + result = await resolve_data_directories(mixed_data) + + # Should have: + # - 1 Path object (original file as Path) + # - 1 string (original file as string) + # - 3 Path objects (from directory expansion) + assert len(result) == 5 + + # Count types + path_objects = [item for item in result if isinstance(item, Path)] + string_objects = [item for item in result if isinstance(item, str)] + + assert len(path_objects) == 4 # 1 original + 3 from directory + assert len(string_objects) == 1 # 1 original string + + @pytest.mark.asyncio + async def test_resolve_data_directories_path_single_file(self, temp_text_file): + """Test resolve_data_directories with a single Path file.""" + path_obj = Path(temp_text_file) + result = await resolve_data_directories([path_obj]) + + # Should return the same Path object + assert len(result) == 1 + assert isinstance(result[0], Path) + assert str(result[0]) == str(path_obj)