added loader separation

This commit is contained in:
vasilije 2025-07-13 20:24:52 +02:00
parent bd892652ad
commit 98882ba1d1
40 changed files with 3065 additions and 89 deletions

View file

@ -15,14 +15,19 @@ async def add(
vector_db_config: dict = None,
graph_db_config: dict = None,
dataset_id: UUID = None,
preferred_loaders: Optional[List[str]] = None,
loader_config: Optional[dict] = None,
):
"""
Add data to Cognee for knowledge graph processing.
Add data to Cognee for knowledge graph processing using a plugin-based loader system.
This is the first step in the Cognee workflow - it ingests raw data and prepares it
for processing. The function accepts various data formats including text, files, and
binary streams, then stores them in a specified dataset for further processing.
This version supports both the original ingestion system (for backward compatibility)
and the new plugin-based loader system (when loader parameters are provided).
Prerequisites:
- **LLM_API_KEY**: Must be set in environment variables for content processing
- **Database Setup**: Relational and vector databases must be configured
@ -38,16 +43,38 @@ async def add(
- **Lists**: Multiple files or text strings in a single call
Supported File Formats:
- Text files (.txt, .md, .csv)
- PDFs (.pdf)
- Text files (.txt, .md, .csv) - processed by text_loader
- PDFs (.pdf) - processed by pypdf_loader (if available)
- Images (.png, .jpg, .jpeg) - extracted via OCR/vision models
- Audio files (.mp3, .wav) - transcribed to text
- Code files (.py, .js, .ts, etc.) - parsed for structure and content
- Office documents (.docx, .pptx)
- Office documents (.docx, .pptx) - processed by unstructured_loader (if available)
- Data files (.json, .jsonl, .parquet) - processed by dlt_loader (if available)
Workflow:
Plugin System:
The function automatically uses the best available loader for each file type.
You can customize this behavior using the loader parameters:
```python
# Use specific loaders in priority order
await cognee.add(
"/path/to/document.pdf",
preferred_loaders=["pypdf_loader", "text_loader"]
)
# Configure loader-specific options
await cognee.add(
"/path/to/document.pdf",
loader_config={
"pypdf_loader": {"strict": False},
"unstructured_loader": {"strategy": "hi_res"}
}
)
```
Workflow:
1. **Data Resolution**: Resolves file paths and validates accessibility
2. **Content Extraction**: Extracts text content from various file formats
2. **Content Extraction**: Uses plugin system or falls back to existing classification
3. **Dataset Storage**: Stores processed content in the specified dataset
4. **Metadata Tracking**: Records file metadata, timestamps, and user permissions
5. **Permission Assignment**: Grants user read/write/delete/share permissions on dataset
@ -70,6 +97,10 @@ async def add(
vector_db_config: Optional configuration for vector database (for custom setups).
graph_db_config: Optional configuration for graph database (for custom setups).
dataset_id: Optional specific dataset UUID to use instead of dataset_name.
preferred_loaders: Optional list of loader names to try first (e.g., ["pypdf_loader", "text_loader"]).
If not provided, uses default loader priority.
loader_config: Optional configuration for specific loaders. Dictionary mapping loader names
to their configuration options (e.g., {"pypdf_loader": {"strict": False}}).
Returns:
PipelineRunInfo: Information about the ingestion pipeline execution including:
@ -138,10 +169,32 @@ async def add(
UnsupportedFileTypeError: If file format cannot be processed
InvalidValueError: If LLM_API_KEY is not set or invalid
"""
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id),
]
# Determine which ingestion system to use
use_plugin_system = preferred_loaders is not None or loader_config is not None
if use_plugin_system:
# Use new plugin-based ingestion system
from cognee.tasks.ingestion.plugin_ingest_data import plugin_ingest_data
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(
plugin_ingest_data,
dataset_name,
user,
node_set,
dataset_id,
preferred_loaders,
loader_config,
),
]
else:
# Use existing ingestion system for backward compatibility
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id),
]
pipeline_run_info = None

View file

@ -0,0 +1,237 @@
import os
import importlib.util
from typing import Dict, List, Optional
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult
from cognee.shared.logging_utils import get_logger
class LoaderEngine:
"""
Main loader engine for managing file loaders.
Follows cognee's adapter pattern similar to database engines,
providing a centralized system for file loading operations.
"""
def __init__(
self,
loader_directories: List[str],
default_loader_priority: List[str],
fallback_loader: str = "text_loader",
enable_dependency_validation: bool = True,
):
"""
Initialize the loader engine.
Args:
loader_directories: Directories to search for loader implementations
default_loader_priority: Priority order for loader selection
fallback_loader: Default loader to use when no other matches
enable_dependency_validation: Whether to validate loader dependencies
"""
self._loaders: Dict[str, LoaderInterface] = {}
self._extension_map: Dict[str, List[LoaderInterface]] = {}
self._mime_type_map: Dict[str, List[LoaderInterface]] = {}
self.loader_directories = loader_directories
self.default_loader_priority = default_loader_priority
self.fallback_loader = fallback_loader
self.enable_dependency_validation = enable_dependency_validation
self.logger = get_logger(__name__)
def register_loader(self, loader: LoaderInterface) -> bool:
"""
Register a loader with the engine.
Args:
loader: LoaderInterface implementation to register
Returns:
True if loader was registered successfully, False otherwise
"""
# Validate dependencies if enabled
if self.enable_dependency_validation and not loader.validate_dependencies():
self.logger.warning(
f"Skipping loader '{loader.loader_name}' - missing dependencies: "
f"{loader.get_dependencies()}"
)
return False
self._loaders[loader.loader_name] = loader
# Map extensions to loaders
for ext in loader.supported_extensions:
ext_lower = ext.lower()
if ext_lower not in self._extension_map:
self._extension_map[ext_lower] = []
self._extension_map[ext_lower].append(loader)
# Map mime types to loaders
for mime_type in loader.supported_mime_types:
if mime_type not in self._mime_type_map:
self._mime_type_map[mime_type] = []
self._mime_type_map[mime_type].append(loader)
self.logger.info(f"Registered loader: {loader.loader_name}")
return True
def get_loader(
self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None
) -> Optional[LoaderInterface]:
"""
Get appropriate loader for a file.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
preferred_loaders: List of preferred loader names to try first
Returns:
LoaderInterface that can handle the file, or None if not found
"""
ext = os.path.splitext(file_path)[1].lower()
# Try preferred loaders first
if preferred_loaders:
for loader_name in preferred_loaders:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(file_path, mime_type):
return loader
# Try priority order
for loader_name in self.default_loader_priority:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(file_path, mime_type):
return loader
# Try mime type mapping
if mime_type and mime_type in self._mime_type_map:
for loader in self._mime_type_map[mime_type]:
if loader.can_handle(file_path, mime_type):
return loader
# Try extension mapping
if ext in self._extension_map:
for loader in self._extension_map[ext]:
if loader.can_handle(file_path, mime_type):
return loader
# Fallback loader
if self.fallback_loader in self._loaders:
fallback = self._loaders[self.fallback_loader]
if fallback.can_handle(file_path, mime_type):
return fallback
return None
async def load_file(
self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None, **kwargs
) -> LoaderResult:
"""
Load file using appropriate loader.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
preferred_loaders: List of preferred loader names to try first
**kwargs: Additional loader-specific configuration
Returns:
LoaderResult containing processed content and metadata
Raises:
ValueError: If no suitable loader is found
Exception: If file processing fails
"""
loader = self.get_loader(file_path, mime_type, preferred_loaders)
if not loader:
raise ValueError(f"No loader found for file: {file_path}")
self.logger.debug(f"Loading {file_path} with {loader.loader_name}")
return await loader.load(file_path, **kwargs)
def discover_loaders(self):
"""
Auto-discover loaders from configured directories.
Scans loader directories for Python modules containing
LoaderInterface implementations and registers them.
"""
for directory in self.loader_directories:
if os.path.exists(directory):
self._discover_in_directory(directory)
def _discover_in_directory(self, directory: str):
"""
Discover loaders in a specific directory.
Args:
directory: Directory path to scan for loader implementations
"""
try:
for file_name in os.listdir(directory):
if file_name.endswith(".py") and not file_name.startswith("_"):
module_name = file_name[:-3]
file_path = os.path.join(directory, file_name)
try:
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for loader classes
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, LoaderInterface)
and attr != LoaderInterface
):
# Instantiate and register the loader
try:
loader_instance = attr()
self.register_loader(loader_instance)
except Exception as e:
self.logger.warning(
f"Failed to instantiate loader {attr_name}: {e}"
)
except Exception as e:
self.logger.warning(f"Failed to load module {module_name}: {e}")
except OSError as e:
self.logger.warning(f"Failed to scan directory {directory}: {e}")
def get_available_loaders(self) -> List[str]:
"""
Get list of available loader names.
Returns:
List of registered loader names
"""
return list(self._loaders.keys())
def get_loader_info(self, loader_name: str) -> Dict[str, any]:
"""
Get information about a specific loader.
Args:
loader_name: Name of the loader to inspect
Returns:
Dictionary containing loader information
"""
if loader_name not in self._loaders:
return {}
loader = self._loaders[loader_name]
return {
"name": loader.loader_name,
"extensions": loader.supported_extensions,
"mime_types": loader.supported_mime_types,
"dependencies": loader.get_dependencies(),
"available": loader.validate_dependencies(),
}

View file

