Compare commits

...
Sign in to create a new pull request.

15 commits

Author SHA1 Message Date
vasilije
51fd2a51a8 added formatting 2025-08-02 21:04:09 +02:00
vasilije
bf15790be8 added suggestions 2025-08-02 20:37:13 +02:00
Vasilije
d0f2a377ef
Merge branch 'dev' into loader_separation 2025-08-02 20:28:44 +02:00
vasilije
dc03a52541 fixes to delete 2025-07-19 19:50:36 +02:00
vasilije
f0ba618f0c fixes to delete 2025-07-19 19:34:30 +02:00
Vasilije
725061fbef
Merge branch 'dev' into loader_separation 2025-07-19 19:32:08 +02:00
vasilije
bf191ae6d0 fixes to delete 2025-07-19 19:06:10 +02:00
vasilije
9d423f5e16 fixes to delete 2025-07-19 18:50:34 +02:00
vasilije
411e9a6205 added fixes for nltk 2025-07-19 15:31:12 +02:00
vasilije
3429af32c2 added fixes for nltk 2025-07-19 15:17:27 +02:00
vasilije
9110a2b59b added fixes 2025-07-19 14:52:16 +02:00
vasilije
1cd0fe0dcf check if base new workflow works 2025-07-19 14:24:55 +02:00
Vasilije
f2e96d5c62
Merge branch 'dev' into loader_separation 2025-07-19 14:20:09 +02:00
vasilije
1c378dabdb linter fixes 2025-07-13 22:34:27 +02:00
vasilije
98882ba1d1 added loader separation 2025-07-13 20:24:52 +02:00
49 changed files with 3379 additions and 29 deletions

View file

@ -1,4 +1,5 @@
from uuid import UUID
from pathlib import Path
from typing import Union, BinaryIO, List, Optional
from cognee.modules.pipelines import Task
@ -8,22 +9,27 @@ from cognee.tasks.ingestion import ingest_data, resolve_data_directories
async def add(
data: Union[BinaryIO, list[BinaryIO], str, list[str]],
data: Union[BinaryIO, list[BinaryIO], str, list[str], Path, list[Path]],
dataset_name: str = "main_dataset",
user: User = None,
node_set: Optional[List[str]] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
dataset_id: Optional[UUID] = None,
preferred_loaders: Optional[List[str]] = None,
loader_config: Optional[dict] = None,
incremental_loading: bool = True,
):
"""
Add data to Cognee for knowledge graph processing.
Add data to Cognee for knowledge graph processing using a plugin-based loader system.
This is the first step in the Cognee workflow - it ingests raw data and prepares it
for processing. The function accepts various data formats including text, files, and
binary streams, then stores them in a specified dataset for further processing.
This version supports both the original ingestion system (for backward compatibility)
and the new plugin-based loader system (when loader parameters are provided).
Prerequisites:
- **LLM_API_KEY**: Must be set in environment variables for content processing
- **Database Setup**: Relational and vector databases must be configured
@ -31,24 +37,47 @@ async def add(
Supported Input Types:
- **Text strings**: Direct text content (str) - any string not starting with "/" or "file://"
- **File paths**: Local file paths as strings in these formats:
* Absolute paths: "/path/to/document.pdf"
- **File paths**: Local file paths in these formats:
* Path objects: pathlib.Path("/path/to/document.pdf") - **Recommended for explicit file path handling**
* Absolute paths as strings: "/path/to/document.pdf"
* File URLs: "file:///path/to/document.pdf" or "file://relative/path.txt"
* S3 paths: "s3://bucket-name/path/to/file.pdf"
- **Binary file objects**: File handles/streams (BinaryIO)
- **Lists**: Multiple files or text strings in a single call
Supported File Formats:
- Text files (.txt, .md, .csv)
- PDFs (.pdf)
- Text files (.txt, .md, .csv) - processed by text_loader
- PDFs (.pdf) - processed by pypdf_loader (if available)
- Images (.png, .jpg, .jpeg) - extracted via OCR/vision models
- Audio files (.mp3, .wav) - transcribed to text
- Code files (.py, .js, .ts, etc.) - parsed for structure and content
- Office documents (.docx, .pptx)
- Office documents (.docx, .pptx) - processed by unstructured_loader (if available)
- Data files (.json, .jsonl, .parquet) - processed by dlt_loader (if available)
Workflow:
Plugin System:
The function automatically uses the best available loader for each file type.
You can customize this behavior using the loader parameters:
```python
# Use specific loaders in priority order
await cognee.add(
"/path/to/document.pdf",
preferred_loaders=["pypdf_loader", "text_loader"]
)
# Configure loader-specific options
await cognee.add(
"/path/to/document.pdf",
loader_config={
"pypdf_loader": {"strict": False},
"unstructured_loader": {"strategy": "hi_res"}
}
)
```
Workflow:
1. **Data Resolution**: Resolves file paths and validates accessibility
2. **Content Extraction**: Extracts text content from various file formats
2. **Content Extraction**: Uses plugin system or falls back to existing classification
3. **Dataset Storage**: Stores processed content in the specified dataset
4. **Metadata Tracking**: Records file metadata, timestamps, and user permissions
5. **Permission Assignment**: Grants user read/write/delete/share permissions on dataset
@ -71,6 +100,12 @@ async def add(
vector_db_config: Optional configuration for vector database (for custom setups).
graph_db_config: Optional configuration for graph database (for custom setups).
dataset_id: Optional specific dataset UUID to use instead of dataset_name.
preferred_loaders: Optional list of loader names to try first (e.g., ["pypdf_loader", "text_loader"]).
If not provided, uses default loader priority.
loader_config: Optional configuration for specific loaders. Dictionary mapping loader names
to their configuration options (e.g., {"pypdf_loader": {"strict": False}}).
incremental_loading: Whether to skip processing of documents already processed by the pipeline.
Defaults to True for efficiency. Set to False to reprocess all data.
Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including:
@ -139,10 +174,32 @@ async def add(
UnsupportedFileTypeError: If file format cannot be processed
InvalidValueError: If LLM_API_KEY is not set or invalid
"""
# Determine which ingestion system to use
# use_plugin_system = preferred_loaders is not None or loader_config is not None
# if use_plugin_system:
# # Use new plugin-based ingestion system
from cognee.tasks.ingestion.plugin_ingest_data import plugin_ingest_data
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id),
Task(
plugin_ingest_data,
dataset_name,
user,
node_set,
dataset_id,
preferred_loaders,
loader_config,
),
]
# else:
# # Use existing ingestion system for backward compatibility
# tasks = [
# Task(resolve_data_directories, include_subdirectories=True),
# Task(ingest_data, dataset_name, user, node_set, dataset_id),
# ]
pipeline_run_info = None

View file

