added suggestions

This commit is contained in:
vasilije 2025-08-02 20:37:13 +02:00
parent d0f2a377ef
commit bf15790be8
7 changed files with 244 additions and 27 deletions

View file

@ -1,4 +1,5 @@
from uuid import UUID from uuid import UUID
from pathlib import Path
from typing import Union, BinaryIO, List, Optional from typing import Union, BinaryIO, List, Optional
from cognee.modules.pipelines import Task from cognee.modules.pipelines import Task
@ -8,7 +9,7 @@ from cognee.tasks.ingestion import ingest_data, resolve_data_directories
async def add( async def add(
data: Union[BinaryIO, list[BinaryIO], str, list[str]], data: Union[BinaryIO, list[BinaryIO], str, list[str], Path, list[Path]],
dataset_name: str = "main_dataset", dataset_name: str = "main_dataset",
user: User = None, user: User = None,
node_set: Optional[List[str]] = None, node_set: Optional[List[str]] = None,
@ -17,7 +18,7 @@ async def add(
dataset_id: Optional[UUID] = None, dataset_id: Optional[UUID] = None,
preferred_loaders: Optional[List[str]] = None, preferred_loaders: Optional[List[str]] = None,
loader_config: Optional[dict] = None, loader_config: Optional[dict] = None,
incremental_loading: bool = True,
): ):
""" """
Add data to Cognee for knowledge graph processing using a plugin-based loader system. Add data to Cognee for knowledge graph processing using a plugin-based loader system.
@ -36,8 +37,9 @@ async def add(
Supported Input Types: Supported Input Types:
- **Text strings**: Direct text content (str) - any string not starting with "/" or "file://" - **Text strings**: Direct text content (str) - any string not starting with "/" or "file://"
- **File paths**: Local file paths as strings in these formats: - **File paths**: Local file paths in these formats:
* Absolute paths: "/path/to/document.pdf" * Path objects: pathlib.Path("/path/to/document.pdf") - **Recommended for explicit file path handling**
* Absolute paths as strings: "/path/to/document.pdf"
* File URLs: "file:///path/to/document.pdf" or "file://relative/path.txt" * File URLs: "file:///path/to/document.pdf" or "file://relative/path.txt"
* S3 paths: "s3://bucket-name/path/to/file.pdf" * S3 paths: "s3://bucket-name/path/to/file.pdf"
- **Binary file objects**: File handles/streams (BinaryIO) - **Binary file objects**: File handles/streams (BinaryIO)
@ -102,6 +104,8 @@ async def add(
If not provided, uses default loader priority. If not provided, uses default loader priority.
loader_config: Optional configuration for specific loaders. Dictionary mapping loader names loader_config: Optional configuration for specific loaders. Dictionary mapping loader names
to their configuration options (e.g., {"pypdf_loader": {"strict": False}}). to their configuration options (e.g., {"pypdf_loader": {"strict": False}}).
incremental_loading: Whether to skip processing of documents already processed by the pipeline.
Defaults to True for efficiency. Set to False to reprocess all data.
Returns: Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including: PipelineRunInfo: Information about the ingestion pipeline execution including:

View file

@ -1,11 +1,13 @@
import os import os
from urllib.parse import urlparse from urllib.parse import urlparse
from pathlib import Path
from typing import List, Union, BinaryIO from typing import List, Union, BinaryIO
from cognee.infrastructure.files.storage.s3_config import get_s3_config from cognee.infrastructure.files.storage.s3_config import get_s3_config
async def resolve_data_directories( async def resolve_data_directories(
data: Union[BinaryIO, List[BinaryIO], str, List[str]], include_subdirectories: bool = True data: Union[BinaryIO, List[BinaryIO], str, List[str], Path, List[Path]],
include_subdirectories: bool = True,
): ):
""" """
Resolves directories by replacing them with their contained files. Resolves directories by replacing them with their contained files.
@ -33,7 +35,26 @@ async def resolve_data_directories(
) )
for item in data: for item in data:
if isinstance(item, str): # Check if the item is a path if isinstance(item, Path): # Path objects explicitly indicate file paths
# Convert Path to string for processing
item_str = str(item)
if item.is_dir(): # If it's a directory
if include_subdirectories:
# Recursively add all files in the directory and subdirectories
for root, _, files in os.walk(item_str):
resolved_data.extend([Path(os.path.join(root, f)) for f in files])
else:
# Add all files (not subdirectories) in the directory
resolved_data.extend(
[
Path(os.path.join(item_str, f))
for f in os.listdir(item_str)
if os.path.isfile(os.path.join(item_str, f))
]
)
else: # If it's a file, add it directly
resolved_data.append(item)
elif isinstance(item, str): # Check if the item is a path or text content
# S3 # S3
if urlparse(item).scheme == "s3": if urlparse(item).scheme == "s3":
if fs is not None: if fs is not None:

View file

@ -1,5 +1,6 @@
import os import os
from urllib.parse import urlparse from urllib.parse import urlparse
from pathlib import Path
from typing import Union, BinaryIO, Any from typing import Union, BinaryIO, Any
from cognee.modules.ingestion.exceptions import IngestionError from cognee.modules.ingestion.exceptions import IngestionError
@ -16,7 +17,7 @@ class SaveDataSettings(BaseSettings):
settings = SaveDataSettings() settings = SaveDataSettings()
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str: async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Path, Any]) -> str:
if "llama_index" in str(type(data_item)): if "llama_index" in str(type(data_item)):
# Dynamic import is used because the llama_index module is optional. # Dynamic import is used because the llama_index module is optional.
from .transform_data import get_data_from_llama_index from .transform_data import get_data_from_llama_index
@ -27,6 +28,18 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
if hasattr(data_item, "file"): if hasattr(data_item, "file"):
return await save_data_to_file(data_item.file, filename=data_item.filename) return await save_data_to_file(data_item.file, filename=data_item.filename)
# data is a Path object - explicitly indicates a file path
if isinstance(data_item, Path):
if settings.accept_local_file_path:
# Convert Path to file URL for consistency
normalized_path = str(data_item.resolve())
# Use forward slashes in file URLs for consistency
url_path = normalized_path.replace(os.sep, "/")
file_path = "file://" + url_path
return file_path
else:
raise IngestionError(message="Local files are not accepted.")
if isinstance(data_item, str): if isinstance(data_item, str):
parsed_url = urlparse(data_item) parsed_url = urlparse(data_item)