@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from typing import List
from .models.LoaderResult import LoaderResult
class LoaderInterface(ABC):
"""
Base interface for all file loaders in cognee.
This interface follows cognee's established pattern for database adapters,
ensuring consistent behavior across all loader implementations.
"""
@property
@abstractmethod
def supported_extensions(self) -> List[str]:
"""
List of file extensions this loader supports.
Returns:
List of extensions including the dot (e.g., ['.txt', '.md'])
"""
pass
@property
@abstractmethod
def supported_mime_types(self) -> List[str]:
"""
List of MIME types this loader supports.
Returns:
List of MIME type strings (e.g., ['text/plain', 'application/pdf'])
"""
pass
@property
@abstractmethod
def loader_name(self) -> str:
"""
Unique name identifier for this loader.
Returns:
String identifier used for registration and configuration
"""
pass
@abstractmethod
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""
Check if this loader can handle the given file.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
Returns:
True if this loader can process the file, False otherwise
"""
pass
@abstractmethod
async def load(self, file_path: str, **kwargs) -> LoaderResult:
"""
Load and process the file, returning standardized result.
Args:
file_path: Path to the file to be processed
**kwargs: Additional loader-specific configuration
Returns:
LoaderResult containing processed content and metadata
Raises:
Exception: If file cannot be processed
"""
pass
def get_dependencies(self) -> List[str]:
"""
Optional: Return list of required dependencies for this loader.
Returns:
List of package names with optional version specifications
"""
return []
def validate_dependencies(self) -> bool:
"""
Check if all required dependencies are available.
Returns:
True if all dependencies are installed, False otherwise
"""
for dep in self.get_dependencies():
# Extract package name from version specification
package_name = dep.split(">=")[0].split("==")[0].split("<")[0]
try:
__import__(package_name)
except ImportError:
return False
return True

View file

@ -0,0 +1,19 @@
"""
File loader infrastructure for cognee.
This package provides a plugin-based system for loading different file formats
into cognee, following the same patterns as database adapters.
Main exports:
- get_loader_engine(): Factory function to get configured loader engine
- use_loader(): Register custom loaders at runtime
- LoaderInterface: Base interface for implementing loaders
- LoaderResult, ContentType: Data models for loader results
"""
from .get_loader_engine import get_loader_engine
from .use_loader import use_loader
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult, ContentType
__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"]

View file

@ -0,0 +1,57 @@
from functools import lru_cache
from typing import List, Optional, Dict, Any
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.root_dir import get_absolute_path
class LoaderConfig(BaseSettings):
"""
Configuration for file loader system.
Follows cognee's pattern using pydantic_settings.BaseSettings for
environment variable support and validation.
"""
loader_directories: List[str] = [
get_absolute_path("infrastructure/loaders/core"),
get_absolute_path("infrastructure/loaders/external"),
]
default_loader_priority: List[str] = [
"text_loader",
"pypdf_loader",
"unstructured_loader",
"dlt_loader",
]
auto_discover: bool = True
fallback_loader: str = "text_loader"
enable_dependency_validation: bool = True
model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_")
def to_dict(self) -> Dict[str, Any]:
"""
Convert configuration to dictionary format.
Returns:
Dict containing all loader configuration settings
"""
return {
"loader_directories": self.loader_directories,
"default_loader_priority": self.default_loader_priority,
"auto_discover": self.auto_discover,
"fallback_loader": self.fallback_loader,
"enable_dependency_validation": self.enable_dependency_validation,
}
@lru_cache
def get_loader_config() -> LoaderConfig:
"""
Get cached loader configuration.
Uses LRU cache following cognee's pattern for configuration objects.
Returns:
LoaderConfig instance with current settings
"""
return LoaderConfig()

View file

@ -0,0 +1,5 @@
"""Core loader implementations that are always available."""
from .text_loader import TextLoader
__all__ = ["TextLoader"]

View file

@ -0,0 +1,128 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class TextLoader(LoaderInterface):
"""
Core text file loader that handles basic text file formats.
This loader is always available and serves as the fallback for
text-based files when no specialized loader is available.
"""
@property
def supported_extensions(self) -> List[str]:
"""Supported text file extensions."""
return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"]
@property
def supported_mime_types(self) -> List[str]:
"""Supported MIME types for text content."""
return [
"text/plain",
"text/markdown",
"text/csv",
"application/json",
"text/xml",
"application/xml",
"text/yaml",
"application/yaml",
]
@property
def loader_name(self) -> str:
"""Unique identifier for this loader."""
return "text_loader"
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""
Check if this loader can handle the given file.
Args:
file_path: Path to the file
mime_type: Optional MIME type
Returns:
True if file can be handled, False otherwise
"""
# Check by extension
ext = os.path.splitext(file_path)[1].lower()
if ext in self.supported_extensions:
return True
# Check by MIME type
if mime_type and mime_type in self.supported_mime_types:
return True
# As fallback loader, can attempt to handle any text-like file
# This is useful when other loaders fail
try:
# Quick check if file appears to be text
with open(file_path, "rb") as f:
sample = f.read(512)
# Simple heuristic: if most bytes are printable, consider it text
if sample:
try:
sample.decode("utf-8")
return True
except UnicodeDecodeError:
try:
sample.decode("latin-1")
return True
except UnicodeDecodeError:
pass
except (OSError, IOError):
pass
return False
async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult:
"""
Load and process the text file.
Args:
file_path: Path to the file to load
encoding: Text encoding to use (default: utf-8)
**kwargs: Additional configuration (unused)
Returns:
LoaderResult containing the file content and metadata
Raises:
FileNotFoundError: If file doesn't exist
UnicodeDecodeError: If file cannot be decoded with specified encoding
OSError: If file cannot be read
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
with open(file_path, "r", encoding=encoding) as f:
content = f.read()
except UnicodeDecodeError:
# Try with fallback encoding
if encoding == "utf-8":
return await self.load(file_path, encoding="latin-1", **kwargs)
else:
raise
# Extract basic metadata
file_stat = os.stat(file_path)
metadata = {
"name": os.path.basename(file_path),
"size": file_stat.st_size,
"extension": os.path.splitext(file_path)[1],
"encoding": encoding,
"loader": self.loader_name,
"lines": len(content.splitlines()) if content else 0,
"characters": len(content),
}
return LoaderResult(
content=content,
metadata=metadata,
content_type=ContentType.TEXT,
source_info={"file_path": file_path, "encoding": encoding},
)

View file

@ -0,0 +1,49 @@
from typing import List
from .LoaderEngine import LoaderEngine
from .supported_loaders import supported_loaders
def create_loader_engine(
loader_directories: List[str],
default_loader_priority: List[str],
auto_discover: bool = True,
fallback_loader: str = "text_loader",
enable_dependency_validation: bool = True,
) -> LoaderEngine:
"""
Create loader engine with given configuration.
Follows cognee's pattern for engine creation functions used
in database adapters.
Args:
loader_directories: Directories to search for loader implementations
default_loader_priority: Priority order for loader selection
auto_discover: Whether to auto-discover loaders from directories
fallback_loader: Default loader to use when no other matches
enable_dependency_validation: Whether to validate loader dependencies
Returns:
Configured LoaderEngine instance
"""
engine = LoaderEngine(
loader_directories=loader_directories,
default_loader_priority=default_loader_priority,
fallback_loader=fallback_loader,
enable_dependency_validation=enable_dependency_validation,
)
# Register supported loaders from registry
for loader_name, loader_class in supported_loaders.items():
try:
loader_instance = loader_class()
engine.register_loader(loader_instance)
except Exception as e:
# Log but don't fail - allow engine to continue with other loaders
engine.logger.warning(f"Failed to register loader {loader_name}: {e}")
# Auto-discover loaders if enabled
if auto_discover:
engine.discover_loaders()
return engine

View file

@ -0,0 +1,34 @@
"""
External loader implementations for cognee.
This module contains loaders that depend on external libraries:
- pypdf_loader: PDF processing using pypdf
- unstructured_loader: Document processing using unstructured
- dlt_loader: Data lake/warehouse integration using DLT
These loaders are optional and only available if their dependencies are installed.
"""
__all__ = []
# Conditional imports based on dependency availability
try:
from .pypdf_loader import PyPdfLoader
__all__.append("PyPdfLoader")
except ImportError:
pass
try:
from .unstructured_loader import UnstructuredLoader
__all__.append("UnstructuredLoader")
except ImportError:
pass
try:
from .dlt_loader import DltLoader
__all__.append("DltLoader")
except ImportError:
pass

View file

@ -0,0 +1,203 @@
import os
from typing import List, Dict, Any, Optional
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class DltLoader(LoaderInterface):
"""
Data loader using DLT (Data Load Tool) for various data sources.
Supports loading data from REST APIs, databases, cloud storage,
and other data sources through DLT pipelines.
"""
def __init__(self):
self.logger = get_logger(__name__)
@property
def supported_extensions(self) -> List[str]:
return [
".dlt", # DLT pipeline configuration
".json", # JSON data
".jsonl", # JSON Lines
".csv", # CSV data
".parquet", # Parquet files
".yaml", # YAML configuration
".yml", # YAML configuration
]
@property
def supported_mime_types(self) -> List[str]:
return [
"application/json",
"application/x-ndjson", # JSON Lines
"text/csv",
"application/x-parquet",
"application/yaml",
"text/yaml",
]
@property
def loader_name(self) -> str:
return "dlt_loader"
def get_dependencies(self) -> List[str]:
return ["dlt>=0.4.0"]
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""Check if file can be handled by this loader."""
# Check file extension
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext not in self.supported_extensions:
return False
# Check MIME type if provided
if mime_type and mime_type not in self.supported_mime_types:
return False
# Validate dependencies
return self.validate_dependencies()
async def load(self, file_path: str, source_type: str = "auto", **kwargs) -> LoaderResult:
"""
Load data using DLT pipeline.
Args:
file_path: Path to the data file or DLT configuration
source_type: Type of data source ("auto", "json", "csv", "parquet", "api")
**kwargs: Additional DLT pipeline configuration
Returns:
LoaderResult with loaded data and metadata
Raises:
ImportError: If DLT is not installed
Exception: If data loading fails
"""
try:
import dlt
except ImportError as e:
raise ImportError(
"dlt is required for data loading. Install with: pip install dlt"
) from e
try:
self.logger.info(f"Loading data with DLT: {file_path}")
file_ext = os.path.splitext(file_path)[1].lower()
file_name = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
# Determine source type if auto
if source_type == "auto":
if file_ext == ".json":
source_type = "json"
elif file_ext == ".jsonl":
source_type = "jsonl"
elif file_ext == ".csv":
source_type = "csv"
elif file_ext == ".parquet":
source_type = "parquet"
elif file_ext in [".yaml", ".yml"]:
source_type = "yaml"
else:
source_type = "file"
# Load data based on source type
if source_type == "json":
content = self._load_json(file_path)
elif source_type == "jsonl":
content = self._load_jsonl(file_path)
elif source_type == "csv":
content = self._load_csv(file_path)
elif source_type == "parquet":
content = self._load_parquet(file_path)
elif source_type == "yaml":
content = self._load_yaml(file_path)
else:
# Default: read as text
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Determine content type
if isinstance(content, (dict, list)):
content_type = ContentType.STRUCTURED
text_content = str(content)
else:
content_type = ContentType.TEXT
text_content = content
# Gather metadata
metadata = {
"name": file_name,
"size": file_size,
"extension": file_ext,
"loader": self.loader_name,
"source_type": source_type,
"dlt_version": dlt.__version__,
}
# Add data-specific metadata
if isinstance(content, list):
metadata["records_count"] = len(content)
elif isinstance(content, dict):
metadata["keys_count"] = len(content)
return LoaderResult(
content=text_content,
metadata=metadata,
content_type=content_type,
chunks=[text_content], # Single chunk for now
source_info={
"file_path": file_path,
"source_type": source_type,
"raw_data": content if isinstance(content, (dict, list)) else None,
},
)
except Exception as e:
self.logger.error(f"Failed to load data with DLT from {file_path}: {e}")
raise Exception(f"DLT data loading failed: {e}") from e
def _load_json(self, file_path: str) -> Dict[str, Any]:
"""Load JSON file."""
import json
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
def _load_jsonl(self, file_path: str) -> List[Dict[str, Any]]:
"""Load JSON Lines file."""
import json
data = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
data.append(json.loads(line))
return data
def _load_csv(self, file_path: str) -> str:
"""Load CSV file as text."""
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
def _load_parquet(self, file_path: str) -> str:
"""Load Parquet file (requires pandas)."""
try:
import pandas as pd
df = pd.read_parquet(file_path)
return df.to_string()
except ImportError:
# Fallback: read as binary and convert to string representation
with open(file_path, "rb") as f:
return f"<Parquet file: {os.path.basename(file_path)}, size: {len(f.read())} bytes>"
def _load_yaml(self, file_path: str) -> str:
"""Load YAML file as text."""
with open(file_path, "r", encoding="utf-8") as f:
return f.read()