@ -84,6 +84,12 @@ async def delete(
# Get the content hash for deletion
content_hash = data_point.content_hash
# Debug logging
logger.info(
f"🔍 Retrieved from database - data_id: {data_id}, content_hash: {content_hash}"
)
logger.info(f"🔍 Document name in database: {data_point.name}")
# Use the existing comprehensive deletion logic
return await delete_single_document(content_hash, dataset.id, mode)

View file

@ -0,0 +1,237 @@
import os
import importlib.util
from typing import Dict, List, Optional
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult
from cognee.shared.logging_utils import get_logger
class LoaderEngine:
    """
    Main loader engine for managing file loaders.

    Follows cognee's adapter pattern similar to database engines,
    providing a centralized system for file loading operations.

    Loader resolution order (see get_loader): caller-preferred loaders,
    the configured priority list, MIME-type mapping, extension mapping,
    and finally the configured fallback loader.
    """

    def __init__(
        self,
        loader_directories: List[str],
        default_loader_priority: List[str],
        fallback_loader: str = "text_loader",
        enable_dependency_validation: bool = True,
    ):
        """
        Initialize the loader engine.

        Args:
            loader_directories: Directories to search for loader implementations
            default_loader_priority: Priority order for loader selection
            fallback_loader: Default loader to use when no other matches
            enable_dependency_validation: Whether to validate loader dependencies
        """
        # Registered loaders keyed by loader_name.
        self._loaders: Dict[str, LoaderInterface] = {}
        # Secondary indexes for lookup by file extension / MIME type.
        self._extension_map: Dict[str, List[LoaderInterface]] = {}
        self._mime_type_map: Dict[str, List[LoaderInterface]] = {}

        self.loader_directories = loader_directories
        self.default_loader_priority = default_loader_priority
        self.fallback_loader = fallback_loader
        self.enable_dependency_validation = enable_dependency_validation
        self.logger = get_logger(__name__)

    def register_loader(self, loader: LoaderInterface) -> bool:
        """
        Register a loader with the engine.

        Args:
            loader: LoaderInterface implementation to register

        Returns:
            True if loader was registered successfully, False otherwise
        """
        # Refuse loaders whose runtime dependencies are missing (optional check).
        if self.enable_dependency_validation and not loader.validate_dependencies():
            self.logger.warning(
                f"Skipping loader '{loader.loader_name}' - missing dependencies: "
                f"{loader.get_dependencies()}"
            )
            return False

        self._loaders[loader.loader_name] = loader

        # Index by extension (case-insensitive).
        for ext in loader.supported_extensions:
            self._extension_map.setdefault(ext.lower(), []).append(loader)

        # Index by MIME type.
        for mime_type in loader.supported_mime_types:
            self._mime_type_map.setdefault(mime_type, []).append(loader)

        self.logger.info(f"Registered loader: {loader.loader_name}")
        return True

    def get_loader(
        self,
        file_path: str,
        mime_type: Optional[str] = None,
        preferred_loaders: Optional[List[str]] = None,
    ) -> Optional[LoaderInterface]:
        """
        Get appropriate loader for a file.

        Args:
            file_path: Path to the file to be processed
            mime_type: Optional MIME type of the file
            preferred_loaders: List of preferred loader names to try first

        Returns:
            LoaderInterface that can handle the file, or None if not found
        """
        ext = os.path.splitext(file_path)[1].lower()

        # 1. Caller-supplied preferences win.
        if preferred_loaders:
            for loader_name in preferred_loaders:
                loader = self._loaders.get(loader_name)
                if loader and loader.can_handle(file_path, mime_type):
                    return loader

        # 2. Engine-wide priority order.
        for loader_name in self.default_loader_priority:
            loader = self._loaders.get(loader_name)
            if loader and loader.can_handle(file_path, mime_type):
                return loader

        # 3. MIME-type index.
        if mime_type:
            for loader in self._mime_type_map.get(mime_type, []):
                if loader.can_handle(file_path, mime_type):
                    return loader

        # 4. Extension index.
        for loader in self._extension_map.get(ext, []):
            if loader.can_handle(file_path, mime_type):
                return loader

        # 5. Last resort: the configured fallback loader.
        fallback = self._loaders.get(self.fallback_loader)
        if fallback and fallback.can_handle(file_path, mime_type):
            return fallback

        return None

    async def load_file(
        self,
        file_path: str,
        mime_type: Optional[str] = None,
        preferred_loaders: Optional[List[str]] = None,
        **kwargs,
    ) -> LoaderResult:
        """
        Load file using appropriate loader.

        Args:
            file_path: Path to the file to be processed
            mime_type: Optional MIME type of the file
            preferred_loaders: List of preferred loader names to try first
            **kwargs: Additional loader-specific configuration

        Returns:
            LoaderResult containing processed content and metadata

        Raises:
            ValueError: If no suitable loader is found
            Exception: If file processing fails
        """
        loader = self.get_loader(file_path, mime_type, preferred_loaders)
        if not loader:
            raise ValueError(f"No loader found for file: {file_path}")

        self.logger.debug(f"Loading {file_path} with {loader.loader_name}")
        return await loader.load(file_path, **kwargs)

    def discover_loaders(self):
        """
        Auto-discover loaders from configured directories.

        Scans loader directories for Python modules containing
        LoaderInterface implementations and registers them.
        """
        for directory in self.loader_directories:
            if os.path.exists(directory):
                self._discover_in_directory(directory)

    def _discover_in_directory(self, directory: str):
        """
        Discover loaders in a specific directory.

        Imports every non-private .py module in the directory and registers
        any concrete LoaderInterface subclass found in it. Failures are
        logged and skipped so one bad module cannot break discovery.

        Args:
            directory: Directory path to scan for loader implementations
        """
        try:
            for file_name in os.listdir(directory):
                if not file_name.endswith(".py") or file_name.startswith("_"):
                    continue
                module_name = file_name[:-3]
                file_path = os.path.join(directory, file_name)
                try:
                    spec = importlib.util.spec_from_file_location(module_name, file_path)
                    if spec and spec.loader:
                        module = importlib.util.module_from_spec(spec)
                        spec.loader.exec_module(module)
                        # Register every LoaderInterface subclass the module defines.
                        for attr_name in dir(module):
                            attr = getattr(module, attr_name)
                            if (
                                isinstance(attr, type)
                                and issubclass(attr, LoaderInterface)
                                and attr != LoaderInterface
                            ):
                                try:
                                    self.register_loader(attr())
                                except Exception as e:
                                    self.logger.warning(
                                        f"Failed to instantiate loader {attr_name}: {e}"
                                    )
                except Exception as e:
                    self.logger.warning(f"Failed to load module {module_name}: {e}")
        except OSError as e:
            self.logger.warning(f"Failed to scan directory {directory}: {e}")

    def get_available_loaders(self) -> List[str]:
        """
        Get list of available loader names.

        Returns:
            List of registered loader names
        """
        return list(self._loaders.keys())

    def get_loader_info(self, loader_name: str) -> Dict[str, object]:
        """
        Get information about a specific loader.

        Note: annotated Dict[str, object]; the previous ``Dict[str, any]``
        referenced the builtin function ``any``, which is not a type.

        Args:
            loader_name: Name of the loader to inspect

        Returns:
            Dictionary with the loader's name, supported extensions and MIME
            types, declared dependencies, and availability; empty dict when
            the loader is not registered.
        """
        loader = self._loaders.get(loader_name)
        if loader is None:
            return {}
        return {
            "name": loader.loader_name,
            "extensions": loader.supported_extensions,
            "mime_types": loader.supported_mime_types,
            "dependencies": loader.get_dependencies(),
            "available": loader.validate_dependencies(),
        }

View file

@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from typing import List
from .models.LoaderResult import LoaderResult
class LoaderInterface(ABC):
    """
    Base interface for all file loaders in cognee.

    This interface follows cognee's established pattern for database adapters,
    ensuring consistent behavior across all loader implementations.
    """

    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """
        List of file extensions this loader supports.

        Returns:
            List of extensions including the dot (e.g., ['.txt', '.md'])
        """
        pass

    @property
    @abstractmethod
    def supported_mime_types(self) -> List[str]:
        """
        List of MIME types this loader supports.

        Returns:
            List of MIME type strings (e.g., ['text/plain', 'application/pdf'])
        """
        pass

    @property
    @abstractmethod
    def loader_name(self) -> str:
        """
        Unique name identifier for this loader.

        Returns:
            String identifier used for registration and configuration
        """
        pass

    @abstractmethod
    def can_handle(self, file_path: str, mime_type: Optional[str] = None) -> bool:
        """
        Check if this loader can handle the given file.

        Args:
            file_path: Path to the file to be processed
            mime_type: Optional MIME type of the file

        Returns:
            True if this loader can process the file, False otherwise
        """
        pass

    @abstractmethod
    async def load(self, file_path: str, **kwargs) -> "LoaderResult":
        """
        Load and process the file, returning standardized result.

        Args:
            file_path: Path to the file to be processed
            **kwargs: Additional loader-specific configuration

        Returns:
            LoaderResult containing processed content and metadata

        Raises:
            Exception: If file cannot be processed
        """
        pass

    def get_dependencies(self) -> List[str]:
        """
        Optional: Return list of required dependencies for this loader.

        Returns:
            List of package names with optional version specifications
        """
        return []

    def validate_dependencies(self) -> bool:
        """
        Check if all required dependencies are available.

        Each entry from get_dependencies() is reduced to its bare package
        name (any PEP 440 version specifier, extras bracket, or environment
        marker is stripped) and an import of that name is attempted.

        Returns:
            True if all dependencies are installed, False otherwise
        """
        import re

        for dep in self.get_dependencies():
            # Strip everything from the first specifier/extras character on,
            # e.g. "pypdf>=4.0.0", "dlt~=0.4", "pkg[extra]==1.0" -> "pypdf",
            # "dlt", "pkg". The old split on ">=", "==", "<" missed ">",
            # "<=", "~=", "!=", extras, and surrounding whitespace.
            package_name = re.split(r"[\s\[<>=!~;]", dep.strip(), maxsplit=1)[0]
            try:
                __import__(package_name)
            except ImportError:
                return False
        return True

View file

@ -0,0 +1,19 @@
"""
File loader infrastructure for cognee.
This package provides a plugin-based system for loading different file formats
into cognee, following the same patterns as database adapters.
Main exports:
- get_loader_engine(): Factory function to get configured loader engine
- use_loader(): Register custom loaders at runtime
- LoaderInterface: Base interface for implementing loaders
- LoaderResult, ContentType: Data models for loader results
"""
from .get_loader_engine import get_loader_engine
from .use_loader import use_loader
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult, ContentType
__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"]

View file

@ -0,0 +1,57 @@
from functools import lru_cache
from typing import List, Optional, Dict, Any
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.root_dir import get_absolute_path
class LoaderConfig(BaseSettings):
    """
    Configuration for file loader system.

    Follows cognee's pattern using pydantic_settings.BaseSettings for
    environment variable support and validation. Settings can be overridden
    via a ``.env`` file or environment variables with the ``LOADER_`` prefix
    (see ``model_config`` below).
    """

    # Directories scanned for loader implementations: "core" loaders are
    # always shipped with cognee; "external" holds optional, dependency-gated
    # loaders.
    loader_directories: List[str] = [
        get_absolute_path("infrastructure/loaders/core"),
        get_absolute_path("infrastructure/loaders/external"),
    ]
    # Order in which loaders are tried when more than one can handle a file.
    default_loader_priority: List[str] = [
        "text_loader",
        "pypdf_loader",
        "unstructured_loader",
        "dlt_loader",
    ]
    # Whether to auto-discover loader modules from loader_directories.
    auto_discover: bool = True
    # Loader used when no other registered loader matches a file.
    fallback_loader: str = "text_loader"
    # Skip registering loaders whose declared dependencies are missing.
    enable_dependency_validation: bool = True

    # Environment overrides use the LOADER_ prefix (e.g. LOADER_FALLBACK_LOADER);
    # extra="allow" tolerates unrelated keys in the shared .env file.
    model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_")

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert configuration to dictionary format.

        Returns:
            Dict containing all loader configuration settings
        """
        return {
            "loader_directories": self.loader_directories,
            "default_loader_priority": self.default_loader_priority,
            "auto_discover": self.auto_discover,
            "fallback_loader": self.fallback_loader,
            "enable_dependency_validation": self.enable_dependency_validation,
        }
@lru_cache
def get_loader_config() -> LoaderConfig:
    """Return the process-wide loader configuration.

    The result is memoized via functools.lru_cache, so every caller shares
    a single LoaderConfig instance — cognee's usual pattern for
    configuration objects.

    Returns:
        LoaderConfig instance with current settings
    """
    config = LoaderConfig()
    return config

View file

@ -0,0 +1,5 @@
"""Core loader implementations that are always available."""
from .text_loader import TextLoader
__all__ = ["TextLoader"]

View file

@ -0,0 +1,128 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class TextLoader(LoaderInterface):
    """
    Core text file loader that handles basic text file formats.

    This loader is always available and serves as the fallback for
    text-based files when no specialized loader is available.
    """

    @property
    def supported_extensions(self) -> List[str]:
        """Supported text file extensions."""
        return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"]

    @property
    def supported_mime_types(self) -> List[str]:
        """Supported MIME types for text content."""
        return [
            "text/plain",
            "text/markdown",
            "text/csv",
            "application/json",
            "text/xml",
            "application/xml",
            "text/yaml",
            "application/yaml",
        ]

    @property
    def loader_name(self) -> str:
        """Unique identifier for this loader."""
        return "text_loader"

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        """
        Check if this loader can handle the given file.

        Args:
            file_path: Path to the file
            mime_type: Optional MIME type

        Returns:
            True if file can be handled, False otherwise
        """
        # Check by extension
        ext = os.path.splitext(file_path)[1].lower()
        if ext in self.supported_extensions:
            return True

        # Check by MIME type
        if mime_type and mime_type in self.supported_mime_types:
            return True

        # As the fallback loader, sniff unknown files: read a small sample
        # and accept it only if it looks like text.
        #
        # BUGFIX: the previous check tried utf-8 then latin-1 decoding, but
        # latin-1 maps every possible byte and therefore can never raise
        # UnicodeDecodeError — the "heuristic" accepted arbitrary binary
        # data. Use the presence of NUL bytes as the binary marker instead.
        try:
            with open(file_path, "rb") as f:
                sample = f.read(512)
        except (OSError, IOError):
            return False

        if not sample:
            # Empty sample (empty file): keep the original behavior of
            # rejecting it unless the extension/MIME type matched above.
            return False

        # NUL-free sample: treat as text. load() falls back to latin-1,
        # which can decode any byte sequence, so no decode check is needed.
        return b"\x00" not in sample

    async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult:
        """
        Load and process the text file.

        Args:
            file_path: Path to the file to load
            encoding: Text encoding to use (default: utf-8)
            **kwargs: Additional configuration (unused)

        Returns:
            LoaderResult containing the file content and metadata

        Raises:
            FileNotFoundError: If file doesn't exist
            UnicodeDecodeError: If file cannot be decoded with specified encoding
            OSError: If file cannot be read
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            with open(file_path, "r", encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            # Retry once with latin-1, which accepts any byte sequence, so a
            # non-UTF-8 file still loads (possibly with mojibake).
            if encoding == "utf-8":
                return await self.load(file_path, encoding="latin-1", **kwargs)
            else:
                raise

        # Basic file/content statistics recorded alongside the content.
        file_stat = os.stat(file_path)
        metadata = {
            "name": os.path.basename(file_path),
            "size": file_stat.st_size,
            "extension": os.path.splitext(file_path)[1],
            "encoding": encoding,
            "loader": self.loader_name,
            "lines": len(content.splitlines()) if content else 0,
            "characters": len(content),
        }

        return LoaderResult(
            content=content,
            metadata=metadata,
            content_type=ContentType.TEXT,
            source_info={"file_path": file_path, "encoding": encoding},
        )

View file

@ -0,0 +1,49 @@
from typing import List
from .LoaderEngine import LoaderEngine
from .supported_loaders import supported_loaders
def create_loader_engine(
    loader_directories: List[str],
    default_loader_priority: List[str],
    auto_discover: bool = True,
    fallback_loader: str = "text_loader",
    enable_dependency_validation: bool = True,
) -> LoaderEngine:
    """
    Build and populate a LoaderEngine from the given configuration.

    Follows cognee's pattern for engine creation functions used in
    database adapters: construct the engine, register the statically
    known loaders, then optionally run directory-based discovery.

    Args:
        loader_directories: Directories to search for loader implementations
        default_loader_priority: Priority order for loader selection
        auto_discover: Whether to auto-discover loaders from directories
        fallback_loader: Default loader to use when no other matches
        enable_dependency_validation: Whether to validate loader dependencies

    Returns:
        Configured LoaderEngine instance
    """
    engine = LoaderEngine(
        loader_directories=loader_directories,
        default_loader_priority=default_loader_priority,
        fallback_loader=fallback_loader,
        enable_dependency_validation=enable_dependency_validation,
    )

    # Register every loader from the static registry. A loader that fails to
    # instantiate is logged and skipped so the engine still comes up with
    # the remaining ones.
    for loader_name, loader_class in supported_loaders.items():
        try:
            instance = loader_class()
        except Exception as e:
            engine.logger.warning(f"Failed to register loader {loader_name}: {e}")
        else:
            engine.register_loader(instance)

    # Optionally scan the configured directories for additional loaders.
    if auto_discover:
        engine.discover_loaders()

    return engine

View file

@ -0,0 +1,34 @@
"""
External loader implementations for cognee.
This module contains loaders that depend on external libraries:
- pypdf_loader: PDF processing using pypdf
- unstructured_loader: Document processing using unstructured
- dlt_loader: Data lake/warehouse integration using DLT
These loaders are optional and only available if their dependencies are installed.
"""
__all__ = []
# Conditional imports based on dependency availability
try:
from .pypdf_loader import PyPdfLoader
__all__.append("PyPdfLoader")
except ImportError:
pass
try:
from .unstructured_loader import UnstructuredLoader
__all__.append("UnstructuredLoader")
except ImportError:
pass
try:
from .dlt_loader import DltLoader
__all__.append("DltLoader")
except ImportError:
pass

View file

@ -0,0 +1,203 @@
import os
from typing import List, Dict, Any, Optional
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class DltLoader(LoaderInterface):
    """
    Data loader using DLT (Data Load Tool) for various data sources.

    Supports loading data from REST APIs, databases, cloud storage,
    and other data sources through DLT pipelines.

    NOTE(review): the current implementation only parses local files
    (JSON, JSON Lines, CSV, Parquet, YAML, plain text); dlt itself is
    used only for its version string and as a declared dependency —
    confirm before relying on pipeline features.
    """

    def __init__(self):
        self.logger = get_logger(__name__)

    @property
    def supported_extensions(self) -> List[str]:
        return [
            ".dlt",  # DLT pipeline configuration
            ".json",  # JSON data
            ".jsonl",  # JSON Lines
            ".csv",  # CSV data
            ".parquet",  # Parquet files
            ".yaml",  # YAML configuration
            ".yml",  # YAML configuration
        ]

    @property
    def supported_mime_types(self) -> List[str]:
        return [
            "application/json",
            "application/x-ndjson",  # JSON Lines
            "text/csv",
            "application/x-parquet",
            "application/yaml",
            "text/yaml",
        ]

    @property
    def loader_name(self) -> str:
        return "dlt_loader"

    def get_dependencies(self) -> List[str]:
        return ["dlt>=0.4.0"]

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        """Check if file can be handled by this loader."""
        # Check file extension
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_extensions:
            return False
        # Check MIME type if provided
        if mime_type and mime_type not in self.supported_mime_types:
            return False
        # Validate dependencies
        return self.validate_dependencies()

    async def load(self, file_path: str, source_type: str = "auto", **kwargs) -> LoaderResult:
        """
        Load data using DLT pipeline.

        Args:
            file_path: Path to the data file or DLT configuration
            source_type: Type of data source ("auto", "json", "jsonl", "csv",
                "parquet", "yaml", or "file" for plain text)
            **kwargs: Additional DLT pipeline configuration (currently unused)

        Returns:
            LoaderResult with loaded data and metadata

        Raises:
            ImportError: If DLT is not installed
            Exception: If data loading fails
        """
        try:
            import dlt
        except ImportError as e:
            raise ImportError(
                "dlt is required for data loading. Install with: pip install dlt"
            ) from e

        try:
            self.logger.info(f"Loading data with DLT: {file_path}")

            file_ext = os.path.splitext(file_path)[1].lower()
            file_name = os.path.basename(file_path)
            file_size = os.path.getsize(file_path)

            # Resolve "auto" to a concrete source type from the extension.
            if source_type == "auto":
                source_type = self._infer_source_type(file_ext)

            # Parse the file with the format-specific helper.
            content = self._read_content(file_path, source_type)

            # Structured payloads (dict/list) are stringified for the text
            # pipeline; the raw object is preserved in source_info below.
            if isinstance(content, (dict, list)):
                content_type = ContentType.STRUCTURED
                text_content = str(content)
            else:
                content_type = ContentType.TEXT
                text_content = content

            metadata = {
                "name": file_name,
                "size": file_size,
                "extension": file_ext,
                "loader": self.loader_name,
                "source_type": source_type,
                "dlt_version": dlt.__version__,
            }
            # Add data-specific metadata
            if isinstance(content, list):
                metadata["records_count"] = len(content)
            elif isinstance(content, dict):
                metadata["keys_count"] = len(content)

            return LoaderResult(
                content=text_content,
                metadata=metadata,
                content_type=content_type,
                chunks=[text_content],  # Single chunk for now
                source_info={
                    "file_path": file_path,
                    "source_type": source_type,
                    "raw_data": content if isinstance(content, (dict, list)) else None,
                },
            )
        except Exception as e:
            self.logger.error(f"Failed to load data with DLT from {file_path}: {e}")
            raise Exception(f"DLT data loading failed: {e}") from e

    def _infer_source_type(self, file_ext: str) -> str:
        """Map a file extension to a concrete source type name."""
        mapping = {
            ".json": "json",
            ".jsonl": "jsonl",
            ".csv": "csv",
            ".parquet": "parquet",
            ".yaml": "yaml",
            ".yml": "yaml",
        }
        return mapping.get(file_ext, "file")

    def _read_content(self, file_path: str, source_type: str):
        """Dispatch to the format-specific reader for source_type."""
        if source_type == "json":
            return self._load_json(file_path)
        if source_type == "jsonl":
            return self._load_jsonl(file_path)
        if source_type == "csv":
            return self._load_csv(file_path)
        if source_type == "parquet":
            return self._load_parquet(file_path)
        if source_type == "yaml":
            return self._load_yaml(file_path)
        # Default: read as plain UTF-8 text.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    def _load_json(self, file_path: str) -> Dict[str, Any]:
        """Load JSON file."""
        import json

        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _load_jsonl(self, file_path: str) -> List[Dict[str, Any]]:
        """Load JSON Lines file (one JSON document per non-empty line)."""
        import json

        data = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    data.append(json.loads(line))
        return data

    def _load_csv(self, file_path: str) -> str:
        """Load CSV file as text."""
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    def _load_parquet(self, file_path: str) -> str:
        """Load Parquet file (requires pandas)."""
        try:
            import pandas as pd

            df = pd.read_parquet(file_path)
            return df.to_string()
        except ImportError:
            # Fallback: describe the file instead of parsing it. Use
            # os.path.getsize rather than reading the entire file into
            # memory just to measure its length (the old code did
            # len(f.read()) on potentially huge files).
            size = os.path.getsize(file_path)
            return f"<Parquet file: {os.path.basename(file_path)}, size: {size} bytes>"

    def _load_yaml(self, file_path: str) -> str:
        """Load YAML file as text."""
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

View file

@ -0,0 +1,127 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class PyPdfLoader(LoaderInterface):
    """
    PDF loader using pypdf library.

    Extracts text content from PDF files page by page, providing
    structured page information and handling PDF-specific errors.
    Per-page extraction failures are logged and skipped rather than
    aborting the whole document.
    """

    def __init__(self):
        # Module-scoped logger, following cognee's logging pattern.
        self.logger = get_logger(__name__)

    @property
    def supported_extensions(self) -> List[str]:
        # Only PDF documents are handled by this loader.
        return [".pdf"]

    @property
    def supported_mime_types(self) -> List[str]:
        return ["application/pdf"]

    @property
    def loader_name(self) -> str:
        # Unique registry identifier used for configuration and priority lists.
        return "pypdf_loader"

    def get_dependencies(self) -> List[str]:
        # Optional third-party requirement, checked by validate_dependencies().
        return ["pypdf>=4.0.0"]

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        """Check if file can be handled by this loader.

        Requires a .pdf extension, a matching MIME type (when one is
        provided), and an importable pypdf installation.
        """
        # Check file extension
        if not file_path.lower().endswith(".pdf"):
            return False
        # Check MIME type if provided
        if mime_type and mime_type != "application/pdf":
            return False
        # Validate dependencies
        return self.validate_dependencies()

    async def load(self, file_path: str, strict: bool = False, **kwargs) -> LoaderResult:
        """
        Load PDF file and extract text content.

        Args:
            file_path: Path to the PDF file
            strict: Whether to use strict mode for PDF reading
            **kwargs: Additional arguments (unused)

        Returns:
            LoaderResult with extracted text content and metadata

        Raises:
            ImportError: If pypdf is not installed
            Exception: If PDF processing fails
        """
        # Import lazily so the module stays importable without pypdf.
        try:
            from pypdf import PdfReader
        except ImportError as e:
            raise ImportError(
                "pypdf is required for PDF processing. Install with: pip install pypdf"
            ) from e
        try:
            with open(file_path, "rb") as file:
                self.logger.info(f"Reading PDF: {file_path}")
                reader = PdfReader(file, strict=strict)
                content_parts = []  # "Page N:" labeled text for the combined output
                page_texts = []  # raw per-page text, reused as pre-made chunks
                for page_num, page in enumerate(reader.pages, 1):
                    try:
                        page_text = page.extract_text()
                        if page_text.strip():  # Only add non-empty pages
                            page_texts.append(page_text)
                            content_parts.append(f"Page {page_num}:\n{page_text}\n")
                    except Exception as e:
                        # Skip unreadable pages; the rest of the document still loads.
                        self.logger.warning(f"Failed to extract text from page {page_num}: {e}")
                        continue
                # Combine all content
                full_content = "\n".join(content_parts)
                # Gather metadata
                metadata = {
                    "name": os.path.basename(file_path),
                    "size": os.path.getsize(file_path),
                    "extension": ".pdf",
                    "pages": len(reader.pages),
                    "pages_with_text": len(page_texts),
                    "loader": self.loader_name,
                }
                # Add PDF metadata if available
                if reader.metadata:
                    # Document-information keys use pypdf's raw "/Name" form;
                    # dates are stringified from pypdf's internal objects.
                    metadata["pdf_metadata"] = {
                        "title": reader.metadata.get("/Title", ""),
                        "author": reader.metadata.get("/Author", ""),
                        "subject": reader.metadata.get("/Subject", ""),
                        "creator": reader.metadata.get("/Creator", ""),
                        "producer": reader.metadata.get("/Producer", ""),
                        "creation_date": str(reader.metadata.get("/CreationDate", "")),
                        "modification_date": str(reader.metadata.get("/ModDate", "")),
                    }
                return LoaderResult(
                    content=full_content,
                    metadata=metadata,
                    content_type=ContentType.TEXT,
                    chunks=page_texts,  # Pre-chunked by page
                    source_info={
                        "file_path": file_path,
                        "pages": len(reader.pages),
                        "strict_mode": strict,
                    },
                )
        except Exception as e:
            # ImportError above is raised before this try; anything here is a
            # processing failure, re-raised with the original as __cause__.
            self.logger.error(f"Failed to process PDF {file_path}: {e}")
            raise Exception(f"PDF processing failed: {e}") from e

View file

@ -0,0 +1,168 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class UnstructuredLoader(LoaderInterface):
    """
    Loader for office-style documents backed by the `unstructured` library.

    Covers word-processor files, spreadsheets, presentations, rich text,
    HTML, e-mail formats and eBooks through unstructured's auto-partition
    entry point.
    """

    def __init__(self):
        self.logger = get_logger(__name__)

    @property
    def supported_extensions(self) -> List[str]:
        # Grouped as: word processors, spreadsheets, presentations,
        # rich text / HTML, e-mail, eBooks.
        return [
            ".docx", ".doc", ".odt",
            ".xlsx", ".xls", ".ods",
            ".pptx", ".ppt", ".odp",
            ".rtf", ".html", ".htm",
            ".eml", ".msg",
            ".epub",
        ]

    @property
    def supported_mime_types(self) -> List[str]:
        return [
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",  # docx
            "application/msword",  # doc
            "application/vnd.oasis.opendocument.text",  # odt
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # xlsx
            "application/vnd.ms-excel",  # xls
            "application/vnd.oasis.opendocument.spreadsheet",  # ods
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",  # pptx
            "application/vnd.ms-powerpoint",  # ppt
            "application/vnd.oasis.opendocument.presentation",  # odp
            "application/rtf",  # rtf
            "text/html",  # html
            "message/rfc822",  # eml
            "application/epub+zip",  # epub
        ]

    @property
    def loader_name(self) -> str:
        return "unstructured_loader"

    def get_dependencies(self) -> List[str]:
        return ["unstructured>=0.10.0"]

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        """Return True when extension, MIME type and dependencies all check out."""
        extension = os.path.splitext(file_path)[1].lower()
        if extension not in self.supported_extensions:
            return False
        # Only reject on MIME type when a (truthy) one was actually supplied.
        if mime_type and mime_type not in self.supported_mime_types:
            return False
        return self.validate_dependencies()

    async def load(self, file_path: str, strategy: str = "auto", **kwargs) -> LoaderResult:
        """
        Load a document via unstructured's auto-partitioning.

        Args:
            file_path: Path to the document file.
            strategy: Partitioning strategy ("auto", "fast", "hi_res", "ocr_only").
            **kwargs: Extra arguments forwarded to unstructured's partition().

        Returns:
            LoaderResult with extracted text content and metadata.

        Raises:
            ImportError: If unstructured is not installed.
            Exception: If document processing fails.
        """
        try:
            from unstructured.partition.auto import partition
        except ImportError as e:
            raise ImportError(
                "unstructured is required for document processing. "
                "Install with: pip install unstructured"
            ) from e

        try:
            self.logger.info(f"Processing document: {file_path}")

            extension = os.path.splitext(file_path)[1].lower()
            file_size = os.path.getsize(file_path)
            file_name = os.path.basename(file_path)

            # Dict merge lets caller kwargs override filename/strategy if given.
            partition_kwargs = {"filename": file_path, "strategy": strategy, **kwargs}
            elements = partition(**partition_kwargs)

            # Collect non-empty element texts plus a short summary per element.
            extracted_texts = []
            element_summaries = []
            for element in elements:
                text = str(element).strip()
                if not text:
                    continue
                extracted_texts.append(text)
                preview = text if len(text) <= 100 else text[:100] + "..."
                element_summaries.append({"type": type(element).__name__, "text": preview})

            full_content = "\n\n".join(extracted_texts)

            # More than one textual element means the document has structure.
            content_type = (
                ContentType.STRUCTURED if len(element_summaries) > 1 else ContentType.TEXT
            )

            metadata = {
                "name": file_name,
                "size": file_size,
                "extension": extension,
                "loader": self.loader_name,
                "elements_count": len(elements),
                "text_elements_count": len(extracted_texts),
                "strategy": strategy,
                "element_types": list({summary["type"] for summary in element_summaries}),
            }

            return LoaderResult(
                content=full_content,
                metadata=metadata,
                content_type=content_type,
                chunks=extracted_texts,  # pre-chunked by element
                source_info={
                    "file_path": file_path,
                    "strategy": strategy,
                    "elements": element_summaries[:10],  # first elements for debugging
                    "total_elements": len(elements),
                },
            )
        except Exception as e:
            self.logger.error(f"Failed to process document {file_path}: {e}")
            raise Exception(f"Document processing failed: {e}") from e

View file

@ -0,0 +1,20 @@
from functools import lru_cache
from .config import get_loader_config
from .LoaderEngine import LoaderEngine
from .create_loader_engine import create_loader_engine
@lru_cache
def get_loader_engine() -> LoaderEngine:
    """
    Return the process-wide loader engine.

    Memoized via functools.lru_cache so repeated calls share one
    LoaderEngine instance built from the current loader configuration
    (environment variables and settings).

    Returns:
        Cached LoaderEngine instance configured with current settings.
    """
    loader_config = get_loader_config()
    return create_loader_engine(**loader_config.to_dict())

View file

@ -0,0 +1,47 @@
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
class ContentType(Enum):
    """Coarse classification of the content a loader produced."""

    TEXT = "text"  # plain extracted text
    STRUCTURED = "structured"  # documents composed of multiple structural elements
    BINARY = "binary"  # raw, non-textual payloads
class LoaderResult(BaseModel):
    """
    Standardized output format for all file loaders.

    This model ensures consistent data structure across all loader
    implementations, following cognee's pattern of using Pydantic models
    for data validation.
    """

    content: str  # primary text content extracted from the file
    metadata: Dict[str, Any]  # file metadata (name, size, type, loader info, etc.)
    content_type: ContentType  # coarse content classification
    chunks: Optional[List[str]] = None  # pre-chunked content if the loader provides it
    source_info: Optional[Dict[str, Any]] = None  # source-specific information

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the loader result to a dictionary format.

        Returns:
            Dict containing all loader result data with content_type
            serialized to its string value.
        """
        # Because Config.use_enum_values = True, pydantic stores the enum's
        # *value* (a plain str) on the instance after validation, so
        # `self.content_type.value` would raise AttributeError. Accept both
        # representations.
        content_type = self.content_type
        if isinstance(content_type, ContentType):
            content_type = content_type.value
        return {
            "content": self.content,
            "metadata": self.metadata,
            "content_type": content_type,
            "source_info": self.source_info or {},
            "chunks": self.chunks,
        }

    class Config:
        """Pydantic configuration following cognee patterns"""

        use_enum_values = True
        validate_assignment = True

View file

@ -0,0 +1,3 @@
from .LoaderResult import LoaderResult, ContentType
__all__ = ["LoaderResult", "ContentType"]

View file

@ -0,0 +1,3 @@
# Registry for loader implementations
# Follows cognee's pattern used in databases/vector/supported_databases.py
supported_loaders = {}

View file

@ -0,0 +1,22 @@
from .supported_loaders import supported_loaders
def use_loader(loader_name: str, loader_class):
    """
    Register a loader implementation under a unique name at runtime.

    Mirrors the adapter-registration pattern cognee uses for databases,
    allowing external packages and custom loaders to plug into the
    loader system.

    Args:
        loader_name: Unique name for the loader.
        loader_class: Loader class implementing LoaderInterface.

    Example:
        from cognee.infrastructure.loaders import use_loader
        from my_package import MyCustomLoader

        use_loader("my_custom_loader", MyCustomLoader)
    """
    supported_loaders.update({loader_name: loader_class})

View file

@ -30,9 +30,38 @@ class BinaryData(IngestionData):
async def ensure_metadata(self):
if self.metadata is None:
self.metadata = await get_file_metadata(self.data)
# Handle case where file might be closed
if hasattr(self.data, "closed") and self.data.closed:
# Try to reopen the file if we have a file path
if hasattr(self.data, "name") and self.data.name:
try:
with open(self.data.name, "rb") as reopened_file:
self.metadata = await get_file_metadata(reopened_file)
except (OSError, FileNotFoundError):
# If we can't reopen, create minimal metadata
self.metadata = {
"name": self.name or "unknown",
"file_path": getattr(self.data, "name", "unknown"),
"extension": "txt",
"mime_type": "text/plain",
"content_hash": f"closed_file_{id(self.data)}",
"file_size": 0,
}
else:
# Create minimal metadata when file is closed and no path available
self.metadata = {
"name": self.name or "unknown",
"file_path": "unknown",
"extension": "txt",
"mime_type": "text/plain",
"content_hash": f"closed_file_{id(self.data)}",
"file_size": 0,
}
else:
# File is still open, proceed normally
self.metadata = await get_file_metadata(self.data)
if self.metadata["name"] is None:
if self.metadata.get("name") is None:
self.metadata["name"] = self.name
@asynccontextmanager

View file

@ -2,7 +2,7 @@ import os
from typing import Optional
from contextlib import asynccontextmanager
from cognee.infrastructure.files import get_file_metadata, FileMetadata
from cognee.infrastructure.utils import run_sync
from cognee.infrastructure.utils.run_sync import run_sync
from .IngestionData import IngestionData

View file

@ -16,9 +16,12 @@ class TextData(IngestionData):
self.data = data
def get_identifier(self):
keywords = extract_keywords(self.data)
import hashlib
return "text/plain" + "_" + "|".join(keywords)
content_bytes = self.data.encode("utf-8")
content_hash = hashlib.md5(content_bytes).hexdigest()
return "text/plain" + "_" + content_hash
def get_metadata(self):
self.ensure_metadata()
@ -27,7 +30,20 @@ class TextData(IngestionData):
def ensure_metadata(self):
if self.metadata is None:
self.metadata = {}
import hashlib
keywords = extract_keywords(self.data)
content_bytes = self.data.encode("utf-8")
content_hash = hashlib.md5(content_bytes).hexdigest()
self.metadata = {
"keywords": keywords,
"content_hash": content_hash,
"content_type": "text/plain",
"mime_type": "text/plain",
"extension": "txt",
"file_size": len(content_bytes),
}
@asynccontextmanager
async def get_data(self):

View file

@ -72,6 +72,25 @@ async def cognee_pipeline(
if cognee_pipeline.first_run:
from cognee.infrastructure.llm.utils import test_llm_connection, test_embedding_connection
# Ensure NLTK data is downloaded on first run
def ensure_nltk_data():
"""Download required NLTK data if not already present."""
try:
import nltk
# Download essential NLTK data used by the system
nltk.download("punkt_tab", quiet=True)
nltk.download("punkt", quiet=True)
nltk.download("averaged_perceptron_tagger", quiet=True)
nltk.download("averaged_perceptron_tagger_eng", quiet=True)
nltk.download("maxent_ne_chunker", quiet=True)
nltk.download("words", quiet=True)
logger.info("NLTK data initialized successfully")
except Exception as e:
logger.warning(f"Failed to initialize NLTK data: {e}")
ensure_nltk_data()
# Test LLM and Embedding configuration once before running Cognee
await test_llm_connection()
await test_embedding_connection()

View file

@ -98,10 +98,9 @@ async def run_tasks(
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
if data_point:
if (
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
):
if (data_point.pipeline_status or {}).get(pipeline_name, {}).get(
str(dataset.id)
) == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED:
yield {
"run_info": PipelineRunAlreadyCompleted(
pipeline_run_id=pipeline_run_id,
@ -133,11 +132,20 @@ async def run_tasks(
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
data_point.pipeline_status[pipeline_name] = {
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
}
await session.merge(data_point)
await session.commit()
if data_point is not None:
if data_point.pipeline_status is None:
data_point.pipeline_status = {}
data_point.pipeline_status[pipeline_name] = {
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
}
await session.merge(data_point)
await session.commit()
else:
# Log warning if data point not found but don't fail the pipeline
logger = get_logger(__name__)
logger.warning(
f"Data point with ID {data_id} not found in database, skipping pipeline status update"
)
yield {
"run_info": PipelineRunCompleted(

View file

@ -0,0 +1,11 @@
"""
Adapters for bridging the new loader system with existing ingestion pipeline.
This module provides compatibility layers to integrate the plugin-based loader
system with cognee's existing data processing pipeline while maintaining
backward compatibility and preserving permission logic.
"""
from .loader_to_ingestion_adapter import LoaderToIngestionAdapter
__all__ = ["LoaderToIngestionAdapter"]

View file

@ -0,0 +1,241 @@
import os
import tempfile
from typing import BinaryIO, Union, Optional, Any
from io import StringIO, BytesIO
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.modules.ingestion.data_types import IngestionData, TextData, BinaryData
from cognee.infrastructure.files import get_file_metadata
from cognee.shared.logging_utils import get_logger
class LoaderResultToIngestionData(IngestionData):
    """
    Adapter class that wraps LoaderResult to be compatible with IngestionData interface.

    This maintains backward compatibility with existing cognee ingestion pipeline
    while enabling the new loader system.
    """

    def __init__(self, loader_result: LoaderResult, original_file_path: str = None):
        self.loader_result = loader_result
        self.original_file_path = original_file_path
        self._cached_metadata = None  # memoized result of get_metadata()
        self.logger = get_logger(__name__)

    def get_identifier(self) -> str:
        """
        Get content identifier for deduplication.

        Always generates hash from content to ensure consistency with existing system.
        """
        # Always generate hash from content for consistency
        import hashlib

        content_bytes = self.loader_result.content.encode("utf-8")
        content_hash = hashlib.md5(content_bytes).hexdigest()

        # Add content type prefix for better identification.
        # NOTE(review): if LoaderResult is configured with pydantic's
        # use_enum_values, content_type may already be a plain str here and
        # `.value` would raise AttributeError — confirm.
        content_type = self.loader_result.content_type.value
        return f"{content_type}_{content_hash}"

    def get_metadata(self) -> dict:
        """
        Get file metadata in the format expected by existing pipeline.

        Converts LoaderResult metadata to the format used by IngestionData.
        The result is cached after the first call.
        """
        if self._cached_metadata is not None:
            return self._cached_metadata

        # Start with loader result metadata
        metadata = self.loader_result.metadata.copy()

        # Ensure required fields are present
        if "name" not in metadata:
            if self.original_file_path:
                metadata["name"] = os.path.basename(self.original_file_path)
            else:
                # Generate name from content hash (first 8 hex chars)
                content_hash = self.get_identifier().split("_")[-1][:8]
                ext = metadata.get("extension", ".txt")
                metadata["name"] = f"content_{content_hash}{ext}"

        if "content_hash" not in metadata:
            # Store content hash without prefix for compatibility with deletion system
            identifier = self.get_identifier()
            if "_" in identifier:
                # Remove content type prefix (e.g., "text_abc123" -> "abc123")
                metadata["content_hash"] = identifier.split("_", 1)[-1]
            else:
                metadata["content_hash"] = identifier

        if "file_path" not in metadata and self.original_file_path:
            metadata["file_path"] = self.original_file_path

        # Add mime type if not present; derived from the extension.
        # NOTE(review): keys here include the leading dot — confirm loaders
        # populate "extension" with the dot as well.
        if "mime_type" not in metadata:
            ext = metadata.get("extension", "").lower()
            mime_type_map = {
                ".txt": "text/plain",
                ".md": "text/markdown",
                ".csv": "text/csv",
                ".json": "application/json",
                ".pdf": "application/pdf",
                ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            }
            metadata["mime_type"] = mime_type_map.get(ext, "application/octet-stream")

        self._cached_metadata = metadata
        return metadata

    def get_data(self) -> Union[str, BinaryIO]:
        """
        Get data content in format expected by existing pipeline.

        Returns content as string for text data or creates a file-like object
        for binary data to maintain compatibility.
        """
        if self.loader_result.content_type == ContentType.TEXT:
            return self.loader_result.content

        # For structured or binary content, return as string for now
        # The existing pipeline expects text content for processing
        return self.loader_result.content
class LoaderToIngestionAdapter:
    """
    Adapter that bridges the new loader system with existing ingestion pipeline.

    This class provides methods to process files using the loader system
    while maintaining compatibility with the existing IngestionData interface.
    """

    def __init__(self):
        self.logger = get_logger(__name__)

    async def process_file_with_loaders(
        self,
        file_path: str,
        s3fs: Optional[Any] = None,
        preferred_loaders: Optional[list] = None,
        loader_config: Optional[dict] = None,
    ) -> IngestionData:
        """
        Process a file using the loader system and return IngestionData.

        Args:
            file_path: Path to the file to process
            s3fs: S3 filesystem (for compatibility with existing code)
            preferred_loaders: List of preferred loader names
            loader_config: Configuration for specific loaders

        Returns:
            IngestionData compatible object

        Raises:
            Exception: If no loader can handle the file
        """
        from cognee.infrastructure.loaders import get_loader_engine

        try:
            # Get the loader engine
            engine = get_loader_engine()

            # Determine MIME type if possible; failure is non-fatal.
            mime_type = None
            try:
                import mimetypes

                mime_type, _ = mimetypes.guess_type(file_path)
            except Exception:
                pass

            # Load file using loader system
            self.logger.info(f"Processing file with loaders: {file_path}")

            # Extract loader-specific config if provided
            kwargs = {}
            if loader_config:
                # Find the first available loader that matches our preferred loaders
                loader = engine.get_loader(file_path, mime_type, preferred_loaders)
                if loader and loader.loader_name in loader_config:
                    kwargs = loader_config[loader.loader_name]

            loader_result = await engine.load_file(
                file_path, mime_type=mime_type, preferred_loaders=preferred_loaders, **kwargs
            )

            # Convert to IngestionData compatible format
            return LoaderResultToIngestionData(loader_result, file_path)
        except Exception as e:
            # Any loader failure falls back to the legacy classification path.
            self.logger.warning(f"Loader system failed for {file_path}: {e}")
            # Fallback to existing classification system
            return await self._fallback_to_existing_system(file_path, s3fs)

    async def _fallback_to_existing_system(
        self, file_path: str, s3fs: Optional[Any] = None
    ) -> IngestionData:
        """
        Fallback to existing ingestion.classify() system for backward compatibility.

        This ensures that even if the loader system fails, we can still process
        files using the original classification method.
        """
        from cognee.modules.ingestion import classify

        self.logger.info(f"Falling back to existing classification system for: {file_path}")

        # Open file and classify using existing system.
        # NOTE(review): the file handle is closed when the `with` block exits,
        # so the returned object may hold a reference to an already-closed
        # stream — confirm downstream consumers tolerate that.
        if file_path.startswith("s3://"):
            if s3fs:
                with s3fs.open(file_path, "rb") as file:
                    return classify(file)
            else:
                raise ValueError("S3 file path provided but no s3fs available")
        else:
            # Handle local files and file:// URLs
            local_path = file_path.replace("file://", "")
            with open(local_path, "rb") as file:
                return classify(file)

    def is_text_content(self, data: Union[str, Any]) -> bool:
        """
        Check if the provided data is text content (not a file path).

        Args:
            data: The data to check

        Returns:
            True if data is text content, False if it's a file path
        """
        if not isinstance(data, str):
            return False

        # Check if it's a file path.
        # NOTE(review): relative paths such as "docs/a.txt" are treated as
        # text content by this heuristic — confirm that is intended.
        if (
            data.startswith("/")
            or data.startswith("file://")
            or data.startswith("s3://")
            or (len(data) > 1 and data[1] == ":")
        ):  # Windows drive paths
            return False

        return True

    def create_text_ingestion_data(self, content: str) -> IngestionData:
        """
        Create IngestionData for text content.

        Args:
            content: Text content to wrap

        Returns:
            IngestionData compatible object
        """
        return TextData(content)

View file

@ -60,7 +60,7 @@ async def ingest_data(
else:
# Find existing dataset or create a new one
existing_datasets = await get_authorized_existing_datasets(
user=user, permission_type="write", datasets=[dataset_name]
datasets=[dataset_name], permission_type="write", user=user
)
dataset = await load_or_create_datasets(
dataset_names=[dataset_name],

View file

@ -0,0 +1,324 @@
import json
import inspect
from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from cognee.modules.data.methods import (
get_authorized_existing_datasets,
get_dataset_data,
load_or_create_datasets,
)
from cognee.shared.logging_utils import get_logger
from .save_data_item_to_storage import save_data_item_to_storage
from .adapters import LoaderToIngestionAdapter
from cognee.infrastructure.files.storage.s3_config import get_s3_config
logger = get_logger(__name__)
async def plugin_ingest_data(
    data: Any,
    dataset_name: str,
    user: User,
    node_set: Optional[List[str]] = None,
    dataset_id: Optional[UUID] = None,
    preferred_loaders: Optional[List[str]] = None,
    loader_config: Optional[dict] = None,
):
    """
    Plugin-based data ingestion using the loader system.

    This function maintains full backward compatibility with the existing
    ingest_data function while adding support for the new loader system.

    Args:
        data: The data to ingest
        dataset_name: Name of the dataset
        user: User object for permissions
        node_set: Optional node set for organization
        dataset_id: Optional specific dataset ID
        preferred_loaders: List of preferred loader names to try first
        loader_config: Configuration for specific loaders

    Returns:
        List of Data objects that were ingested
    """
    if not user:
        user = await get_default_user()

    # Ensure NLTK data is downloaded (preserves automatic download behavior)
    def ensure_nltk_data():
        """Download required NLTK data if not already present."""
        try:
            import nltk

            # Download essential NLTK data used by the system
            nltk.download("punkt_tab", quiet=True)
            nltk.download("punkt", quiet=True)
            nltk.download("averaged_perceptron_tagger", quiet=True)
            nltk.download("averaged_perceptron_tagger_eng", quiet=True)
            nltk.download("maxent_ne_chunker", quiet=True)
            nltk.download("words", quiet=True)
            logger.info("NLTK data verified/downloaded successfully")
        except Exception as e:
            logger.warning(f"Failed to download NLTK data: {e}")

    # Download NLTK data once per session (flag stored on the function object)
    if not hasattr(plugin_ingest_data, "_nltk_initialized"):
        ensure_nltk_data()
        plugin_ingest_data._nltk_initialized = True

    # Initialize S3 support (maintain existing behavior)
    s3_config = get_s3_config()
    fs = None
    if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None:
        import s3fs

        fs = s3fs.S3FileSystem(
            key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
        )

    # Initialize the loader adapter
    loader_adapter = LoaderToIngestionAdapter()

    def open_data_file(file_path: str):
        """Open file with S3 support (preserves existing behavior)."""
        if file_path.startswith("s3://"):
            return fs.open(file_path, mode="rb")
        else:
            local_path = file_path.replace("file://", "")
            return open(local_path, mode="rb")

    def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
        """Get external metadata (preserves existing behavior)."""
        # Objects exposing a bound .dict() method (e.g. pydantic models)
        # contribute their serialized form plus their origin type.
        if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
            return {"metadata": data_item.dict(), "origin": str(type(data_item))}
        else:
            return {}

    async def store_data_to_dataset(
        data: Any,
        dataset_name: str,
        user: User,
        node_set: Optional[List[str]] = None,
        dataset_id: Optional[UUID] = None,
    ):
        """
        Core data storage logic with plugin-based file processing.

        This function preserves all existing permission and database logic
        while using the new loader system for file processing.
        """
        logger.info(f"Plugin-based ingestion starting for dataset: {dataset_name}")

        # Preserve existing dataset creation and permission logic
        if dataset_id:
            # Retrieve existing dataset by ID
            dataset = await get_specific_user_permission_datasets(user.id, "write", [dataset_id])

            # Convert from list to Dataset element
            if isinstance(dataset, list):
                dataset = dataset[0]
        else:
            # Find existing dataset or create a new one by name
            existing_datasets = await get_authorized_existing_datasets(
                datasets=[dataset_name], permission_type="write", user=user
            )
            datasets = await load_or_create_datasets(
                dataset_names=[dataset_name], existing_datasets=existing_datasets, user=user
            )
            if isinstance(datasets, list):
                dataset = datasets[0]

        new_datapoints = []
        existing_data_points = []
        dataset_new_data_points = []

        # Get existing dataset data for deduplication (preserve existing logic)
        dataset_data: list[Data] = await get_dataset_data(dataset.id)
        dataset_data_map = {str(data.id): True for data in dataset_data}

        for data_item in data:
            file_path = await save_data_item_to_storage(data_item)

            # NEW: Use loader system or existing classification based on data type
            try:
                if loader_adapter.is_text_content(data_item):
                    # Handle text content (preserve existing behavior)
                    logger.info("Processing text content with existing system")
                    classified_data = ingestion.classify(data_item)
                else:
                    # Use loader system for file paths
                    logger.info(f"Processing file with loader system: {file_path}")
                    classified_data = await loader_adapter.process_file_with_loaders(
                        file_path,
                        s3fs=fs,
                        preferred_loaders=preferred_loaders,
                        loader_config=loader_config,
                    )
            except Exception as e:
                logger.warning(f"Plugin system failed for {file_path}, falling back: {e}")
                # Fallback to existing system for full backward compatibility
                with open_data_file(file_path) as file:
                    classified_data = ingestion.classify(file)

            # Preserve all existing data processing logic
            data_id = ingestion.identify(classified_data, user)
            file_metadata = classified_data.get_metadata()

            # Ensure metadata has all required fields with fallbacks.
            # Defined inside the loop because it closes over file_path,
            # data_id and classified_data of the current item.
            def get_metadata_field(metadata, field_name, default_value=""):
                """Get metadata field with fallback handling."""
                if field_name in metadata and metadata[field_name] is not None:
                    return metadata[field_name]

                logger.warning(f"Missing metadata field '{field_name}', using fallback")

                # Provide fallbacks based on available information
                if field_name == "name":
                    if "file_path" in metadata and metadata["file_path"]:
                        import os

                        return os.path.basename(str(metadata["file_path"])).split(".")[0]
                    elif file_path:
                        import os

                        return os.path.basename(str(file_path)).split(".")[0]
                    else:
                        content_hash = metadata.get("content_hash", str(data_id))[:8]
                        return f"content_{content_hash}"
                elif field_name == "file_path":
                    # Use the actual file path returned by save_data_item_to_storage
                    return file_path
                elif field_name == "extension":
                    if "file_path" in metadata and metadata["file_path"]:
                        import os

                        _, ext = os.path.splitext(str(metadata["file_path"]))
                        return ext.lstrip(".") if ext else "txt"
                    elif file_path:
                        import os

                        _, ext = os.path.splitext(str(file_path))
                        return ext.lstrip(".") if ext else "txt"
                    return "txt"
                elif field_name == "mime_type":
                    # Derive the MIME type from the (possibly fallback) extension.
                    ext = get_metadata_field(metadata, "extension", "txt")
                    mime_map = {
                        "txt": "text/plain",
                        "md": "text/markdown",
                        "pdf": "application/pdf",
                        "json": "application/json",
                        "csv": "text/csv",
                    }
                    return mime_map.get(ext.lower(), "text/plain")
                elif field_name == "content_hash":
                    # Extract the raw content hash for compatibility with deletion system
                    content_identifier = classified_data.get_identifier()
                    # Remove content type prefix if present (e.g., "text_abc123" -> "abc123")
                    if "_" in content_identifier:
                        return content_identifier.split("_", 1)[-1]
                    return content_identifier
                elif field_name == "file_size":
                    # Get file size from metadata or filesystem
                    if "file_size" in metadata:
                        return metadata["file_size"]
                    elif file_path:
                        import os

                        try:
                            return os.path.getsize(file_path)
                        except (OSError, TypeError):
                            return None
                    return None

                return default_value

            from sqlalchemy import select

            db_engine = get_relational_engine()

            # Check if data should be updated (preserve existing logic)
            async with db_engine.get_async_session() as session:
                data_point = (
                    await session.execute(select(Data).filter(Data.id == data_id))
                ).scalar_one_or_none()

            ext_metadata = get_external_metadata_dict(data_item)
            if node_set:
                ext_metadata["node_set"] = node_set

            # Preserve existing data point creation/update logic
            if data_point is not None:
                data_point.name = get_metadata_field(file_metadata, "name")
                data_point.raw_data_location = get_metadata_field(file_metadata, "file_path")
                data_point.extension = get_metadata_field(file_metadata, "extension")
                data_point.mime_type = get_metadata_field(file_metadata, "mime_type")
                data_point.owner_id = user.id
                data_point.content_hash = get_metadata_field(file_metadata, "content_hash")
                data_point.external_metadata = ext_metadata
                data_point.node_set = json.dumps(node_set) if node_set else None

                if str(data_point.id) in dataset_data_map:
                    existing_data_points.append(data_point)
                else:
                    dataset_new_data_points.append(data_point)
                    dataset_data_map[str(data_point.id)] = True
            else:
                # Skip items already present in this dataset
                if str(data_id) in dataset_data_map:
                    continue

                data_point = Data(
                    id=data_id,
                    name=get_metadata_field(file_metadata, "name"),
                    raw_data_location=get_metadata_field(file_metadata, "file_path"),
                    extension=get_metadata_field(file_metadata, "extension"),
                    mime_type=get_metadata_field(file_metadata, "mime_type"),
                    owner_id=user.id,
                    content_hash=get_metadata_field(file_metadata, "content_hash"),
                    external_metadata=ext_metadata,
                    node_set=json.dumps(node_set) if node_set else None,
                    data_size=get_metadata_field(file_metadata, "file_size"),
                    tenant_id=user.tenant_id if user.tenant_id else None,
                    pipeline_status={},
                    token_count=-1,
                )
                new_datapoints.append(data_point)
                dataset_data_map[str(data_point.id)] = True

        # Preserve existing database operations.
        # NOTE(review): db_engine is bound inside the loop above; if `data`
        # is empty this raises NameError — confirm callers never pass an
        # empty iterable.
        async with db_engine.get_async_session() as session:
            if dataset not in session:
                session.add(dataset)

            if len(new_datapoints) > 0:
                dataset.data.extend(new_datapoints)

            if len(existing_data_points) > 0:
                for data_point in existing_data_points:
                    await session.merge(data_point)

            if len(dataset_new_data_points) > 0:
                dataset.data.extend(dataset_new_data_points)

            await session.merge(dataset)
            await session.commit()

        logger.info(
            f"Plugin-based ingestion completed. New: {len(new_datapoints)}, "
            + f"Updated: {len(existing_data_points)}, Dataset new: {len(dataset_new_data_points)}"
        )

        return existing_data_points + dataset_new_data_points + new_datapoints

    return await store_data_to_dataset(data, dataset_name, user, node_set, dataset_id)

View file

@ -1,11 +1,13 @@
import os
from urllib.parse import urlparse
from pathlib import Path
from typing import List, Union, BinaryIO
from cognee.infrastructure.files.storage.s3_config import get_s3_config
async def resolve_data_directories(
data: Union[BinaryIO, List[BinaryIO], str, List[str]], include_subdirectories: bool = True
data: Union[BinaryIO, List[BinaryIO], str, List[str], Path, List[Path]],
include_subdirectories: bool = True,
):
"""
Resolves directories by replacing them with their contained files.
@ -33,7 +35,26 @@ async def resolve_data_directories(
)
for item in data:
if isinstance(item, str): # Check if the item is a path
if isinstance(item, Path): # Path objects explicitly indicate file paths
# Convert Path to string for processing
item_str = str(item)
if item.is_dir(): # If it's a directory
if include_subdirectories:
# Recursively add all files in the directory and subdirectories
for root, _, files in os.walk(item_str):
resolved_data.extend([Path(os.path.join(root, f)) for f in files])
else:
# Add all files (not subdirectories) in the directory
resolved_data.extend(
[
Path(os.path.join(item_str, f))
for f in os.listdir(item_str)
if os.path.isfile(os.path.join(item_str, f))
]
)
else: # If it's a file, add it directly
resolved_data.append(item)
elif isinstance(item, str): # Check if the item is a path or text content
# S3
if urlparse(item).scheme == "s3":
if fs is not None:

View file

@ -1,5 +1,6 @@
import os
from urllib.parse import urlparse
from pathlib import Path
from typing import Union, BinaryIO, Any
from cognee.modules.ingestion.exceptions import IngestionError
@ -16,7 +17,7 @@ class SaveDataSettings(BaseSettings):
settings = SaveDataSettings()
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str:
async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Path, Any]) -> str:
if "llama_index" in str(type(data_item)):
# Dynamic import is used because the llama_index module is optional.
from .transform_data import get_data_from_llama_index
@ -27,6 +28,18 @@ async def save_data_item_to_storage(data_item: Union[BinaryIO, str, Any]) -> str
if hasattr(data_item, "file"):
return await save_data_to_file(data_item.file, filename=data_item.filename)
# data is a Path object - explicitly indicates a file path
if isinstance(data_item, Path):
if settings.accept_local_file_path:
# Convert Path to file URL for consistency
normalized_path = str(data_item.resolve())
# Use forward slashes in file URLs for consistency
url_path = normalized_path.replace(os.sep, "/")
file_path = "file://" + url_path
return file_path
else:
raise IngestionError(message="Local files are not accepted.")
if isinstance(data_item, str):
parsed_url = urlparse(data_item)