View file

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import List, Union
from pathlib import Path
from .models.LoaderResult import LoaderResult from .models.LoaderResult import LoaderResult
@ -45,12 +46,12 @@ class LoaderInterface(ABC):
pass pass
@abstractmethod @abstractmethod
def can_handle(self, file_path: str, mime_type: str = None) -> bool: def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool:
""" """
Check if this loader can handle the given file. Check if this loader can handle the given file.
Args: Args:
file_path: Path to the file to be processed file_path: Path to the file to be processed (Path type recommended for explicit file path handling)
mime_type: Optional MIME type of the file mime_type: Optional MIME type of the file
Returns: Returns:
@ -59,12 +60,12 @@ class LoaderInterface(ABC):
pass pass
@abstractmethod @abstractmethod
async def load(self, file_path: str, **kwargs) -> LoaderResult: async def load(self, file_path: Union[str, Path], **kwargs) -> LoaderResult:
""" """
Load and process the file, returning standardized result. Load and process the file, returning standardized result.
Args: Args:
file_path: Path to the file to be processed file_path: Path to the file to be processed (Path type recommended for explicit file path handling)
**kwargs: Additional loader-specific configuration **kwargs: Additional loader-specific configuration
Returns: Returns:

View file

@ -1,5 +1,6 @@
import os import os
from typing import List from typing import List, Union
from pathlib import Path
from ..LoaderInterface import LoaderInterface from ..LoaderInterface import LoaderInterface
from ..models.LoaderResult import LoaderResult, ContentType from ..models.LoaderResult import LoaderResult, ContentType
@ -36,19 +37,22 @@ class TextLoader(LoaderInterface):
"""Unique identifier for this loader.""" """Unique identifier for this loader."""
return "text_loader" return "text_loader"
def can_handle(self, file_path: str, mime_type: str = None) -> bool: def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool:
""" """
Check if this loader can handle the given file. Check if this loader can handle the given file.
Args: Args:
file_path: Path to the file file_path: Path to the file (Path type recommended for explicit file path handling)
mime_type: Optional MIME type mime_type: Optional MIME type
Returns: Returns:
True if file can be handled, False otherwise True if file can be handled, False otherwise
""" """
# Convert to Path for consistent handling
path_obj = Path(file_path) if isinstance(file_path, str) else file_path
# Check by extension # Check by extension
ext = os.path.splitext(file_path)[1].lower() ext = path_obj.suffix.lower()
if ext in self.supported_extensions: if ext in self.supported_extensions:
return True return True
@ -60,7 +64,7 @@ class TextLoader(LoaderInterface):
# This is useful when other loaders fail # This is useful when other loaders fail
try: try:
# Quick check if file appears to be text # Quick check if file appears to be text
with open(file_path, "rb") as f: with open(path_obj, "rb") as f:
sample = f.read(512) sample = f.read(512)
# Simple heuristic: if most bytes are printable, consider it text # Simple heuristic: if most bytes are printable, consider it text
if sample: if sample:
@ -78,12 +82,14 @@ class TextLoader(LoaderInterface):
return False return False
async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult: async def load(
self, file_path: Union[str, Path], encoding: str = "utf-8", **kwargs
) -> LoaderResult:
""" """
Load and process the text file. Load and process the text file.
Args: Args:
file_path: Path to the file to load file_path: Path to the file to load (Path type recommended for explicit file path handling)
encoding: Text encoding to use (default: utf-8) encoding: Text encoding to use (default: utf-8)
**kwargs: Additional configuration (unused) **kwargs: Additional configuration (unused)
@ -95,25 +101,28 @@ class TextLoader(LoaderInterface):
UnicodeDecodeError: If file cannot be decoded with specified encoding UnicodeDecodeError: If file cannot be decoded with specified encoding
OSError: If file cannot be read OSError: If file cannot be read
""" """
if not os.path.exists(file_path): # Convert to Path for consistent handling
raise FileNotFoundError(f"File not found: {file_path}") path_obj = Path(file_path) if isinstance(file_path, str) else file_path
if not path_obj.exists():
raise FileNotFoundError(f"File not found: {path_obj}")
try: try:
with open(file_path, "r", encoding=encoding) as f: with open(path_obj, "r", encoding=encoding) as f:
content = f.read() content = f.read()
except UnicodeDecodeError: except UnicodeDecodeError:
# Try with fallback encoding # Try with fallback encoding
if encoding == "utf-8": if encoding == "utf-8":
return await self.load(file_path, encoding="latin-1", **kwargs) return await self.load(path_obj, encoding="latin-1", **kwargs)
else: else:
raise raise
# Extract basic metadata # Extract basic metadata
file_stat = os.stat(file_path) file_stat = path_obj.stat()
metadata = { metadata = {
"name": os.path.basename(file_path), "name": path_obj.name,
"size": file_stat.st_size, "size": file_stat.st_size,
"extension": os.path.splitext(file_path)[1], "extension": path_obj.suffix,
"encoding": encoding, "encoding": encoding,
"loader": self.loader_name, "loader": self.loader_name,
"lines": len(content.splitlines()) if content else 0, "lines": len(content.splitlines()) if content else 0,
@ -124,5 +133,5 @@ class TextLoader(LoaderInterface):
content=content, content=content,
metadata=metadata, metadata=metadata,
content_type=ContentType.TEXT, content_type=ContentType.TEXT,
source_info={"file_path": file_path, "encoding": encoding}, source_info={"file_path": str(path_obj), "encoding": encoding},
) )