View file

@ -0,0 +1,127 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class PyPdfLoader(LoaderInterface):
"""
PDF loader using pypdf library.
Extracts text content from PDF files page by page, providing
structured page information and handling PDF-specific errors.
"""
def __init__(self):
self.logger = get_logger(__name__)
@property
def supported_extensions(self) -> List[str]:
return [".pdf"]
@property
def supported_mime_types(self) -> List[str]:
return ["application/pdf"]
@property
def loader_name(self) -> str:
return "pypdf_loader"
def get_dependencies(self) -> List[str]:
return ["pypdf>=4.0.0"]
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""Check if file can be handled by this loader."""
# Check file extension
if not file_path.lower().endswith(".pdf"):
return False
# Check MIME type if provided
if mime_type and mime_type != "application/pdf":
return False
# Validate dependencies
return self.validate_dependencies()
async def load(self, file_path: str, strict: bool = False, **kwargs) -> LoaderResult:
"""
Load PDF file and extract text content.
Args:
file_path: Path to the PDF file
strict: Whether to use strict mode for PDF reading
**kwargs: Additional arguments
Returns:
LoaderResult with extracted text content and metadata
Raises:
ImportError: If pypdf is not installed
Exception: If PDF processing fails
"""
try:
from pypdf import PdfReader
except ImportError as e:
raise ImportError(
"pypdf is required for PDF processing. Install with: pip install pypdf"
) from e
try:
with open(file_path, "rb") as file:
self.logger.info(f"Reading PDF: {file_path}")
reader = PdfReader(file, strict=strict)
content_parts = []
page_texts = []
for page_num, page in enumerate(reader.pages, 1):
try:
page_text = page.extract_text()
if page_text.strip(): # Only add non-empty pages
page_texts.append(page_text)
content_parts.append(f"Page {page_num}:\n{page_text}\n")
except Exception as e:
self.logger.warning(f"Failed to extract text from page {page_num}: {e}")
continue
# Combine all content
full_content = "\n".join(content_parts)
# Gather metadata
metadata = {
"name": os.path.basename(file_path),
"size": os.path.getsize(file_path),
"extension": ".pdf",
"pages": len(reader.pages),
"pages_with_text": len(page_texts),
"loader": self.loader_name,
}
# Add PDF metadata if available
if reader.metadata:
metadata["pdf_metadata"] = {
"title": reader.metadata.get("/Title", ""),
"author": reader.metadata.get("/Author", ""),
"subject": reader.metadata.get("/Subject", ""),
"creator": reader.metadata.get("/Creator", ""),
"producer": reader.metadata.get("/Producer", ""),
"creation_date": str(reader.metadata.get("/CreationDate", "")),
"modification_date": str(reader.metadata.get("/ModDate", "")),
}
return LoaderResult(
content=full_content,
metadata=metadata,
content_type=ContentType.TEXT,
chunks=page_texts, # Pre-chunked by page
source_info={
"file_path": file_path,
"pages": len(reader.pages),
"strict_mode": strict,
},
)
except Exception as e:
self.logger.error(f"Failed to process PDF {file_path}: {e}")
raise Exception(f"PDF processing failed: {e}") from e

View file

@ -0,0 +1,169 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.shared.logging_utils import get_logger
class UnstructuredLoader(LoaderInterface):
"""
Document loader using the unstructured library.
Handles various document formats including docx, pptx, xlsx, odt, etc.
Uses the unstructured library's auto-partition functionality.
"""
def __init__(self):
self.logger = get_logger(__name__)
@property
def supported_extensions(self) -> List[str]:
return [
".docx",
".doc",
".odt", # Word documents
".xlsx",
".xls",
".ods", # Spreadsheets
".pptx",
".ppt",
".odp", # Presentations
".rtf",
".html",
".htm", # Rich text and HTML
".eml",
".msg", # Email formats
".epub", # eBooks
]
@property
def supported_mime_types(self) -> List[str]:
return [
"application/vnd.openxmlformats-officedocument.wordprocessingml.document", # docx
"application/msword", # doc
"application/vnd.oasis.opendocument.text", # odt
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # xlsx
"application/vnd.ms-excel", # xls
"application/vnd.oasis.opendocument.spreadsheet", # ods
"application/vnd.openxmlformats-officedocument.presentationml.presentation", # pptx
"application/vnd.ms-powerpoint", # ppt
"application/vnd.oasis.opendocument.presentation", # odp
"application/rtf", # rtf
"text/html", # html
"message/rfc822", # eml
"application/epub+zip", # epub
]
@property
def loader_name(self) -> str:
return "unstructured_loader"
def get_dependencies(self) -> List[str]:
return ["unstructured>=0.10.0"]
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""Check if file can be handled by this loader."""
# Check file extension
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext not in self.supported_extensions:
return False
# Check MIME type if provided
if mime_type and mime_type not in self.supported_mime_types:
return False
# Validate dependencies
return self.validate_dependencies()
async def load(self, file_path: str, strategy: str = "auto", **kwargs) -> LoaderResult:
"""
Load document using unstructured library.
Args:
file_path: Path to the document file
strategy: Partitioning strategy ("auto", "fast", "hi_res", "ocr_only")
**kwargs: Additional arguments passed to unstructured partition
Returns:
LoaderResult with extracted text content and metadata
Raises:
ImportError: If unstructured is not installed
Exception: If document processing fails
"""
try:
from unstructured.partition.auto import partition
except ImportError as e:
raise ImportError(
"unstructured is required for document processing. "
"Install with: pip install unstructured"
) from e
try:
self.logger.info(f"Processing document: {file_path}")
# Determine content type from file extension
file_ext = os.path.splitext(file_path)[1].lower()
content_type_hint = None
# Get file size and basic info
file_size = os.path.getsize(file_path)
file_name = os.path.basename(file_path)
# Set partitioning parameters
partition_kwargs = {"filename": file_path, "strategy": strategy, **kwargs}
# Use partition to extract elements
elements = partition(**partition_kwargs)
# Process elements into text content
text_parts = []
element_info = []
for element in elements:
element_text = str(element).strip()
if element_text:
text_parts.append(element_text)
element_info.append(
{
"type": type(element).__name__,
"text": element_text[:100] + "..."
if len(element_text) > 100
else element_text,
}
)
# Combine all text content
full_content = "\n\n".join(text_parts)
# Determine content type based on structure
content_type = ContentType.STRUCTURED if len(element_info) > 1 else ContentType.TEXT
# Gather metadata
metadata = {
"name": file_name,
"size": file_size,
"extension": file_ext,
"loader": self.loader_name,
"elements_count": len(elements),
"text_elements_count": len(text_parts),
"strategy": strategy,
"element_types": list(set(info["type"] for info in element_info)),
}
return LoaderResult(
content=full_content,
metadata=metadata,
content_type=content_type,
chunks=text_parts, # Pre-chunked by elements
source_info={
"file_path": file_path,
"strategy": strategy,
"elements": element_info[:10], # First 10 elements for debugging
"total_elements": len(elements),
},
)
except Exception as e:
self.logger.error(f"Failed to process document {file_path}: {e}")
raise Exception(f"Document processing failed: {e}") from e

View file

@ -0,0 +1,20 @@
from functools import lru_cache
from .config import get_loader_config
from .LoaderEngine import LoaderEngine
from .create_loader_engine import create_loader_engine
@lru_cache
def get_loader_engine() -> LoaderEngine:
"""
Factory function to get loader engine.
Follows cognee's pattern with @lru_cache for efficient reuse
of engine instances. Configuration is loaded from environment
variables and settings.
Returns:
Cached LoaderEngine instance configured with current settings
"""
config = get_loader_config()
return create_loader_engine(**config.to_dict())

View file

@ -0,0 +1,47 @@
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
class ContentType(Enum):
"""Content type classification for loaded files"""
TEXT = "text"
STRUCTURED = "structured"
BINARY = "binary"
class LoaderResult(BaseModel):
"""
Standardized output format for all file loaders.
This model ensures consistent data structure across all loader implementations,
following cognee's pattern of using Pydantic models for data validation.
"""
content: str # Primary text content extracted from file
metadata: Dict[str, Any] # File metadata (name, size, type, loader info, etc.)
content_type: ContentType # Content classification
chunks: Optional[List[str]] = None # Pre-chunked content if available
source_info: Optional[Dict[str, Any]] = None # Source-specific information
def to_dict(self) -> Dict[str, Any]:
"""
Convert the loader result to a dictionary format.
Returns:
Dict containing all loader result data with string-serialized content_type
"""
return {
"content": self.content,
"metadata": self.metadata,
"content_type": self.content_type.value,
"source_info": self.source_info or {},
"chunks": self.chunks,
}
class Config:
"""Pydantic configuration following cognee patterns"""
use_enum_values = True
validate_assignment = True

View file

@ -0,0 +1,3 @@
from .LoaderResult import LoaderResult, ContentType
__all__ = ["LoaderResult", "ContentType"]

View file

@ -0,0 +1,3 @@
# Registry for loader implementations
# Follows cognee's pattern used in databases/vector/supported_databases.py
supported_loaders = {}