View file

@ -0,0 +1,237 @@
import os
import importlib.util
from typing import Any, Dict, List, Optional

from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult
from cognee.shared.logging_utils import get_logger
class LoaderEngine:
    """
    Main loader engine for managing file loaders.

    Follows cognee's adapter pattern similar to database engines,
    providing a centralized system for file loading operations.
    """

    def __init__(
        self,
        loader_directories: List[str],
        default_loader_priority: List[str],
        fallback_loader: str = "text_loader",
        enable_dependency_validation: bool = True,
    ):
        """
        Initialize the loader engine.

        Args:
            loader_directories: Directories to search for loader implementations
            default_loader_priority: Priority order for loader selection
            fallback_loader: Default loader to use when no other matches
            enable_dependency_validation: Whether to validate loader dependencies
        """
        self._loaders: Dict[str, LoaderInterface] = {}
        # Reverse lookup tables so extension / MIME resolution does not have
        # to scan every registered loader.
        self._extension_map: Dict[str, List[LoaderInterface]] = {}
        self._mime_type_map: Dict[str, List[LoaderInterface]] = {}
        self.loader_directories = loader_directories
        self.default_loader_priority = default_loader_priority
        self.fallback_loader = fallback_loader
        self.enable_dependency_validation = enable_dependency_validation
        self.logger = get_logger(__name__)

    def register_loader(self, loader: LoaderInterface) -> bool:
        """
        Register a loader with the engine.

        Args:
            loader: LoaderInterface implementation to register

        Returns:
            True if loader was registered successfully, False otherwise
        """
        # Validate dependencies if enabled; loaders with missing optional
        # dependencies are skipped instead of failing the whole engine.
        if self.enable_dependency_validation and not loader.validate_dependencies():
            self.logger.warning(
                f"Skipping loader '{loader.loader_name}' - missing dependencies: "
                f"{loader.get_dependencies()}"
            )
            return False

        self._loaders[loader.loader_name] = loader

        # Map extensions to loaders (extensions are matched case-insensitively).
        for ext in loader.supported_extensions:
            self._extension_map.setdefault(ext.lower(), []).append(loader)

        # Map mime types to loaders.
        for mime_type in loader.supported_mime_types:
            self._mime_type_map.setdefault(mime_type, []).append(loader)

        self.logger.info(f"Registered loader: {loader.loader_name}")
        return True

    def get_loader(
        self,
        file_path: str,
        mime_type: Optional[str] = None,
        preferred_loaders: Optional[List[str]] = None,
    ) -> Optional[LoaderInterface]:
        """
        Get appropriate loader for a file.

        Resolution order: preferred loaders, configured priority list,
        MIME-type mapping, extension mapping, then the fallback loader.

        Args:
            file_path: Path to the file to be processed
            mime_type: Optional MIME type of the file
            preferred_loaders: List of preferred loader names to try first

        Returns:
            LoaderInterface that can handle the file, or None if not found
        """
        ext = os.path.splitext(file_path)[1].lower()

        # Try preferred loaders first.
        if preferred_loaders:
            for loader_name in preferred_loaders:
                loader = self._loaders.get(loader_name)
                if loader is not None and loader.can_handle(file_path, mime_type):
                    return loader

        # Try priority order.
        for loader_name in self.default_loader_priority:
            loader = self._loaders.get(loader_name)
            if loader is not None and loader.can_handle(file_path, mime_type):
                return loader

        # Try mime type mapping.
        if mime_type:
            for loader in self._mime_type_map.get(mime_type, []):
                if loader.can_handle(file_path, mime_type):
                    return loader

        # Try extension mapping.
        for loader in self._extension_map.get(ext, []):
            if loader.can_handle(file_path, mime_type):
                return loader

        # Fallback loader — only used when it can actually handle the file.
        fallback = self._loaders.get(self.fallback_loader)
        if fallback is not None and fallback.can_handle(file_path, mime_type):
            return fallback

        return None

    async def load_file(
        self,
        file_path: str,
        mime_type: Optional[str] = None,
        preferred_loaders: Optional[List[str]] = None,
        **kwargs,
    ) -> LoaderResult:
        """
        Load file using appropriate loader.

        Args:
            file_path: Path to the file to be processed
            mime_type: Optional MIME type of the file
            preferred_loaders: List of preferred loader names to try first
            **kwargs: Additional loader-specific configuration

        Returns:
            LoaderResult containing processed content and metadata

        Raises:
            ValueError: If no suitable loader is found
            Exception: If file processing fails
        """
        loader = self.get_loader(file_path, mime_type, preferred_loaders)
        if not loader:
            raise ValueError(f"No loader found for file: {file_path}")
        self.logger.debug(f"Loading {file_path} with {loader.loader_name}")
        return await loader.load(file_path, **kwargs)

    def discover_loaders(self):
        """
        Auto-discover loaders from configured directories.

        Scans loader directories for Python modules containing
        LoaderInterface implementations and registers them.
        """
        for directory in self.loader_directories:
            if os.path.exists(directory):
                self._discover_in_directory(directory)

    def _discover_in_directory(self, directory: str):
        """
        Discover loaders in a specific directory.

        Args:
            directory: Directory path to scan for loader implementations
        """
        try:
            for file_name in os.listdir(directory):
                # Skip private modules (e.g. __init__.py) and non-Python files.
                if not file_name.endswith(".py") or file_name.startswith("_"):
                    continue
                module_name = file_name[:-3]
                file_path = os.path.join(directory, file_name)
                try:
                    spec = importlib.util.spec_from_file_location(module_name, file_path)
                    if spec and spec.loader:
                        module = importlib.util.module_from_spec(spec)
                        spec.loader.exec_module(module)
                        # Look for concrete LoaderInterface subclasses.
                        for attr_name in dir(module):
                            attr = getattr(module, attr_name)
                            if (
                                isinstance(attr, type)
                                and issubclass(attr, LoaderInterface)
                                and attr is not LoaderInterface
                            ):
                                # Instantiate and register the loader.
                                try:
                                    self.register_loader(attr())
                                except Exception as e:
                                    self.logger.warning(
                                        f"Failed to instantiate loader {attr_name}: {e}"
                                    )
                except Exception as e:
                    self.logger.warning(f"Failed to load module {module_name}: {e}")
        except OSError as e:
            self.logger.warning(f"Failed to scan directory {directory}: {e}")

    def get_available_loaders(self) -> List[str]:
        """
        Get list of available loader names.

        Returns:
            List of registered loader names
        """
        return list(self._loaders.keys())

    def get_loader_info(self, loader_name: str) -> Dict[str, Any]:
        """
        Get information about a specific loader.

        Args:
            loader_name: Name of the loader to inspect

        Returns:
            Dictionary describing the loader, or an empty dict when the
            loader is not registered
        """
        # NOTE: annotation fixed from Dict[str, any] (the builtin) to
        # typing.Any.
        loader = self._loaders.get(loader_name)
        if loader is None:
            return {}
        return {
            "name": loader.loader_name,
            "extensions": loader.supported_extensions,
            "mime_types": loader.supported_mime_types,
            "dependencies": loader.get_dependencies(),
            "available": loader.validate_dependencies(),
        }