View file

@ -155,3 +155,43 @@ class TestTextLoader:
"""Test that TextLoader has no external dependencies.""" """Test that TextLoader has no external dependencies."""
assert text_loader.get_dependencies() == [] assert text_loader.get_dependencies() == []
assert text_loader.validate_dependencies() is True assert text_loader.validate_dependencies() is True
def test_can_handle_path_object(self, text_loader):
    """can_handle accepts Path instances and matches by file suffix."""
    txt_path = Path("test.txt")
    assert text_loader.can_handle(txt_path)
    pdf_path = Path("test.pdf")
    assert not text_loader.can_handle(pdf_path)
    # Extension matching must be case-insensitive.
    upper_path = Path("test.TXT")
    assert text_loader.can_handle(upper_path)
def test_can_handle_path_object_with_mime_type(self, text_loader):
    """The mime_type hint works together with Path inputs."""
    unknown_ext = Path("test.unknown")
    # Extension is unrecognized, so the MIME type alone decides.
    assert text_loader.can_handle(unknown_ext, mime_type="text/plain")
    assert not text_loader.can_handle(unknown_ext, mime_type="application/pdf")
@pytest.mark.asyncio
async def test_load_path_object(self, text_loader, temp_text_file):
    """load() accepts a Path object and fills content, metadata, source_info."""
    source_path = Path(temp_text_file)
    loaded = await text_loader.load(source_path)
    assert isinstance(loaded.content, str)
    assert "This is a test file." in loaded.content
    assert loaded.content_type == ContentType.TEXT
    meta = loaded.metadata
    assert meta["loader"] == "text_loader"
    assert meta["name"] == source_path.name
    assert meta["lines"] == 2
    assert meta["encoding"] == "utf-8"
    assert loaded.source_info["file_path"] == str(source_path)