View file

@ -0,0 +1,22 @@
from .supported_loaders import supported_loaders
def use_loader(loader_name: str, loader_class):
"""
Register a loader at runtime.
Follows cognee's pattern used in databases for adapter registration.
This allows external packages and custom loaders to be registered
into the loader system.
Args:
loader_name: Unique name for the loader
loader_class: Loader class implementing LoaderInterface
Example:
from cognee.infrastructure.loaders import use_loader
from my_package import MyCustomLoader
use_loader("my_custom_loader", MyCustomLoader)
"""
supported_loaders[loader_name] = loader_class

View file

@ -22,16 +22,16 @@ def _create_edge_key(source_id: str, target_id: str, relationship_name: str) ->
def _process_ontology_nodes(
ontology_nodes: list,
data_chunk: DocumentChunk,
added_nodes_map: dict,
added_ontology_nodes_map: dict
ontology_nodes: list,
data_chunk: DocumentChunk,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
) -> None:
"""Process and store ontology nodes"""
for ontology_node in ontology_nodes:
ont_node_id = generate_node_id(ontology_node.name)
ont_node_name = generate_node_name(ontology_node.name)
if ontology_node.category == "classes":
ont_node_key = _create_node_key(ont_node_id, "type")
if ont_node_key not in added_nodes_map and ont_node_key not in added_ontology_nodes_map:
@ -41,7 +41,7 @@ def _process_ontology_nodes(
description=ont_node_name,
ontology_valid=True,
)
elif ontology_node.category == "individuals":
ont_node_key = _create_node_key(ont_node_id, "entity")
if ont_node_key not in added_nodes_map and ont_node_key not in added_ontology_nodes_map:
@ -55,9 +55,7 @@ def _process_ontology_nodes(
def _process_ontology_edges(
ontology_edges: list,
existing_edges_map: dict,
ontology_relationships: list
ontology_edges: list, existing_edges_map: dict, ontology_relationships: list
) -> None:
"""Process ontology edges and add them if new"""
for source, relation, target in ontology_edges:
@ -65,7 +63,7 @@ def _process_ontology_edges(
target_node_id = generate_node_id(target)
relationship_name = generate_edge_name(relation)
edge_key = _create_edge_key(source_node_id, target_node_id, relationship_name)
if edge_key not in existing_edges_map:
ontology_relationships.append(
(
@ -84,41 +82,43 @@ def _process_ontology_edges(
def _create_type_node(
node_type: str,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
node_type: str,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
key_mapping: dict,
data_chunk: DocumentChunk,
existing_edges_map: dict,
ontology_relationships: list
name_mapping: dict,
key_mapping: dict,
data_chunk: DocumentChunk,
existing_edges_map: dict,
ontology_relationships: list,
) -> EntityType:
"""Create or retrieve a type node with ontology validation"""
node_id = generate_node_id(node_type)
node_name = generate_node_name(node_type)
type_node_key = _create_node_key(node_id, "type")
if type_node_key in added_nodes_map or type_node_key in key_mapping:
return added_nodes_map.get(type_node_key) or added_nodes_map.get(key_mapping.get(type_node_key))
return added_nodes_map.get(type_node_key) or added_nodes_map.get(
key_mapping.get(type_node_key)
)
# Get ontology validation
ontology_nodes, ontology_edges, closest_class = ontology_resolver.get_subgraph(
node_name=node_name, node_type="classes"
)
ontology_validated = bool(closest_class)
if ontology_validated:
old_key = type_node_key
node_id = generate_node_id(closest_class.name)
type_node_key = _create_node_key(node_id, "type")
new_node_name = generate_node_name(closest_class.name)
name_mapping[node_name] = closest_class.name
key_mapping[old_key] = type_node_key
node_name = new_node_name
type_node = EntityType(
id=node_id,
name=node_name,
@ -126,55 +126,57 @@ def _create_type_node(
description=node_name,
ontology_valid=ontology_validated,
)
added_nodes_map[type_node_key] = type_node
# Process ontology nodes and edges
_process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships)
return type_node
def _create_entity_node(
node_id: str,
node_name: str,
node_description: str,
type_node: EntityType,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
node_id: str,
node_name: str,
node_description: str,
type_node: EntityType,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
key_mapping: dict,
data_chunk: DocumentChunk,
existing_edges_map: dict,
ontology_relationships: list
name_mapping: dict,
key_mapping: dict,
data_chunk: DocumentChunk,
existing_edges_map: dict,
ontology_relationships: list,
) -> Entity:
"""Create or retrieve an entity node with ontology validation"""
generated_node_id = generate_node_id(node_id)
generated_node_name = generate_node_name(node_name)
entity_node_key = _create_node_key(generated_node_id, "entity")
if entity_node_key in added_nodes_map or entity_node_key in key_mapping:
return added_nodes_map.get(entity_node_key) or added_nodes_map.get(key_mapping.get(entity_node_key))
return added_nodes_map.get(entity_node_key) or added_nodes_map.get(
key_mapping.get(entity_node_key)
)
# Get ontology validation
ontology_nodes, ontology_edges, start_ent_ont = ontology_resolver.get_subgraph(
node_name=generated_node_name, node_type="individuals"
)
ontology_validated = bool(start_ent_ont)
if ontology_validated:
old_key = entity_node_key
generated_node_id = generate_node_id(start_ent_ont.name)
entity_node_key = _create_node_key(generated_node_id, "entity")
new_node_name = generate_node_name(start_ent_ont.name)
name_mapping[generated_node_name] = start_ent_ont.name
key_mapping[old_key] = entity_node_key
generated_node_name = new_node_name
entity_node = Entity(
id=generated_node_id,
name=generated_node_name,
@ -183,42 +185,58 @@ def _create_entity_node(
ontology_valid=ontology_validated,
belongs_to_set=data_chunk.belongs_to_set,
)
added_nodes_map[entity_node_key] = entity_node
# Process ontology nodes and edges
_process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
_process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships)
return entity_node
def _process_graph_nodes(
data_chunk: DocumentChunk,
graph: KnowledgeGraph,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
data_chunk: DocumentChunk,
graph: KnowledgeGraph,
ontology_resolver: OntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
key_mapping: dict,
existing_edges_map: dict,
ontology_relationships: list
name_mapping: dict,
key_mapping: dict,
existing_edges_map: dict,
ontology_relationships: list,
) -> None:
"""Process nodes in a knowledge graph"""
for node in graph.nodes:
# Create type node
type_node = _create_type_node(
node.type, ontology_resolver, added_nodes_map, added_ontology_nodes_map,
name_mapping, key_mapping, data_chunk, existing_edges_map, ontology_relationships
node.type,
ontology_resolver,
added_nodes_map,
added_ontology_nodes_map,
name_mapping,
key_mapping,
data_chunk,
existing_edges_map,
ontology_relationships,
)
# Create entity node
entity_node = _create_entity_node(
node.id, node.name, node.description, type_node, ontology_resolver,
added_nodes_map, added_ontology_nodes_map, name_mapping, key_mapping,
data_chunk, existing_edges_map, ontology_relationships
node.id,
node.name,
node.description,
type_node,
ontology_resolver,
added_nodes_map,
added_ontology_nodes_map,
name_mapping,
key_mapping,
data_chunk,
existing_edges_map,
ontology_relationships,
)
# Add entity to data chunk
if data_chunk.contains is None:
data_chunk.contains = []
@ -226,22 +244,19 @@ def _process_graph_nodes(
def _process_graph_edges(
graph: KnowledgeGraph,
name_mapping: dict,
existing_edges_map: dict,
relationships: list
graph: KnowledgeGraph, name_mapping: dict, existing_edges_map: dict, relationships: list
) -> None:
"""Process edges in a knowledge graph"""
for edge in graph.edges:
# Apply name mapping if exists
source_id = name_mapping.get(edge.source_node_id, edge.source_node_id)
target_id = name_mapping.get(edge.target_node_id, edge.target_node_id)
source_node_id = generate_node_id(source_id)
target_node_id = generate_node_id(target_id)
relationship_name = generate_edge_name(edge.relationship_name)
edge_key = _create_edge_key(source_node_id, target_node_id, relationship_name)
if edge_key not in existing_edges_map:
relationships.append(
(
@ -270,33 +285,40 @@ def expand_with_nodes_and_edges(
"""
if existing_edges_map is None:
existing_edges_map = {}
if ontology_resolver is None:
ontology_resolver = OntologyResolver()
added_nodes_map = {}
added_ontology_nodes_map = {}
relationships = []
ontology_relationships = []
name_mapping = {}
key_mapping = {}
# Process each chunk and its corresponding graph
for data_chunk, graph in zip(data_chunks, chunk_graphs):
if not graph:
continue
# Process nodes first
_process_graph_nodes(
data_chunk, graph, ontology_resolver, added_nodes_map, added_ontology_nodes_map,
name_mapping, key_mapping, existing_edges_map, ontology_relationships
data_chunk,
graph,
ontology_resolver,
added_nodes_map,
added_ontology_nodes_map,
name_mapping,
key_mapping,
existing_edges_map,
ontology_relationships,
)
# Then process edges
_process_graph_edges(graph, name_mapping, existing_edges_map, relationships)
# Return combined results
graph_nodes = list(added_ontology_nodes_map.values())
graph_edges = relationships + ontology_relationships
return graph_nodes, graph_edges

View file

@ -0,0 +1,11 @@
"""
Adapters for bridging the new loader system with existing ingestion pipeline.
This module provides compatibility layers to integrate the plugin-based loader
system with cognee's existing data processing pipeline while maintaining
backward compatibility and preserving permission logic.
"""
from .loader_to_ingestion_adapter import LoaderToIngestionAdapter
__all__ = ["LoaderToIngestionAdapter"]

View file

@ -0,0 +1,240 @@
import os
import tempfile
from typing import BinaryIO, Union, Optional, Any
from io import StringIO, BytesIO
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
from cognee.modules.ingestion.data_types import IngestionData, TextData, BinaryData
from cognee.infrastructure.files import get_file_metadata
from cognee.shared.logging_utils import get_logger
class LoaderResultToIngestionData(IngestionData):
"""
Adapter class that wraps LoaderResult to be compatible with IngestionData interface.
This maintains backward compatibility with existing cognee ingestion pipeline
while enabling the new loader system.
"""
def __init__(self, loader_result: LoaderResult, original_file_path: str = None):
self.loader_result = loader_result
self.original_file_path = original_file_path
self._cached_metadata = None
self.logger = get_logger(__name__)
def get_identifier(self) -> str:
"""
Get content identifier for deduplication.
Uses the loader result's source info or generates hash from content.
"""
# Try to get file hash from metadata first
if "content_hash" in self.loader_result.metadata:
return self.loader_result.metadata["content_hash"]
# Fallback: generate hash from content
import hashlib
content_bytes = self.loader_result.content.encode("utf-8")
content_hash = hashlib.md5(content_bytes).hexdigest()
# Add content type prefix for better identification
content_type = self.loader_result.content_type.value
return f"{content_type}_{content_hash}"
def get_metadata(self) -> dict:
"""
Get file metadata in the format expected by existing pipeline.
Converts LoaderResult metadata to the format used by IngestionData.
"""
if self._cached_metadata is not None:
return self._cached_metadata
# Start with loader result metadata
metadata = self.loader_result.metadata.copy()
# Ensure required fields are present
if "name" not in metadata:
if self.original_file_path:
metadata["name"] = os.path.basename(self.original_file_path)
else:
# Generate name from content hash
content_hash = self.get_identifier().split("_")[-1][:8]
ext = metadata.get("extension", ".txt")
metadata["name"] = f"content_{content_hash}{ext}"
if "content_hash" not in metadata:
metadata["content_hash"] = self.get_identifier()
if "file_path" not in metadata and self.original_file_path:
metadata["file_path"] = self.original_file_path
# Add mime type if not present
if "mime_type" not in metadata:
ext = metadata.get("extension", "").lower()
mime_type_map = {
".txt": "text/plain",
".md": "text/markdown",
".csv": "text/csv",
".json": "application/json",
".pdf": "application/pdf",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
}
metadata["mime_type"] = mime_type_map.get(ext, "application/octet-stream")
self._cached_metadata = metadata
return metadata
def get_data(self) -> Union[str, BinaryIO]:
"""
Get data content in format expected by existing pipeline.
Returns content as string for text data or creates a file-like object
for binary data to maintain compatibility.
"""
if self.loader_result.content_type == ContentType.TEXT:
return self.loader_result.content
# For structured or binary content, return as string for now
# The existing pipeline expects text content for processing
return self.loader_result.content
class LoaderToIngestionAdapter:
"""
Adapter that bridges the new loader system with existing ingestion pipeline.
This class provides methods to process files using the loader system
while maintaining compatibility with the existing IngestionData interface.
"""
def __init__(self):
self.logger = get_logger(__name__)
async def process_file_with_loaders(
self,
file_path: str,
s3fs: Optional[Any] = None,
preferred_loaders: Optional[list] = None,
loader_config: Optional[dict] = None,
) -> IngestionData:
"""
Process a file using the loader system and return IngestionData.
Args:
file_path: Path to the file to process
s3fs: S3 filesystem (for compatibility with existing code)
preferred_loaders: List of preferred loader names
loader_config: Configuration for specific loaders
Returns:
IngestionData compatible object
Raises:
Exception: If no loader can handle the file
"""
from cognee.infrastructure.loaders import get_loader_engine
try:
# Get the loader engine
engine = get_loader_engine()
# Determine MIME type if possible
mime_type = None
try:
import mimetypes
mime_type, _ = mimetypes.guess_type(file_path)
except Exception:
pass
# Load file using loader system
self.logger.info(f"Processing file with loaders: {file_path}")
# Extract loader-specific config if provided
kwargs = {}
if loader_config:
# Find the first available loader that matches our preferred loaders
loader = engine.get_loader(file_path, mime_type, preferred_loaders)
if loader and loader.loader_name in loader_config:
kwargs = loader_config[loader.loader_name]
loader_result = await engine.load_file(
file_path, mime_type=mime_type, preferred_loaders=preferred_loaders, **kwargs
)
# Convert to IngestionData compatible format
return LoaderResultToIngestionData(loader_result, file_path)
except Exception as e:
self.logger.warning(f"Loader system failed for {file_path}: {e}")
# Fallback to existing classification system
return await self._fallback_to_existing_system(file_path, s3fs)
async def _fallback_to_existing_system(
self, file_path: str, s3fs: Optional[Any] = None
) -> IngestionData:
"""
Fallback to existing ingestion.classify() system for backward compatibility.
This ensures that even if the loader system fails, we can still process
files using the original classification method.
"""
from cognee.modules.ingestion import classify
self.logger.info(f"Falling back to existing classification system for: {file_path}")
# Open file and classify using existing system
if file_path.startswith("s3://"):
if s3fs:
with s3fs.open(file_path, "rb") as file:
return classify(file, s3fs=s3fs)
else:
raise ValueError("S3 file path provided but no s3fs available")
else:
# Handle local files and file:// URLs
local_path = file_path.replace("file://", "")
with open(local_path, "rb") as file:
return classify(file)
def is_text_content(self, data: Union[str, Any]) -> bool:
"""
Check if the provided data is text content (not a file path).
Args:
data: The data to check
Returns:
True if data is text content, False if it's a file path
"""
if not isinstance(data, str):
return False
# Check if it's a file path
if (
data.startswith("/")
or data.startswith("file://")
or data.startswith("s3://")
or (len(data) > 1 and data[1] == ":")
): # Windows drive paths
return False
return True
def create_text_ingestion_data(self, content: str) -> IngestionData:
"""
Create IngestionData for text content.
Args:
content: Text content to wrap
Returns:
IngestionData compatible object
"""
from cognee.modules.ingestion.data_types import TextData
return TextData(content)

View file

@ -0,0 +1,223 @@
import json
import inspect
from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from cognee.modules.data.methods import (
get_authorized_existing_datasets,
get_dataset_data,
load_or_create_datasets,
)
from cognee.shared.logging_utils import get_logger
from .save_data_item_to_storage import save_data_item_to_storage
from .adapters import LoaderToIngestionAdapter
from cognee.api.v1.add.config import get_s3_config
logger = get_logger(__name__)
async def plugin_ingest_data(
data: Any,
dataset_name: str,
user: User,
node_set: Optional[List[str]] = None,
dataset_id: UUID = None,
preferred_loaders: Optional[List[str]] = None,
loader_config: Optional[dict] = None,
):
"""
Plugin-based data ingestion using the loader system.
This function maintains full backward compatibility with the existing
ingest_data function while adding support for the new loader system.
Args:
data: The data to ingest
dataset_name: Name of the dataset
user: User object for permissions
node_set: Optional node set for organization
dataset_id: Optional specific dataset ID
preferred_loaders: List of preferred loader names to try first
loader_config: Configuration for specific loaders
Returns:
List of Data objects that were ingested
"""
if not user:
user = await get_default_user()
# Initialize S3 support (maintain existing behavior)
s3_config = get_s3_config()
fs = None
if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None:
import s3fs
fs = s3fs.S3FileSystem(
key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
)
# Initialize the loader adapter
loader_adapter = LoaderToIngestionAdapter()
def open_data_file(file_path: str):
"""Open file with S3 support (preserves existing behavior)."""
if file_path.startswith("s3://"):
return fs.open(file_path, mode="rb")
else:
local_path = file_path.replace("file://", "")
return open(local_path, mode="rb")
def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
"""Get external metadata (preserves existing behavior)."""
if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
return {"metadata": data_item.dict(), "origin": str(type(data_item))}
else:
return {}
async def store_data_to_dataset(
data: Any,
dataset_name: str,
user: User,
node_set: Optional[List[str]] = None,
dataset_id: UUID = None,
):
"""
Core data storage logic with plugin-based file processing.
This function preserves all existing permission and database logic
while using the new loader system for file processing.
"""
logger.info(f"Plugin-based ingestion starting for dataset: {dataset_name}")
# Preserve existing dataset creation and permission logic
user_datasets = await get_specific_user_permission_datasets(user.id, ["write"])
existing_datasets = await get_authorized_existing_datasets(user.id, dataset_name, ["write"])
datasets = await load_or_create_datasets(
user_datasets, existing_datasets, dataset_name, user, dataset_id
)
dataset = datasets[0]
new_datapoints = []
existing_data_points = []
dataset_new_data_points = []
# Get existing dataset data for deduplication (preserve existing logic)
dataset_data: list[Data] = await get_dataset_data(dataset.id)
dataset_data_map = {str(data.id): True for data in dataset_data}
for data_item in data:
file_path = await save_data_item_to_storage(data_item, dataset_name)
# NEW: Use loader system or existing classification based on data type
try:
if loader_adapter.is_text_content(data_item):
# Handle text content (preserve existing behavior)
logger.info("Processing text content with existing system")
classified_data = ingestion.classify(data_item)
else:
# Use loader system for file paths
logger.info(f"Processing file with loader system: {file_path}")
classified_data = await loader_adapter.process_file_with_loaders(
file_path,
s3fs=fs,
preferred_loaders=preferred_loaders,
loader_config=loader_config,
)
except Exception as e:
logger.warning(f"Plugin system failed for {file_path}, falling back: {e}")
# Fallback to existing system for full backward compatibility
with open_data_file(file_path) as file:
classified_data = ingestion.classify(file, s3fs=fs)
# Preserve all existing data processing logic
data_id = ingestion.identify(classified_data, user)
file_metadata = classified_data.get_metadata()
from sqlalchemy import select
db_engine = get_relational_engine()
# Check if data should be updated (preserve existing logic)
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
ext_metadata = get_external_metadata_dict(data_item)
if node_set:
ext_metadata["node_set"] = node_set
# Preserve existing data point creation/update logic
if data_point is not None:
data_point.name = file_metadata["name"]
data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"]
data_point.owner_id = user.id
data_point.content_hash = file_metadata["content_hash"]
data_point.external_metadata = ext_metadata
data_point.node_set = json.dumps(node_set) if node_set else None
if str(data_point.id) in dataset_data_map:
existing_data_points.append(data_point)
else:
dataset_new_data_points.append(data_point)
dataset_data_map[str(data_point.id)] = True
else:
if str(data_id) in dataset_data_map:
continue
data_point = Data(
id=data_id,
name=file_metadata["name"],
raw_data_location=file_metadata["file_path"],
extension=file_metadata["extension"],
mime_type=file_metadata["mime_type"],
owner_id=user.id,
content_hash=file_metadata["content_hash"],
external_metadata=ext_metadata,
node_set=json.dumps(node_set) if node_set else None,
token_count=-1,
)
new_datapoints.append(data_point)
dataset_data_map[str(data_point.id)] = True
# Preserve existing database operations
async with db_engine.get_async_session() as session:
if dataset not in session:
session.add(dataset)
if len(new_datapoints) > 0:
dataset.data.extend(new_datapoints)
if len(existing_data_points) > 0:
for data_point in existing_data_points:
await session.merge(data_point)
if len(dataset_new_data_points) > 0:
dataset.data.extend(dataset_new_data_points)
await session.merge(dataset)
await session.commit()
logger.info(
f"Plugin-based ingestion completed. New: {len(new_datapoints)}, "
+ f"Updated: {len(existing_data_points)}, Dataset new: {len(dataset_new_data_points)}"
)
return existing_data_points + dataset_new_data_points + new_datapoints
return await store_data_to_dataset(data, dataset_name, user, node_set, dataset_id)

View file

@ -0,0 +1,237 @@
import os
import importlib.util
from typing import Dict, List, Optional
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult
from cognee.shared.logging_utils import get_logger
class LoaderEngine:
"""
Main loader engine for managing file loaders.
Follows cognee's adapter pattern similar to database engines,
providing a centralized system for file loading operations.
"""
def __init__(
self,
loader_directories: List[str],
default_loader_priority: List[str],
fallback_loader: str = "text_loader",
enable_dependency_validation: bool = True,
):
"""
Initialize the loader engine.
Args:
loader_directories: Directories to search for loader implementations
default_loader_priority: Priority order for loader selection
fallback_loader: Default loader to use when no other matches
enable_dependency_validation: Whether to validate loader dependencies
"""
self._loaders: Dict[str, LoaderInterface] = {}
self._extension_map: Dict[str, List[LoaderInterface]] = {}
self._mime_type_map: Dict[str, List[LoaderInterface]] = {}
self.loader_directories = loader_directories
self.default_loader_priority = default_loader_priority
self.fallback_loader = fallback_loader
self.enable_dependency_validation = enable_dependency_validation
self.logger = get_logger(__name__)
def register_loader(self, loader: LoaderInterface) -> bool:
"""
Register a loader with the engine.
Args:
loader: LoaderInterface implementation to register
Returns:
True if loader was registered successfully, False otherwise
"""
# Validate dependencies if enabled
if self.enable_dependency_validation and not loader.validate_dependencies():
self.logger.warning(
f"Skipping loader '{loader.loader_name}' - missing dependencies: "
f"{loader.get_dependencies()}"
)
return False
self._loaders[loader.loader_name] = loader
# Map extensions to loaders
for ext in loader.supported_extensions:
ext_lower = ext.lower()
if ext_lower not in self._extension_map:
self._extension_map[ext_lower] = []
self._extension_map[ext_lower].append(loader)
# Map mime types to loaders
for mime_type in loader.supported_mime_types:
if mime_type not in self._mime_type_map:
self._mime_type_map[mime_type] = []
self._mime_type_map[mime_type].append(loader)
self.logger.info(f"Registered loader: {loader.loader_name}")
return True
def get_loader(
self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None
) -> Optional[LoaderInterface]:
"""
Get appropriate loader for a file.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
preferred_loaders: List of preferred loader names to try first
Returns:
LoaderInterface that can handle the file, or None if not found
"""
ext = os.path.splitext(file_path)[1].lower()
# Try preferred loaders first
if preferred_loaders:
for loader_name in preferred_loaders:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(file_path, mime_type):
return loader
# Try priority order
for loader_name in self.default_loader_priority:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(file_path, mime_type):
return loader
# Try mime type mapping
if mime_type and mime_type in self._mime_type_map:
for loader in self._mime_type_map[mime_type]:
if loader.can_handle(file_path, mime_type):
return loader
# Try extension mapping
if ext in self._extension_map:
for loader in self._extension_map[ext]:
if loader.can_handle(file_path, mime_type):
return loader
# Fallback loader
if self.fallback_loader in self._loaders:
fallback = self._loaders[self.fallback_loader]
if fallback.can_handle(file_path, mime_type):
return fallback
return None
async def load_file(
self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None, **kwargs
) -> LoaderResult:
"""
Load file using appropriate loader.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
preferred_loaders: List of preferred loader names to try first
**kwargs: Additional loader-specific configuration
Returns:
LoaderResult containing processed content and metadata
Raises:
ValueError: If no suitable loader is found
Exception: If file processing fails
"""
loader = self.get_loader(file_path, mime_type, preferred_loaders)
if not loader:
raise ValueError(f"No loader found for file: {file_path}")
self.logger.debug(f"Loading {file_path} with {loader.loader_name}")
return await loader.load(file_path, **kwargs)
def discover_loaders(self):
"""
Auto-discover loaders from configured directories.
Scans loader directories for Python modules containing
LoaderInterface implementations and registers them.
"""
for directory in self.loader_directories:
if os.path.exists(directory):
self._discover_in_directory(directory)
def _discover_in_directory(self, directory: str):
"""
Discover loaders in a specific directory.
Args:
directory: Directory path to scan for loader implementations
"""
try:
for file_name in os.listdir(directory):
if file_name.endswith(".py") and not file_name.startswith("_"):
module_name = file_name[:-3]
file_path = os.path.join(directory, file_name)
try:
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for loader classes
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, LoaderInterface)
and attr != LoaderInterface
):
# Instantiate and register the loader
try:
loader_instance = attr()
self.register_loader(loader_instance)
except Exception as e:
self.logger.warning(
f"Failed to instantiate loader {attr_name}: {e}"
)
except Exception as e:
self.logger.warning(f"Failed to load module {module_name}: {e}")
except OSError as e:
self.logger.warning(f"Failed to scan directory {directory}: {e}")
def get_available_loaders(self) -> List[str]:
"""
Get list of available loader names.
Returns:
List of registered loader names
"""
return list(self._loaders.keys())
def get_loader_info(self, loader_name: str) -> Dict[str, any]:
"""
Get information about a specific loader.
Args:
loader_name: Name of the loader to inspect
Returns:
Dictionary containing loader information
"""
if loader_name not in self._loaders:
return {}
loader = self._loaders[loader_name]
return {
"name": loader.loader_name,
"extensions": loader.supported_extensions,
"mime_types": loader.supported_mime_types,
"dependencies": loader.get_dependencies(),
"available": loader.validate_dependencies(),
}

View file

@ -0,0 +1,101 @@
from abc import ABC, abstractmethod
from typing import List
from .models.LoaderResult import LoaderResult
class LoaderInterface(ABC):
"""
Base interface for all file loaders in cognee.
This interface follows cognee's established pattern for database adapters,
ensuring consistent behavior across all loader implementations.
"""
@property
@abstractmethod
def supported_extensions(self) -> List[str]:
"""
List of file extensions this loader supports.
Returns:
List of extensions including the dot (e.g., ['.txt', '.md'])
"""
pass
@property
@abstractmethod
def supported_mime_types(self) -> List[str]:
"""
List of MIME types this loader supports.
Returns:
List of MIME type strings (e.g., ['text/plain', 'application/pdf'])
"""
pass
@property
@abstractmethod
def loader_name(self) -> str:
"""
Unique name identifier for this loader.
Returns:
String identifier used for registration and configuration
"""
pass
@abstractmethod
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""
Check if this loader can handle the given file.
Args:
file_path: Path to the file to be processed
mime_type: Optional MIME type of the file
Returns:
True if this loader can process the file, False otherwise
"""
pass
@abstractmethod
async def load(self, file_path: str, **kwargs) -> LoaderResult:
"""
Load and process the file, returning standardized result.
Args:
file_path: Path to the file to be processed
**kwargs: Additional loader-specific configuration
Returns:
LoaderResult containing processed content and metadata
Raises:
Exception: If file cannot be processed
"""
pass
def get_dependencies(self) -> List[str]:
"""
Optional: Return list of required dependencies for this loader.
Returns:
List of package names with optional version specifications
"""
return []
def validate_dependencies(self) -> bool:
"""
Check if all required dependencies are available.
Returns:
True if all dependencies are installed, False otherwise
"""
for dep in self.get_dependencies():
# Extract package name from version specification
package_name = dep.split(">=")[0].split("==")[0].split("<")[0]
try:
__import__(package_name)
except ImportError:
return False
return True

View file

@ -0,0 +1,19 @@
"""
File loader infrastructure for cognee.
This package provides a plugin-based system for loading different file formats
into cognee, following the same patterns as database adapters.
Main exports:
- get_loader_engine(): Factory function to get configured loader engine
- use_loader(): Register custom loaders at runtime
- LoaderInterface: Base interface for implementing loaders
- LoaderResult, ContentType: Data models for loader results
"""
from .get_loader_engine import get_loader_engine
from .use_loader import use_loader
from .LoaderInterface import LoaderInterface
from .models.LoaderResult import LoaderResult, ContentType
__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"]

View file

@ -0,0 +1,57 @@
from functools import lru_cache
from typing import List, Optional, Dict, Any
from pydantic_settings import BaseSettings, SettingsConfigDict
from cognee.root_dir import get_absolute_path
class LoaderConfig(BaseSettings):
"""
Configuration for file loader system.
Follows cognee's pattern using pydantic_settings.BaseSettings for
environment variable support and validation.
"""
loader_directories: List[str] = [
get_absolute_path("cognee/infrastructure/loaders/core"),
get_absolute_path("cognee/infrastructure/loaders/external"),
]
default_loader_priority: List[str] = [
"text_loader",
"pypdf_loader",
"unstructured_loader",
"dlt_loader",
]
auto_discover: bool = True
fallback_loader: str = "text_loader"
enable_dependency_validation: bool = True
model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_")
def to_dict(self) -> Dict[str, Any]:
"""
Convert configuration to dictionary format.
Returns:
Dict containing all loader configuration settings
"""
return {
"loader_directories": self.loader_directories,
"default_loader_priority": self.default_loader_priority,
"auto_discover": self.auto_discover,
"fallback_loader": self.fallback_loader,
"enable_dependency_validation": self.enable_dependency_validation,
}
@lru_cache
def get_loader_config() -> LoaderConfig:
"""
Get cached loader configuration.
Uses LRU cache following cognee's pattern for configuration objects.
Returns:
LoaderConfig instance with current settings
"""
return LoaderConfig()

View file

@ -0,0 +1,5 @@
"""Core loader implementations that are always available."""
from .text_loader import TextLoader
__all__ = ["TextLoader"]

View file

@ -0,0 +1,128 @@
import os
from typing import List
from ..LoaderInterface import LoaderInterface
from ..models.LoaderResult import LoaderResult, ContentType
class TextLoader(LoaderInterface):
"""
Core text file loader that handles basic text file formats.
This loader is always available and serves as the fallback for
text-based files when no specialized loader is available.
"""
@property
def supported_extensions(self) -> List[str]:
"""Supported text file extensions."""
return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"]
@property
def supported_mime_types(self) -> List[str]:
"""Supported MIME types for text content."""
return [
"text/plain",
"text/markdown",
"text/csv",
"application/json",
"text/xml",
"application/xml",
"text/yaml",
"application/yaml",
]
@property
def loader_name(self) -> str:
"""Unique identifier for this loader."""
return "text_loader"
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
"""
Check if this loader can handle the given file.
Args:
file_path: Path to the file
mime_type: Optional MIME type
Returns:
True if file can be handled, False otherwise
"""
# Check by extension
ext = os.path.splitext(file_path)[1].lower()
if ext in self.supported_extensions:
return True
# Check by MIME type
if mime_type and mime_type in self.supported_mime_types:
return True
# As fallback loader, can attempt to handle any text-like file
# This is useful when other loaders fail
try:
# Quick check if file appears to be text
with open(file_path, "rb") as f:
sample = f.read(512)
# Simple heuristic: if most bytes are printable, consider it text
if sample:
try:
sample.decode("utf-8")
return True
except UnicodeDecodeError:
try:
sample.decode("latin-1")
return True
except UnicodeDecodeError:
pass
except (OSError, IOError):
pass
return False
async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult:
"""
Load and process the text file.
Args:
file_path: Path to the file to load
encoding: Text encoding to use (default: utf-8)
**kwargs: Additional configuration (unused)
Returns:
LoaderResult containing the file content and metadata
Raises:
FileNotFoundError: If file doesn't exist
UnicodeDecodeError: If file cannot be decoded with specified encoding
OSError: If file cannot be read
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
with open(file_path, "r", encoding=encoding) as f:
content = f.read()
except UnicodeDecodeError:
# Try with fallback encoding
if encoding == "utf-8":
return await self.load(file_path, encoding="latin-1", **kwargs)
else:
raise
# Extract basic metadata
file_stat = os.stat(file_path)
metadata = {
"name": os.path.basename(file_path),
"size": file_stat.st_size,
"extension": os.path.splitext(file_path)[1],
"encoding": encoding,
"loader": self.loader_name,
"lines": len(content.splitlines()) if content else 0,
"characters": len(content),
}
return LoaderResult(
content=content,
metadata=metadata,
content_type=ContentType.TEXT,
source_info={"file_path": file_path, "encoding": encoding},
)

View file

@ -0,0 +1,49 @@
from typing import List
from .LoaderEngine import LoaderEngine
from .supported_loaders import supported_loaders
def create_loader_engine(
loader_directories: List[str],
default_loader_priority: List[str],
auto_discover: bool = True,
fallback_loader: str = "text_loader",
enable_dependency_validation: bool = True,
) -> LoaderEngine:
"""
Create loader engine with given configuration.
Follows cognee's pattern for engine creation functions used
in database adapters.
Args:
loader_directories: Directories to search for loader implementations
default_loader_priority: Priority order for loader selection
auto_discover: Whether to auto-discover loaders from directories
fallback_loader: Default loader to use when no other matches
enable_dependency_validation: Whether to validate loader dependencies
Returns:
Configured LoaderEngine instance
"""
engine = LoaderEngine(
loader_directories=loader_directories,
default_loader_priority=default_loader_priority,
fallback_loader=fallback_loader,
enable_dependency_validation=enable_dependency_validation,
)
# Register supported loaders from registry
for loader_name, loader_class in supported_loaders.items():
try:
loader_instance = loader_class()
engine.register_loader(loader_instance)
except Exception as e:
# Log but don't fail - allow engine to continue with other loaders
engine.logger.warning(f"Failed to register loader {loader_name}: {e}")
# Auto-discover loaders if enabled
if auto_discover:
engine.discover_loaders()
return engine

View file

@ -0,0 +1,20 @@
from functools import lru_cache
from .config import get_loader_config
from .LoaderEngine import LoaderEngine
from .create_loader_engine import create_loader_engine
@lru_cache
def get_loader_engine() -> LoaderEngine:
"""
Factory function to get loader engine.
Follows cognee's pattern with @lru_cache for efficient reuse
of engine instances. Configuration is loaded from environment
variables and settings.
Returns:
Cached LoaderEngine instance configured with current settings
"""
config = get_loader_config()
return create_loader_engine(**config.to_dict())

View file

@ -0,0 +1,47 @@
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
from enum import Enum
class ContentType(Enum):
"""Content type classification for loaded files"""
TEXT = "text"
STRUCTURED = "structured"
BINARY = "binary"
class LoaderResult(BaseModel):
"""
Standardized output format for all file loaders.
This model ensures consistent data structure across all loader implementations,
following cognee's pattern of using Pydantic models for data validation.
"""
content: str # Primary text content extracted from file
metadata: Dict[str, Any] # File metadata (name, size, type, loader info, etc.)
content_type: ContentType # Content classification
chunks: Optional[List[str]] = None # Pre-chunked content if available
source_info: Optional[Dict[str, Any]] = None # Source-specific information
def to_dict(self) -> Dict[str, Any]:
"""
Convert the loader result to a dictionary format.
Returns:
Dict containing all loader result data with string-serialized content_type
"""
return {
"content": self.content,
"metadata": self.metadata,
"content_type": self.content_type.value,
"source_info": self.source_info or {},
"chunks": self.chunks,
}
class Config:
"""Pydantic configuration following cognee patterns"""
use_enum_values = True
validate_assignment = True

View file

@ -0,0 +1,3 @@
from .LoaderResult import LoaderResult, ContentType
__all__ = ["LoaderResult", "ContentType"]

View file

@ -0,0 +1,3 @@
# Registry for loader implementations
# Follows cognee's pattern used in databases/vector/supported_databases.py
supported_loaders = {}

View file

@ -0,0 +1,22 @@
from .supported_loaders import supported_loaders
def use_loader(loader_name: str, loader_class):
"""
Register a loader at runtime.
Follows cognee's pattern used in databases for adapter registration.
This allows external packages and custom loaders to be registered
into the loader system.
Args:
loader_name: Unique name for the loader
loader_class: Loader class implementing LoaderInterface
Example:
from cognee.infrastructure.loaders import use_loader
from my_package import MyCustomLoader
use_loader("my_custom_loader", MyCustomLoader)
"""
supported_loaders[loader_name] = loader_class

1
tests/__init__.py Normal file
View file

@ -0,0 +1 @@
# Tests package

1
tests/unit/__init__.py Normal file
View file

@ -0,0 +1 @@
# Unit tests package

View file

@ -0,0 +1 @@
# Infrastructure tests package

View file

@ -0,0 +1 @@
# Loaders tests package

View file

@ -0,0 +1,252 @@
import pytest
import tempfile
import os
from unittest.mock import Mock, AsyncMock
from cognee.infrastructure.loaders.LoaderEngine import LoaderEngine
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class MockLoader(LoaderInterface):
"""Mock loader for testing."""
def __init__(self, name="mock_loader", extensions=None, mime_types=None, fail_deps=False):
self._name = name
self._extensions = extensions or [".mock"]
self._mime_types = mime_types or ["application/mock"]
self._fail_deps = fail_deps
@property
def supported_extensions(self):
return self._extensions
@property
def supported_mime_types(self):
return self._mime_types
@property
def loader_name(self):
return self._name
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
ext = os.path.splitext(file_path)[1].lower()
return ext in self._extensions or mime_type in self._mime_types
async def load(self, file_path: str, **kwargs) -> LoaderResult:
return LoaderResult(
content=f"Mock content from {self._name}",
metadata={"loader": self._name, "name": os.path.basename(file_path)},
content_type=ContentType.TEXT,
)
def validate_dependencies(self) -> bool:
return not self._fail_deps
class TestLoaderEngine:
"""Test the LoaderEngine class."""
@pytest.fixture
def engine(self):
"""Create a LoaderEngine instance for testing."""
return LoaderEngine(
loader_directories=[],
default_loader_priority=["loader1", "loader2"],
fallback_loader="fallback",
enable_dependency_validation=True,
)
def test_engine_initialization(self, engine):
"""Test LoaderEngine initialization."""
assert engine.loader_directories == []
assert engine.default_loader_priority == ["loader1", "loader2"]
assert engine.fallback_loader == "fallback"
assert engine.enable_dependency_validation is True
assert len(engine.get_available_loaders()) == 0
def test_register_loader_success(self, engine):
"""Test successful loader registration."""
loader = MockLoader("test_loader", [".test"])
success = engine.register_loader(loader)
assert success is True
assert "test_loader" in engine.get_available_loaders()
assert engine._loaders["test_loader"] == loader
assert ".test" in engine._extension_map
assert "application/mock" in engine._mime_type_map
def test_register_loader_with_failed_dependencies(self, engine):
"""Test loader registration with failed dependency validation."""
loader = MockLoader("test_loader", [".test"], fail_deps=True)
success = engine.register_loader(loader)
assert success is False
assert "test_loader" not in engine.get_available_loaders()
def test_register_loader_without_dependency_validation(self):
"""Test loader registration without dependency validation."""
engine = LoaderEngine(
loader_directories=[], default_loader_priority=[], enable_dependency_validation=False
)
loader = MockLoader("test_loader", [".test"], fail_deps=True)
success = engine.register_loader(loader)
assert success is True
assert "test_loader" in engine.get_available_loaders()
def test_get_loader_by_extension(self, engine):
"""Test getting loader by file extension."""
loader1 = MockLoader("loader1", [".txt"])
loader2 = MockLoader("loader2", [".pdf"])
engine.register_loader(loader1)
engine.register_loader(loader2)
result = engine.get_loader("test.txt")
assert result == loader1
result = engine.get_loader("test.pdf")
assert result == loader2
result = engine.get_loader("test.unknown")
assert result is None
def test_get_loader_by_mime_type(self, engine):
"""Test getting loader by MIME type."""
loader = MockLoader("loader", [".txt"], ["text/plain"])
engine.register_loader(loader)
result = engine.get_loader("test.unknown", mime_type="text/plain")
assert result == loader
result = engine.get_loader("test.unknown", mime_type="application/pdf")
assert result is None
def test_get_loader_with_preferences(self, engine):
"""Test getting loader with preferred loaders."""
loader1 = MockLoader("loader1", [".txt"])
loader2 = MockLoader("loader2", [".txt"])
engine.register_loader(loader1)
engine.register_loader(loader2)
# Should get preferred loader
result = engine.get_loader("test.txt", preferred_loaders=["loader2"])
assert result == loader2
# Should fallback to first available if preferred not found
result = engine.get_loader("test.txt", preferred_loaders=["nonexistent"])
assert result in [loader1, loader2] # One of them should be returned
def test_get_loader_with_priority(self, engine):
"""Test loader selection with priority order."""
engine.default_loader_priority = ["priority_loader", "other_loader"]
priority_loader = MockLoader("priority_loader", [".txt"])
other_loader = MockLoader("other_loader", [".txt"])
# Register in reverse order
engine.register_loader(other_loader)
engine.register_loader(priority_loader)
# Should get priority loader even though other was registered first
result = engine.get_loader("test.txt")
assert result == priority_loader
def test_get_loader_fallback(self, engine):
"""Test fallback loader selection."""
fallback_loader = MockLoader("fallback", [".txt"])
other_loader = MockLoader("other", [".pdf"])
engine.register_loader(fallback_loader)
engine.register_loader(other_loader)
engine.fallback_loader = "fallback"
# For .txt file, fallback should be considered
result = engine.get_loader("test.txt")
assert result == fallback_loader
# For unknown extension, should still get fallback if it can handle
result = engine.get_loader("test.unknown")
assert result == fallback_loader
@pytest.mark.asyncio
async def test_load_file_success(self, engine):
"""Test successful file loading."""
loader = MockLoader("test_loader", [".txt"])
engine.register_loader(loader)
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
f.write(b"test content")
temp_path = f.name
try:
result = await engine.load_file(temp_path)
assert result.content == "Mock content from test_loader"
assert result.metadata["loader"] == "test_loader"
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_load_file_no_loader(self, engine):
"""Test file loading when no suitable loader is found."""
with pytest.raises(ValueError, match="No loader found for file"):
await engine.load_file("test.unknown")
@pytest.mark.asyncio
async def test_load_file_with_preferences(self, engine):
"""Test file loading with preferred loaders."""
loader1 = MockLoader("loader1", [".txt"])
loader2 = MockLoader("loader2", [".txt"])
engine.register_loader(loader1)
engine.register_loader(loader2)
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
f.write(b"test content")
temp_path = f.name
try:
result = await engine.load_file(temp_path, preferred_loaders=["loader2"])
assert result.metadata["loader"] == "loader2"
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
def test_get_loader_info(self, engine):
"""Test getting loader information."""
loader = MockLoader("test_loader", [".txt"], ["text/plain"])
engine.register_loader(loader)
info = engine.get_loader_info("test_loader")
assert info["name"] == "test_loader"
assert info["extensions"] == [".txt"]
assert info["mime_types"] == ["text/plain"]
assert info["available"] is True
# Test non-existent loader
info = engine.get_loader_info("nonexistent")
assert info == {}
def test_discover_loaders_empty_directory(self, engine):
"""Test loader discovery with empty directory."""
with tempfile.TemporaryDirectory() as temp_dir:
engine.loader_directories = [temp_dir]
engine.discover_loaders()
# Should not find any loaders in empty directory
assert len(engine.get_available_loaders()) == 0
def test_discover_loaders_nonexistent_directory(self, engine):
"""Test loader discovery with non-existent directory."""
engine.loader_directories = ["/nonexistent/directory"]
# Should not raise exception, just log warning
engine.discover_loaders()
assert len(engine.get_available_loaders()) == 0

View file

@ -0,0 +1,99 @@
import pytest
import tempfile
import os
from unittest.mock import AsyncMock
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType
class TestLoaderInterface:
"""Test the LoaderInterface abstract base class."""
def test_loader_interface_is_abstract(self):
"""Test that LoaderInterface cannot be instantiated directly."""
with pytest.raises(TypeError):
LoaderInterface()
def test_dependency_validation_with_no_dependencies(self):
"""Test dependency validation when no dependencies are required."""
class MockLoader(LoaderInterface):
@property
def supported_extensions(self):
return [".txt"]
@property
def supported_mime_types(self):
return ["text/plain"]
@property
def loader_name(self):
return "mock_loader"
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
return True
async def load(self, file_path: str, **kwargs) -> LoaderResult:
return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)
loader = MockLoader()
assert loader.validate_dependencies() is True
assert loader.get_dependencies() == []
def test_dependency_validation_with_missing_dependencies(self):
"""Test dependency validation with missing dependencies."""
class MockLoaderWithDeps(LoaderInterface):
@property
def supported_extensions(self):
return [".txt"]
@property
def supported_mime_types(self):
return ["text/plain"]
@property
def loader_name(self):
return "mock_loader_deps"
def get_dependencies(self):
return ["non_existent_package>=1.0.0"]
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
return True
async def load(self, file_path: str, **kwargs) -> LoaderResult:
return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)
loader = MockLoaderWithDeps()
assert loader.validate_dependencies() is False
assert "non_existent_package>=1.0.0" in loader.get_dependencies()
def test_dependency_validation_with_existing_dependencies(self):
"""Test dependency validation with existing dependencies."""
class MockLoaderWithExistingDeps(LoaderInterface):
@property
def supported_extensions(self):
return [".txt"]
@property
def supported_mime_types(self):
return ["text/plain"]
@property
def loader_name(self):
return "mock_loader_existing"
def get_dependencies(self):
return ["os"] # Built-in module that always exists
def can_handle(self, file_path: str, mime_type: str = None) -> bool:
return True
async def load(self, file_path: str, **kwargs) -> LoaderResult:
return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT)
loader = MockLoaderWithExistingDeps()
assert loader.validate_dependencies() is True

View file

@ -0,0 +1,157 @@
import pytest
import tempfile
import os
from pathlib import Path
from cognee.infrastructure.loaders.core.text_loader import TextLoader
from cognee.infrastructure.loaders.models.LoaderResult import ContentType
class TestTextLoader:
"""Test the TextLoader implementation."""
@pytest.fixture
def text_loader(self):
"""Create a TextLoader instance for testing."""
return TextLoader()
@pytest.fixture
def temp_text_file(self):
"""Create a temporary text file for testing."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
f.write("This is a test file.\nIt has multiple lines.\n")
temp_path = f.name
yield temp_path
# Cleanup
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.fixture
def temp_binary_file(self):
"""Create a temporary binary file for testing."""
with tempfile.NamedTemporaryFile(mode="wb", suffix=".bin", delete=False) as f:
f.write(b"\x00\x01\x02\x03\x04\x05")
temp_path = f.name
yield temp_path
# Cleanup
if os.path.exists(temp_path):
os.unlink(temp_path)
def test_loader_properties(self, text_loader):
"""Test basic loader properties."""
assert text_loader.loader_name == "text_loader"
assert ".txt" in text_loader.supported_extensions
assert ".md" in text_loader.supported_extensions
assert "text/plain" in text_loader.supported_mime_types
assert "application/json" in text_loader.supported_mime_types
def test_can_handle_by_extension(self, text_loader):
"""Test file handling by extension."""
assert text_loader.can_handle("test.txt")
assert text_loader.can_handle("test.md")
assert text_loader.can_handle("test.json")
assert text_loader.can_handle("test.TXT") # Case insensitive
assert not text_loader.can_handle("test.pdf")
def test_can_handle_by_mime_type(self, text_loader):
"""Test file handling by MIME type."""
assert text_loader.can_handle("test.unknown", mime_type="text/plain")
assert text_loader.can_handle("test.unknown", mime_type="application/json")
assert not text_loader.can_handle("test.unknown", mime_type="application/pdf")
def test_can_handle_text_file_heuristic(self, text_loader, temp_text_file):
"""Test handling of text files by content heuristic."""
# Remove extension to force heuristic check
no_ext_path = temp_text_file.replace(".txt", "")
os.rename(temp_text_file, no_ext_path)
try:
assert text_loader.can_handle(no_ext_path)
finally:
if os.path.exists(no_ext_path):
os.unlink(no_ext_path)
def test_cannot_handle_binary_file(self, text_loader, temp_binary_file):
"""Test that binary files are not handled."""
assert not text_loader.can_handle(temp_binary_file)
@pytest.mark.asyncio
async def test_load_text_file(self, text_loader, temp_text_file):
"""Test loading a text file."""
result = await text_loader.load(temp_text_file)
assert isinstance(result.content, str)
assert "This is a test file." in result.content
assert result.content_type == ContentType.TEXT
assert result.metadata["loader"] == "text_loader"
assert result.metadata["name"] == os.path.basename(temp_text_file)
assert result.metadata["lines"] == 2
assert result.metadata["encoding"] == "utf-8"
assert result.source_info["file_path"] == temp_text_file
@pytest.mark.asyncio
async def test_load_with_custom_encoding(self, text_loader):
"""Test loading with custom encoding."""
# Create a file with latin-1 encoding
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="latin-1"
) as f:
f.write("Test with åéîøü characters")
temp_path = f.name
try:
result = await text_loader.load(temp_path, encoding="latin-1")
assert "åéîøü" in result.content
assert result.metadata["encoding"] == "latin-1"
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_load_with_fallback_encoding(self, text_loader):
"""Test automatic fallback to latin-1 encoding."""
# Create a file with latin-1 content but try to read as utf-8
with tempfile.NamedTemporaryFile(mode="wb", suffix=".txt", delete=False) as f:
# Write latin-1 encoded bytes that are invalid in utf-8
f.write(b"Test with \xe5\xe9\xee\xf8\xfc characters")
temp_path = f.name
try:
# Should automatically fallback to latin-1
result = await text_loader.load(temp_path)
assert result.metadata["encoding"] == "latin-1"
assert len(result.content) > 0
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_load_nonexistent_file(self, text_loader):
"""Test loading a file that doesn't exist."""
with pytest.raises(FileNotFoundError):
await text_loader.load("/nonexistent/file.txt")
@pytest.mark.asyncio
async def test_load_empty_file(self, text_loader):
"""Test loading an empty file."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
# Create empty file
temp_path = f.name
try:
result = await text_loader.load(temp_path)
assert result.content == ""
assert result.metadata["lines"] == 0
assert result.metadata["characters"] == 0
finally:
if os.path.exists(temp_path):
os.unlink(temp_path)
def test_no_dependencies(self, text_loader):
"""Test that TextLoader has no external dependencies."""
assert text_loader.get_dependencies() == []
assert text_loader.validate_dependencies() is True