View file

@ -0,0 +1,102 @@
from abc import ABC, abstractmethod
from typing import List, Union
from pathlib import Path
from .models.LoaderResult import LoaderResult
class LoaderInterface(ABC):
    """
    Base interface for all file loaders in cognee.

    This interface follows cognee's established pattern for database adapters,
    ensuring consistent behavior across all loader implementations.
    """

    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """
        List of file extensions this loader supports.

        Returns:
            List of extensions including the dot (e.g., ['.txt', '.md'])
        """
        pass

    @property
    @abstractmethod
    def supported_mime_types(self) -> List[str]:
        """
        List of MIME types this loader supports.

        Returns:
            List of MIME type strings (e.g., ['text/plain', 'application/pdf'])
        """
        pass

    @property
    @abstractmethod
    def loader_name(self) -> str:
        """
        Unique name identifier for this loader.

        Returns:
            String identifier used for registration and configuration
        """
        pass

    @abstractmethod
    def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool:
        """
        Check if this loader can handle the given file.

        Args:
            file_path: Path to the file to be processed (Path type recommended for explicit file path handling)
            mime_type: Optional MIME type of the file

        Returns:
            True if this loader can process the file, False otherwise
        """
        pass

    @abstractmethod
    async def load(self, file_path: Union[str, Path], **kwargs) -> LoaderResult:
        """
        Load and process the file, returning standardized result.

        Args:
            file_path: Path to the file to be processed (Path type recommended for explicit file path handling)
            **kwargs: Additional loader-specific configuration

        Returns:
            LoaderResult containing processed content and metadata

        Raises:
            Exception: If file cannot be processed
        """
        pass

    def get_dependencies(self) -> List[str]:
        """
        Optional: Return list of required dependencies for this loader.

        Returns:
            List of package names with optional version specifications
        """
        return []

    def validate_dependencies(self) -> bool:
        """
        Check if all required dependencies are available.

        Returns:
            True if all dependencies are importable, False otherwise
        """
        import re
        from importlib.util import find_spec

        for dep in self.get_dependencies():
            # Strip version specifiers (==, >=, <=, >, <, !=, ~=), extras
            # ("pkg[extra]") and environment markers ("pkg; python_version...")
            # to recover the bare distribution name. The previous
            # implementation only handled ">=", "==" and "<".
            package_name = re.split(r"[\s\[<>=!~;]", dep, maxsplit=1)[0]
            if not package_name:
                return False
            try:
                # find_spec checks importability without executing the module,
                # unlike __import__, so validation has no side effects.
                if find_spec(package_name) is None:
                    return False
            except (ImportError, ValueError):
                return False
        return True