@pytest.mark.asyncio
async def test_load_path_object_nonexistent(self, text_loader):
    """A missing file given as a Path raises FileNotFoundError."""
    missing = Path("/nonexistent/file.txt")
    with pytest.raises(FileNotFoundError):
        await text_loader.load(missing)

View file

@ -0,0 +1,129 @@
import pytest
import tempfile
import os
from pathlib import Path
from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage
from cognee.tasks.ingestion.resolve_data_directories import resolve_data_directories
class TestPathSupport:
    """Test Path type support in ingestion functions.

    Covers save_data_item_to_storage and resolve_data_directories with
    pathlib.Path inputs, plain string paths, and mixes of the two.
    """

    @pytest.fixture
    def temp_text_file(self):
        """Create a temporary text file (path yielded as str); removed after the test."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
            f.write("This is a test file for Path support.\n")
            temp_path = f.name
        yield temp_path
        # Cleanup: created with delete=False, so remove it explicitly.
        if os.path.exists(temp_path):
            os.unlink(temp_path)

    @pytest.fixture
    def temp_directory(self):
        """Create a temporary directory containing three small text files."""
        temp_dir = tempfile.mkdtemp()
        for i in range(3):
            with open(os.path.join(temp_dir, f"test_{i}.txt"), "w") as f:
                f.write(f"Test file {i} content.\n")
        yield temp_dir
        # Cleanup: remove the whole tree; ignore errors if already gone.
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.mark.asyncio
    async def test_save_data_item_path_object(self, temp_text_file):
        """save_data_item_to_storage with a Path returns a file:// URL to that file."""
        path_obj = Path(temp_text_file)
        result = await save_data_item_to_storage(path_obj)

        # Should return a file:// URL pointing at the resolved path.
        assert result.startswith("file://")
        assert str(path_obj.resolve()) in result

    @pytest.mark.asyncio
    async def test_save_data_item_string_vs_path(self, temp_text_file):
        """A Path object and the equivalent string path resolve to the same URL target."""
        path_obj = Path(temp_text_file)
        string_path = str(path_obj.resolve())

        result_path = await save_data_item_to_storage(path_obj)
        result_string = await save_data_item_to_storage(string_path)

        # Both should be file:// URLs pointing to the same file.
        assert result_path.startswith("file://")
        assert result_string.startswith("file://")

        # Strip only the leading scheme (count=1) so a literal "file://"
        # occurring later in a path can never be removed by accident.
        path_from_path_obj = result_path.replace("file://", "", 1)
        path_from_string = result_string.replace("file://", "", 1)

        # They should resolve to the same absolute path.
        assert os.path.normpath(path_from_path_obj) == os.path.normpath(path_from_string)

    @pytest.mark.asyncio
    async def test_save_data_item_text_content(self):
        """Plain text strings are handled as content, not as file paths."""
        text_content = "This is plain text content, not a file path."
        result = await save_data_item_to_storage(text_content)

        # Text content is persisted to a new file, so a file:// URL comes back.
        assert result.startswith("file://")

    @pytest.mark.asyncio
    async def test_resolve_data_directories_path_object(self, temp_directory):
        """A directory given as a Path expands to Path objects for its files."""
        path_obj = Path(temp_directory)
        result = await resolve_data_directories([path_obj])

        assert len(result) == 3  # the fixture creates 3 test files
        assert all(isinstance(item, Path) for item in result)
        assert all(item.suffix == ".txt" for item in result)

    @pytest.mark.asyncio
    async def test_resolve_data_directories_mixed_types(self, temp_directory, temp_text_file):
        """Mixed Path/str inputs keep their types; directories expand to Paths."""
        path_obj = Path(temp_text_file)
        string_path = temp_text_file  # the fixture already yields a plain string path
        directory_path = Path(temp_directory)

        mixed_data = [path_obj, string_path, directory_path]
        result = await resolve_data_directories(mixed_data)

        # 1 Path file + 1 string file + 3 Paths from directory expansion.
        assert len(result) == 5

        path_objects = [item for item in result if isinstance(item, Path)]
        string_objects = [item for item in result if isinstance(item, str)]
        assert len(path_objects) == 4  # 1 original + 3 from directory
        assert len(string_objects) == 1  # the original string

    @pytest.mark.asyncio
    async def test_resolve_data_directories_path_single_file(self, temp_text_file):
        """A single file given as a Path passes through unchanged."""
        path_obj = Path(temp_text_file)
        result = await resolve_data_directories([path_obj])

        assert len(result) == 1
        assert isinstance(result[0], Path)
        assert str(result[0]) == str(path_obj)