View file

@ -0,0 +1,19 @@
"""
File loader infrastructure for cognee.
This package provides a plugin-based system for loading different file formats
into cognee, following the same patterns as database adapters.
Main exports:
- get_loader_engine(): Factory function to get configured loader engine
- use_loader(): Register custom loaders at runtime
- LoaderInterface: Base interface for implementing loaders
- LoaderResult, ContentType: Data models for loader results
"""
from .get_loader_engine import get_loader_engine
from .use_loader import use_loader
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult, ContentType
__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"]

View file

@ -0,0 +1,57 @@
from functools import lru_cache
from typing import List, Optional, Dict, Any
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.root_dir import get_absolute_path
class LoaderConfig(BaseSettings):
    """
    Configuration for file loader system.

    Follows cognee's pattern using pydantic_settings.BaseSettings for
    environment variable support and validation. Fields can be overridden
    through environment variables using the LOADER_ prefix (and a local
    .env file), per model_config below.
    """

    # Directories scanned for loader implementations ("core" ships with
    # cognee; "external" holds optional loaders).
    loader_directories: List[str] = [
        get_absolute_path("cognee/infrastructure/loaders/core"),
        get_absolute_path("cognee/infrastructure/loaders/external"),
    ]
    # Order in which loaders are tried when several could handle a file.
    default_loader_priority: List[str] = [
        "text_loader",
        "pypdf_loader",
        "unstructured_loader",
        "dlt_loader",
    ]
    # Scan loader_directories for additional loaders at engine creation.
    auto_discover: bool = True
    # Loader used when no other loader claims a file.
    fallback_loader: str = "text_loader"
    # Skip registering loaders whose dependencies are not installed.
    enable_dependency_validation: bool = True
    model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_")

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert configuration to dictionary format.

        The keys mirror create_loader_engine's keyword arguments so the
        result can be splatted directly into that factory.

        Returns:
            Dict containing all loader configuration settings
        """
        return {
            "loader_directories": self.loader_directories,
            "default_loader_priority": self.default_loader_priority,
            "auto_discover": self.auto_discover,
            "fallback_loader": self.fallback_loader,
            "enable_dependency_validation": self.enable_dependency_validation,
        }
@lru_cache
def get_loader_config() -> LoaderConfig:
    """
    Return the process-wide loader configuration.

    The instance is memoized with functools.lru_cache, matching cognee's
    convention for configuration factories, so every caller shares one
    LoaderConfig built from the current environment and settings.
    """
    return LoaderConfig()

View file

@ -0,0 +1,5 @@
"""Core loader implementations that are always available."""
from .text_loader import TextLoader
__all__ = ["TextLoader"]

View file

@ -0,0 +1,137 @@
import os
from typing import List, Union
from pathlib import Path
from ..LoaderInterface import LoaderInterface
from ..models.LoaderResult import LoaderResult, ContentType
class TextLoader(LoaderInterface):
    """
    Core text file loader that handles basic text file formats.

    This loader is always available and serves as the fallback for
    text-based files when no specialized loader is available.
    """

    @property
    def supported_extensions(self) -> List[str]:
        """Supported text file extensions."""
        return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"]

    @property
    def supported_mime_types(self) -> List[str]:
        """Supported MIME types for text content."""
        return [
            "text/plain",
            "text/markdown",
            "text/csv",
            "application/json",
            "text/xml",
            "application/xml",
            "text/yaml",
            "application/yaml",
        ]

    @property
    def loader_name(self) -> str:
        """Unique identifier for this loader."""
        return "text_loader"

    def can_handle(self, file_path: Union[str, Path], mime_type: str = None) -> bool:
        """
        Check if this loader can handle the given file.

        Matches by extension or MIME type first; otherwise sniffs the first
        bytes of the file and accepts it only if they look like text.

        Args:
            file_path: Path to the file (Path type recommended for explicit file path handling)
            mime_type: Optional MIME type

        Returns:
            True if file can be handled, False otherwise
        """
        # Convert to Path for consistent handling.
        path_obj = Path(file_path) if isinstance(file_path, str) else file_path

        # Check by extension.
        if path_obj.suffix.lower() in self.supported_extensions:
            return True

        # Check by MIME type.
        if mime_type and mime_type in self.supported_mime_types:
            return True

        # As fallback loader, attempt a content-based heuristic for unknown
        # extensions / MIME types.
        try:
            with open(path_obj, "rb") as f:
                sample = f.read(512)
        except (OSError, IOError):
            return False

        if not sample:
            return False

        # BUGFIX: a plain decode check is not enough — NUL and other control
        # bytes are valid UTF-8 (and latin-1 decodes *any* byte sequence), so
        # the previous implementation accepted arbitrary binary files.
        if b"\x00" in sample:
            return False

        try:
            sample.decode("utf-8")
            return True
        except UnicodeDecodeError:
            # Not UTF-8: accept only if the sample is overwhelmingly made of
            # printable single-byte (latin-1) characters or whitespace.
            printable = sum(
                1
                for byte in sample
                if 32 <= byte < 127 or byte >= 160 or byte in (9, 10, 13)
            )
            return printable / len(sample) > 0.9

    async def load(
        self, file_path: Union[str, Path], encoding: str = "utf-8", **kwargs
    ) -> LoaderResult:
        """
        Load and process the text file.

        Args:
            file_path: Path to the file to load (Path type recommended for explicit file path handling)
            encoding: Text encoding to use (default: utf-8)
            **kwargs: Additional configuration (unused)

        Returns:
            LoaderResult containing the file content and metadata

        Raises:
            FileNotFoundError: If file doesn't exist
            UnicodeDecodeError: If file cannot be decoded with specified encoding
            OSError: If file cannot be read
        """
        # Convert to Path for consistent handling.
        path_obj = Path(file_path) if isinstance(file_path, str) else file_path

        if not path_obj.exists():
            raise FileNotFoundError(f"File not found: {path_obj}")

        try:
            with open(path_obj, "r", encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            # Retry once with latin-1, which accepts any byte sequence.
            if encoding == "utf-8":
                return await self.load(path_obj, encoding="latin-1", **kwargs)
            raise

        # Extract basic metadata.
        file_stat = path_obj.stat()
        metadata = {
            "name": path_obj.name,
            "size": file_stat.st_size,
            "extension": path_obj.suffix,
            "encoding": encoding,
            "loader": self.loader_name,
            "lines": len(content.splitlines()) if content else 0,
            "characters": len(content),
        }

        return LoaderResult(
            content=content,
            metadata=metadata,
            content_type=ContentType.TEXT,
            source_info={"file_path": str(path_obj), "encoding": encoding},
        )

View file

@ -0,0 +1,49 @@
from typing import List
from .LoaderEngine import LoaderEngine
from .supported_loaders import supported_loaders
def create_loader_engine(
    loader_directories: List[str],
    default_loader_priority: List[str],
    auto_discover: bool = True,
    fallback_loader: str = "text_loader",
    enable_dependency_validation: bool = True,
) -> LoaderEngine:
    """
    Build a LoaderEngine from explicit configuration values.

    Mirrors the engine-factory helpers used by cognee's database adapters.

    Args:
        loader_directories: Directories scanned for loader implementations
        default_loader_priority: Order in which loaders are tried
        auto_discover: Whether to scan loader_directories for extra loaders
        fallback_loader: Loader name used when nothing else matches
        enable_dependency_validation: Whether to validate loader dependencies

    Returns:
        Configured LoaderEngine instance
    """
    engine = LoaderEngine(
        loader_directories=loader_directories,
        default_loader_priority=default_loader_priority,
        fallback_loader=fallback_loader,
        enable_dependency_validation=enable_dependency_validation,
    )

    # Register every loader from the static registry; a broken loader is
    # logged and skipped so the rest of the engine stays usable.
    for loader_name, loader_class in supported_loaders.items():
        try:
            engine.register_loader(loader_class())
        except Exception as e:
            engine.logger.warning(f"Failed to register loader {loader_name}: {e}")

    # Optionally pick up loaders dropped into the configured directories.
    if auto_discover:
        engine.discover_loaders()

    return engine

View file

@ -0,0 +1,20 @@
from functools import lru_cache
from .config import get_loader_config
from .LoaderEngine import LoaderEngine
from .create_loader_engine import create_loader_engine
@lru_cache
def get_loader_engine() -> LoaderEngine:
    """
    Return the shared LoaderEngine instance.

    The engine is built once from the current loader configuration
    (environment variables and settings) and memoized via lru_cache,
    following cognee's factory pattern for efficient reuse.
    """
    return create_loader_engine(**get_loader_config().to_dict())

View file

@ -0,0 +1,47 @@
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
class ContentType(Enum):
    """Classification of the primary content produced by a loader."""

    TEXT = "text"  # plain textual content
    STRUCTURED = "structured"  # structured data (tables, records, ...)
    BINARY = "binary"  # raw binary payloads
class LoaderResult(BaseModel):
    """
    Standardized output format for all file loaders.

    This model ensures consistent data structure across all loader implementations,
    following cognee's pattern of using Pydantic models for data validation.
    """

    content: str  # Primary text content extracted from file
    metadata: Dict[str, Any]  # File metadata (name, size, type, loader info, etc.)
    content_type: ContentType  # Content classification
    chunks: Optional[List[str]] = None  # Pre-chunked content if available
    source_info: Optional[Dict[str, Any]] = None  # Source-specific information

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the loader result to a dictionary format.

        Returns:
            Dict containing all loader result data with string-serialized content_type
        """
        content_type = self.content_type
        # BUGFIX: with use_enum_values enabled (see Config) pydantic stores
        # the enum's *value* (a plain string) on the instance, so calling
        # .value unconditionally raised AttributeError. Unwrap only when an
        # actual ContentType enum member is present.
        if isinstance(content_type, ContentType):
            content_type = content_type.value
        return {
            "content": self.content,
            "metadata": self.metadata,
            "content_type": content_type,
            "source_info": self.source_info or {},
            "chunks": self.chunks,
        }

    class Config:
        """Pydantic configuration following cognee patterns"""

        use_enum_values = True
        validate_assignment = True

View file

@ -0,0 +1,3 @@
from .LoaderResult import LoaderResult, ContentType
__all__ = ["LoaderResult", "ContentType"]

View file

@ -0,0 +1,3 @@
# Registry for loader implementations
# Follows cognee's pattern used in databases/vector/supported_databases.py
# Maps loader name -> loader class; populated at runtime via use_loader()
# and consumed by create_loader_engine() during engine construction.
supported_loaders = {}

View file

@ -0,0 +1,22 @@
from .supported_loaders import supported_loaders
def use_loader(loader_name: str, loader_class):
    """
    Register a loader implementation at runtime.

    Mirrors cognee's database-adapter registration pattern so that external
    packages and custom loaders can plug into the loader system. Registering
    an existing name replaces the previous entry.

    Args:
        loader_name: Unique name for the loader
        loader_class: Loader class implementing LoaderInterface

    Example:
        from cognee.infrastructure.loaders import use_loader
        from my_package import MyCustomLoader

        use_loader("my_custom_loader", MyCustomLoader)
    """
    supported_loaders[loader_name] = loader_class

1
tests/__init__.py Normal file
View file

@ -0,0 +1 @@
# Tests package

1
tests/unit/__init__.py Normal file
View file

@ -0,0 +1 @@
# Unit tests package

View file

@ -0,0 +1 @@
# Infrastructure tests package

View file

@ -0,0 +1 @@
# Loaders tests package

View file

@ -0,0 +1,252 @@
import pytest
import tempfile
import os
from unittest.mock import Mock, AsyncMock
from cognee.infrastructure.loaders.LoaderEngine import LoaderEngine
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class MockLoader(LoaderInterface):
    """Minimal LoaderInterface implementation used as a test double."""

    def __init__(self, name="mock_loader", extensions=None, mime_types=None, fail_deps=False):
        self._name = name
        self._extensions = extensions if extensions else [".mock"]
        self._mime_types = mime_types if mime_types else ["application/mock"]
        self._fail_deps = fail_deps

    @property
    def loader_name(self):
        return self._name

    @property
    def supported_extensions(self):
        return self._extensions

    @property
    def supported_mime_types(self):
        return self._mime_types

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        suffix = os.path.splitext(file_path)[1].lower()
        if suffix in self._extensions:
            return True
        return mime_type in self._mime_types

    async def load(self, file_path: str, **kwargs) -> LoaderResult:
        return LoaderResult(
            content=f"Mock content from {self._name}",
            metadata={"loader": self._name, "name": os.path.basename(file_path)},
            content_type=ContentType.TEXT,
        )

    def validate_dependencies(self) -> bool:
        # Simulate missing dependencies when requested by the test.
        return not self._fail_deps
class TestLoaderEngine:
    """Test the LoaderEngine class."""

    @pytest.fixture
    def engine(self):
        """Create a LoaderEngine instance for testing."""
        return LoaderEngine(
            loader_directories=[],
            default_loader_priority=["loader1", "loader2"],
            fallback_loader="fallback",
            enable_dependency_validation=True,
        )

    def test_engine_initialization(self, engine):
        """Test LoaderEngine initialization."""
        assert engine.loader_directories == []
        assert engine.default_loader_priority == ["loader1", "loader2"]
        assert engine.fallback_loader == "fallback"
        assert engine.enable_dependency_validation is True
        assert len(engine.get_available_loaders()) == 0

    def test_register_loader_success(self, engine):
        """Test successful loader registration."""
        loader = MockLoader("test_loader", [".test"])
        success = engine.register_loader(loader)
        assert success is True
        assert "test_loader" in engine.get_available_loaders()
        assert engine._loaders["test_loader"] == loader
        assert ".test" in engine._extension_map
        assert "application/mock" in engine._mime_type_map

    def test_register_loader_with_failed_dependencies(self, engine):
        """Test loader registration with failed dependency validation."""
        loader = MockLoader("test_loader", [".test"], fail_deps=True)
        success = engine.register_loader(loader)
        assert success is False
        assert "test_loader" not in engine.get_available_loaders()

    def test_register_loader_without_dependency_validation(self):
        """Test loader registration without dependency validation."""
        engine = LoaderEngine(
            loader_directories=[], default_loader_priority=[], enable_dependency_validation=False
        )
        loader = MockLoader("test_loader", [".test"], fail_deps=True)
        success = engine.register_loader(loader)
        assert success is True
        assert "test_loader" in engine.get_available_loaders()

    def test_get_loader_by_extension(self, engine):
        """Test getting loader by file extension."""
        loader1 = MockLoader("loader1", [".txt"])
        loader2 = MockLoader("loader2", [".pdf"])
        engine.register_loader(loader1)
        engine.register_loader(loader2)
        result = engine.get_loader("test.txt")
        assert result == loader1
        result = engine.get_loader("test.pdf")
        assert result == loader2
        result = engine.get_loader("test.unknown")
        assert result is None

    def test_get_loader_by_mime_type(self, engine):
        """Test getting loader by MIME type."""
        loader = MockLoader("loader", [".txt"], ["text/plain"])
        engine.register_loader(loader)
        result = engine.get_loader("test.unknown", mime_type="text/plain")
        assert result == loader
        result = engine.get_loader("test.unknown", mime_type="application/pdf")
        assert result is None

    def test_get_loader_with_preferences(self, engine):
        """Test getting loader with preferred loaders."""
        loader1 = MockLoader("loader1", [".txt"])
        loader2 = MockLoader("loader2", [".txt"])
        engine.register_loader(loader1)
        engine.register_loader(loader2)
        # Should get preferred loader
        result = engine.get_loader("test.txt", preferred_loaders=["loader2"])
        assert result == loader2
        # Should fallback to first available if preferred not found
        result = engine.get_loader("test.txt", preferred_loaders=["nonexistent"])
        assert result in [loader1, loader2]  # One of them should be returned

    def test_get_loader_with_priority(self, engine):
        """Test loader selection with priority order."""
        engine.default_loader_priority = ["priority_loader", "other_loader"]
        priority_loader = MockLoader("priority_loader", [".txt"])
        other_loader = MockLoader("other_loader", [".txt"])
        # Register in reverse order
        engine.register_loader(other_loader)
        engine.register_loader(priority_loader)
        # Should get priority loader even though other was registered first
        result = engine.get_loader("test.txt")
        assert result == priority_loader

    def test_get_loader_fallback(self, engine):
        """Test fallback loader selection."""
        fallback_loader = MockLoader("fallback", [".txt"])
        other_loader = MockLoader("other", [".pdf"])
        engine.register_loader(fallback_loader)
        engine.register_loader(other_loader)
        engine.fallback_loader = "fallback"
        # For .txt file, fallback should be considered
        result = engine.get_loader("test.txt")
        assert result == fallback_loader
        # BUGFIX: the fallback is only used when it can actually handle the
        # file. MockLoader("fallback") only accepts .txt (and the mock MIME
        # type), so an unknown extension yields no loader at all.
        result = engine.get_loader("test.unknown")
        assert result is None

    @pytest.mark.asyncio
    async def test_load_file_success(self, engine):
        """Test successful file loading."""
        loader = MockLoader("test_loader", [".txt"])
        engine.register_loader(loader)
        with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
            f.write(b"test content")
            temp_path = f.name
        try:
            result = await engine.load_file(temp_path)
            assert result.content == "Mock content from test_loader"
            assert result.metadata["loader"] == "test_loader"
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    @pytest.mark.asyncio
    async def test_load_file_no_loader(self, engine):
        """Test file loading when no suitable loader is found."""
        with pytest.raises(ValueError, match="No loader found for file"):
            await engine.load_file("test.unknown")

    @pytest.mark.asyncio
    async def test_load_file_with_preferences(self, engine):
        """Test file loading with preferred loaders."""
        loader1 = MockLoader("loader1", [".txt"])
        loader2 = MockLoader("loader2", [".txt"])
        engine.register_loader(loader1)
        engine.register_loader(loader2)
        with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
            f.write(b"test content")
            temp_path = f.name
        try:
            result = await engine.load_file(temp_path, preferred_loaders=["loader2"])
            assert result.metadata["loader"] == "loader2"
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    def test_get_loader_info(self, engine):
        """Test getting loader information."""
        loader = MockLoader("test_loader", [".txt"], ["text/plain"])
        engine.register_loader(loader)
        info = engine.get_loader_info("test_loader")
        assert info["name"] == "test_loader"
        assert info["extensions"] == [".txt"]
        assert info["mime_types"] == ["text/plain"]
        assert info["available"] is True
        # Test non-existent loader
        info = engine.get_loader_info("nonexistent")
        assert info == {}

    def test_discover_loaders_empty_directory(self, engine):
        """Test loader discovery with empty directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            engine.loader_directories = [temp_dir]
            engine.discover_loaders()
            # Should not find any loaders in empty directory
            assert len(engine.get_available_loaders()) == 0

    def test_discover_loaders_nonexistent_directory(self, engine):
        """Test loader discovery with non-existent directory."""
        engine.loader_directories = ["/nonexistent/directory"]
        # Should not raise exception, just log warning
        engine.discover_loaders()
        assert len(engine.get_available_loaders()) == 0

View file

@ -0,0 +1,99 @@
import pytest
import tempfile
import os
from unittest.mock import AsyncMock
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class TestLoaderInterface:
    """Test the LoaderInterface abstract base class."""

    def test_loader_interface_is_abstract(self):
        """Test that LoaderInterface cannot be instantiated directly."""
        # ABCs with unimplemented abstract methods raise TypeError on
        # direct instantiation.
        with pytest.raises(TypeError):
            LoaderInterface()

    def test_dependency_validation_with_no_dependencies(self):
        """Test dependency validation when no dependencies are required."""

        # Minimal concrete implementation; inherits the default (empty)
        # get_dependencies() from LoaderInterface.
        class MockLoader(LoaderInterface):
            @property
            def supported_extensions(self):
                return [".txt"]

            @property
            def supported_mime_types(self):
                return ["text/plain"]

            @property
            def loader_name(self):
                return "mock_loader"

            def can_handle(self, file_path: str, mime_type: str = None) -> bool:
                return True

            async def load(self, file_path: str, **kwargs) -> LoaderResult:
                return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)

        loader = MockLoader()
        # No dependencies declared -> validation trivially succeeds.
        assert loader.validate_dependencies() is True
        assert loader.get_dependencies() == []

    def test_dependency_validation_with_missing_dependencies(self):
        """Test dependency validation with missing dependencies."""

        # Same concrete shell, but declaring a dependency that cannot be
        # importable, so validation must fail.
        class MockLoaderWithDeps(LoaderInterface):
            @property
            def supported_extensions(self):
                return [".txt"]

            @property
            def supported_mime_types(self):
                return ["text/plain"]

            @property
            def loader_name(self):
                return "mock_loader_deps"

            def get_dependencies(self):
                return ["non_existent_package>=1.0.0"]

            def can_handle(self, file_path: str, mime_type: str = None) -> bool:
                return True

            async def load(self, file_path: str, **kwargs) -> LoaderResult:
                return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)

        loader = MockLoaderWithDeps()
        assert loader.validate_dependencies() is False
        assert "non_existent_package>=1.0.0" in loader.get_dependencies()

    def test_dependency_validation_with_existing_dependencies(self):
        """Test dependency validation with existing dependencies."""

        class MockLoaderWithExistingDeps(LoaderInterface):
            @property
            def supported_extensions(self):
                return [".txt"]

            @property
            def supported_mime_types(self):
                return ["text/plain"]

            @property
            def loader_name(self):
                return "mock_loader_existing"

            def get_dependencies(self):
                return ["os"]  # Built-in module that always exists

            def can_handle(self, file_path: str, mime_type: str = None) -> bool:
                return True

            async def load(self, file_path: str, **kwargs) -> LoaderResult:
                return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)

        loader = MockLoaderWithExistingDeps()
        assert loader.validate_dependencies() is True

View file

@ -0,0 +1,197 @@
import pytest
import tempfile
import os
from pathlib import Path
from cognee.infrastructure.loaders.core.text_loader import TextLoader
from cognee.infrastructure.loaders.models.LoaderResult import ContentType
class TestTextLoader:
"""Test the TextLoader implementation."""
@pytest.fixture
def text_loader(self):
    """Create a TextLoader instance for testing."""
    # A fresh instance per test is cheap; TextLoader defines no __init__
    # state of its own.
    return TextLoader()
@pytest.fixture
def temp_text_file(self):
    """Yield the path of a two-line temporary .txt file; remove it afterwards."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
        f.write("This is a test file.\nIt has multiple lines.\n")
        temp_path = f.name
    yield temp_path
    # Cleanup after the test has finished with the file.
    if os.path.exists(temp_path):
        os.unlink(temp_path)
@pytest.fixture
def temp_binary_file(self):
    """Yield the path of a small binary .bin file; remove it afterwards."""
    with tempfile.NamedTemporaryFile(mode="wb", suffix=".bin", delete=False) as f:
        f.write(b"\x00\x01\x02\x03\x04\x05")
        temp_path = f.name
    yield temp_path
    # Cleanup after the test has finished with the file.
    if os.path.exists(temp_path):
        os.unlink(temp_path)
def test_loader_properties(self, text_loader):
    """Test basic loader properties."""
    assert text_loader.loader_name == "text_loader"
    for extension in (".txt", ".md"):
        assert extension in text_loader.supported_extensions
    for mime in ("text/plain", "application/json"):
        assert mime in text_loader.supported_mime_types
def test_can_handle_by_extension(self, text_loader):
"""Test file handling by extension."""
assert text_loader.can_handle("test.txt")
assert text_loader.can_handle("test.md")
assert text_loader.can_handle("test.json")
assert text_loader.can_handle("test.TXT") # Case insensitive
assert not text_loader.can_handle("test.pdf")
def test_can_handle_by_mime_type(self, text_loader):
"""Test file handling by MIME type."""
assert text_loader.can_handle("test.unknown", mime_type="text/plain")
assert text_loader.can_handle("test.unknown", mime_type="application/json")
assert not text_loader.can_handle("test.unknown", mime_type="application/pdf")
def test_can_handle_text_file_heuristic(self, text_loader, temp_text_file):
"""Test handling of text files by content heuristic."""
# Remove extension to force heuristic check
no_ext_path = temp_text_file.replace(".txt", "")
os.rename(temp_text_file, no_ext_path)
try:
assert text_loader.can_handle(no_ext_path)
finally:
if os.path.exists(no_ext_path):
os.unlink(no_ext_path)
def test_cannot_handle_binary_file(self, text_loader, temp_binary_file):
"""Test that binary files are not handled."""
assert not text_loader.can_handle(temp_binary_file)
@pytest.mark.asyncio
async def test_load_text_file(self, text_loader, temp_text_file):
"""Test loading a text file."""
result = await text_loader.load(temp_text_file)
assert isinstance(result.content, str)
assert "This is a test file." in result.content
assert result.content_type == ContentType.TEXT
assert result.metadata["loader"] == "text_loader"
assert result.metadata["name"] == os.path.basename(temp_text_file)
assert result.metadata["lines"] == 2
assert result.metadata["encoding"] == "utf-8"
assert result.source_info["file_path"] == temp_text_file
@pytest.mark.asyncio
async def test_load_with_custom_encoding(self, text_loader):
"""Test loading with custom encoding."""
# Create a file with latin-1 encoding
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="latin-1"
) as f:
f.write("Test with åéîøü characters")
temp_path = f.name
try:
result = await text_loader.load(temp_path, encoding="latin-1")
assert "åéîøü" in result.content
assert result.metadata["encoding"] == "latin-1"
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_load_with_fallback_encoding(self, text_loader):
"""Test automatic fallback to latin-1 encoding."""
# Create a file with latin-1 content but try to read as utf-8
with tempfile.NamedTemporaryFile(mode="wb", suffix=".txt", delete=False) as f:
# Write latin-1 encoded bytes that are invalid in utf-8
f.write(b"Test with \xe5\xe9\xee\xf8\xfc characters")
temp_path = f.name
try:
# Should automatically fallback to latin-1
result = await text_loader.load(temp_path)
assert result.metadata["encoding"] == "latin-1"
assert len(result.content) > 0
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_load_nonexistent_file(self, text_loader):
"""Test loading a file that doesn't exist."""
with pytest.raises(FileNotFoundError):
await text_loader.load("/nonexistent/file.txt")
@pytest.mark.asyncio
async def test_load_empty_file(self, text_loader):
"""Test loading an empty file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
# Create empty file
temp_path = f.name
try:
result = await text_loader.load(temp_path)
assert result.content == ""
assert result.metadata["lines"] == 0
assert result.metadata["characters"] == 0
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
def test_no_dependencies(self, text_loader):
"""Test that TextLoader has no external dependencies."""
assert text_loader.get_dependencies() == []
assert text_loader.validate_dependencies() is True
def test_can_handle_path_object(self, text_loader):
"""Test that can_handle works with Path objects."""
path_obj = Path("test.txt")
assert text_loader.can_handle(path_obj)
path_obj = Path("test.pdf")
assert not text_loader.can_handle(path_obj)
# Test case insensitive
path_obj = Path("test.TXT")
assert text_loader.can_handle(path_obj)
def test_can_handle_path_object_with_mime_type(self, text_loader):
"""Test that can_handle works with Path objects and MIME type."""
path_obj = Path("test.unknown")
assert text_loader.can_handle(path_obj, mime_type="text/plain")
assert not text_loader.can_handle(path_obj, mime_type="application/pdf")
@pytest.mark.asyncio
async def test_load_path_object(self, text_loader, temp_text_file):
"""Test loading a file using a Path object."""
path_obj = Path(temp_text_file)
result = await text_loader.load(path_obj)
assert isinstance(result.content, str)
assert "This is a test file." in result.content
assert result.content_type == ContentType.TEXT
assert result.metadata["loader"] == "text_loader"
assert result.metadata["name"] == path_obj.name
assert result.metadata["lines"] == 2
assert result.metadata["encoding"] == "utf-8"
assert result.source_info["file_path"] == str(path_obj)
@pytest.mark.asyncio
async def test_load_path_object_nonexistent(self, text_loader):
"""Test loading a nonexistent file using a Path object."""
path_obj = Path("/nonexistent/file.txt")
with pytest.raises(FileNotFoundError):
await text_loader.load(path_obj)

View file

@ -0,0 +1,129 @@
import pytest
import tempfile
import os
from pathlib import Path
from cognee.tasks.ingestion.save_data_item_to_storage import save_data_item_to_storage
from cognee.tasks.ingestion.resolve_data_directories import resolve_data_directories
class TestPathSupport:
    """Test Path type support in ingestion functions.

    Verifies that ``save_data_item_to_storage`` and
    ``resolve_data_directories`` accept ``pathlib.Path`` objects
    interchangeably with string paths.
    """

    @pytest.fixture
    def temp_text_file(self):
        """Create a temporary text file for testing."""
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
            f.write("This is a test file for Path support.\n")
            temp_path = f.name
        yield temp_path
        # Cleanup
        if os.path.exists(temp_path):
            os.unlink(temp_path)

    @pytest.fixture
    def temp_directory(self):
        """Create a temporary directory with test files."""
        # tempfile is already imported at module level; no local import needed.
        temp_dir = tempfile.mkdtemp()
        # Create some test files
        for i in range(3):
            with open(os.path.join(temp_dir, f"test_{i}.txt"), "w") as f:
                f.write(f"Test file {i} content.\n")
        yield temp_dir
        # Cleanup
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.mark.asyncio
    async def test_save_data_item_path_object(self, temp_text_file):
        """Test save_data_item_to_storage with Path object."""
        path_obj = Path(temp_text_file)
        result = await save_data_item_to_storage(path_obj)
        # Should return a file:// URL
        assert result.startswith("file://")
        assert str(path_obj.resolve()) in result

    @pytest.mark.asyncio
    async def test_save_data_item_string_vs_path(self, temp_text_file):
        """Test that Path object vs string path are handled consistently."""
        path_obj = Path(temp_text_file)
        string_path = str(path_obj.resolve())
        # Both should work and produce similar results
        result_path = await save_data_item_to_storage(path_obj)
        result_string = await save_data_item_to_storage(string_path)
        # Both should be file:// URLs pointing to the same file
        assert result_path.startswith("file://")
        assert result_string.startswith("file://")
        # Strip only the leading scheme prefix (str.replace would remove
        # every "file://" occurrence anywhere in the path).
        path_from_path_obj = result_path.removeprefix("file://")
        path_from_string = result_string.removeprefix("file://")
        # They should resolve to the same absolute path
        assert os.path.normpath(path_from_path_obj) == os.path.normpath(path_from_string)

    @pytest.mark.asyncio
    async def test_save_data_item_text_content(self):
        """Test that plain text strings are handled as content, not paths."""
        text_content = "This is plain text content, not a file path."
        result = await save_data_item_to_storage(text_content)
        # Should create a file and return file:// URL since this is text content
        assert result.startswith("file://")

    @pytest.mark.asyncio
    async def test_resolve_data_directories_path_object(self, temp_directory):
        """Test resolve_data_directories with Path object."""
        path_obj = Path(temp_directory)
        result = await resolve_data_directories([path_obj])
        # Should return a list of Path objects for the files in the directory
        assert len(result) == 3  # We created 3 test files
        assert all(isinstance(item, Path) for item in result)
        assert all(item.suffix == ".txt" for item in result)

    @pytest.mark.asyncio
    async def test_resolve_data_directories_mixed_types(self, temp_directory, temp_text_file):
        """Test resolve_data_directories with mixed Path and string types."""
        path_obj = Path(temp_text_file)
        string_path = str(temp_text_file)
        directory_path = Path(temp_directory)
        # Mix of types
        mixed_data = [path_obj, string_path, directory_path]
        result = await resolve_data_directories(mixed_data)
        # Should have:
        # - 1 Path object (original file as Path)
        # - 1 string (original file as string)
        # - 3 Path objects (from directory expansion)
        assert len(result) == 5
        # Count types
        path_objects = [item for item in result if isinstance(item, Path)]
        string_objects = [item for item in result if isinstance(item, str)]
        assert len(path_objects) == 4  # 1 original + 3 from directory
        assert len(string_objects) == 1  # 1 original string

    @pytest.mark.asyncio
    async def test_resolve_data_directories_path_single_file(self, temp_text_file):
        """Test resolve_data_directories with a single Path file."""
        path_obj = Path(temp_text_file)
        result = await resolve_data_directories([path_obj])
        # Should return the same Path object
        assert len(result) == 1
        assert isinstance(result[0], Path)
        assert str(result[0]) == str(path_obj)