diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index f14861d6d..70b0d27a8 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -15,14 +15,19 @@ async def add( vector_db_config: dict = None, graph_db_config: dict = None, dataset_id: UUID = None, + preferred_loaders: Optional[List[str]] = None, + loader_config: Optional[dict] = None, ): """ - Add data to Cognee for knowledge graph processing. + Add data to Cognee for knowledge graph processing using a plugin-based loader system. This is the first step in the Cognee workflow - it ingests raw data and prepares it for processing. The function accepts various data formats including text, files, and binary streams, then stores them in a specified dataset for further processing. + This version supports both the original ingestion system (for backward compatibility) + and the new plugin-based loader system (when loader parameters are provided). + Prerequisites: - **LLM_API_KEY**: Must be set in environment variables for content processing - **Database Setup**: Relational and vector databases must be configured @@ -38,16 +43,38 @@ async def add( - **Lists**: Multiple files or text strings in a single call Supported File Formats: - - Text files (.txt, .md, .csv) - - PDFs (.pdf) + - Text files (.txt, .md, .csv) - processed by text_loader + - PDFs (.pdf) - processed by pypdf_loader (if available) - Images (.png, .jpg, .jpeg) - extracted via OCR/vision models - Audio files (.mp3, .wav) - transcribed to text - Code files (.py, .js, .ts, etc.) - parsed for structure and content - - Office documents (.docx, .pptx) + - Office documents (.docx, .pptx) - processed by unstructured_loader (if available) + - Data files (.json, .jsonl, .parquet) - processed by dlt_loader (if available) - Workflow: + Plugin System: + The function automatically uses the best available loader for each file type. + You can customize this behavior using the loader parameters: + + ```python + # Use specific loaders in priority order + await cognee.add( + "/path/to/document.pdf", + preferred_loaders=["pypdf_loader", "text_loader"] + ) + + # Configure loader-specific options + await cognee.add( + "/path/to/document.pdf", + loader_config={ + "pypdf_loader": {"strict": False}, + "unstructured_loader": {"strategy": "hi_res"} + } + ) + ``` + + Workflow: 1. **Data Resolution**: Resolves file paths and validates accessibility - 2. **Content Extraction**: Extracts text content from various file formats + 2. **Content Extraction**: Uses plugin system or falls back to existing classification 3. **Dataset Storage**: Stores processed content in the specified dataset 4. **Metadata Tracking**: Records file metadata, timestamps, and user permissions 5. **Permission Assignment**: Grants user read/write/delete/share permissions on dataset @@ -70,6 +97,10 @@ async def add( vector_db_config: Optional configuration for vector database (for custom setups). graph_db_config: Optional configuration for graph database (for custom setups). dataset_id: Optional specific dataset UUID to use instead of dataset_name. + preferred_loaders: Optional list of loader names to try first (e.g., ["pypdf_loader", "text_loader"]). + If not provided, uses default loader priority. + loader_config: Optional configuration for specific loaders. Dictionary mapping loader names + to their configuration options (e.g., {"pypdf_loader": {"strict": False}}). 
Returns: PipelineRunInfo: Information about the ingestion pipeline execution including: @@ -138,10 +169,32 @@ async def add( UnsupportedFileTypeError: If file format cannot be processed InvalidValueError: If LLM_API_KEY is not set or invalid """ - tasks = [ - Task(resolve_data_directories, include_subdirectories=True), - Task(ingest_data, dataset_name, user, node_set, dataset_id), - ] + + # Determine which ingestion system to use + use_plugin_system = preferred_loaders is not None or loader_config is not None + + if use_plugin_system: + # Use new plugin-based ingestion system + from cognee.tasks.ingestion.plugin_ingest_data import plugin_ingest_data + + tasks = [ + Task(resolve_data_directories, include_subdirectories=True), + Task( + plugin_ingest_data, + dataset_name, + user, + node_set, + dataset_id, + preferred_loaders, + loader_config, + ), + ] + else: + # Use existing ingestion system for backward compatibility + tasks = [ + Task(resolve_data_directories, include_subdirectories=True), + Task(ingest_data, dataset_name, user, node_set, dataset_id), + ] pipeline_run_info = None diff --git a/cognee/infrastructure/loaders/LoaderEngine.py b/cognee/infrastructure/loaders/LoaderEngine.py new file mode 100644 index 000000000..cd75da6cb --- /dev/null +++ b/cognee/infrastructure/loaders/LoaderEngine.py @@ -0,0 +1,237 @@ +import os +import importlib.util +from typing import Dict, List, Optional +from .LoaderInterface import LoaderInterface +from .models.LoaderResult import LoaderResult +from cognee.shared.logging_utils import get_logger + + +class LoaderEngine: + """ + Main loader engine for managing file loaders. + + Follows cognee's adapter pattern similar to database engines, + providing a centralized system for file loading operations. + """ + + def __init__( + self, + loader_directories: List[str], + default_loader_priority: List[str], + fallback_loader: str = "text_loader", + enable_dependency_validation: bool = True, + ): + """ + Initialize the loader engine. + + Args: + loader_directories: Directories to search for loader implementations + default_loader_priority: Priority order for loader selection + fallback_loader: Default loader to use when no other matches + enable_dependency_validation: Whether to validate loader dependencies + """ + self._loaders: Dict[str, LoaderInterface] = {} + self._extension_map: Dict[str, List[LoaderInterface]] = {} + self._mime_type_map: Dict[str, List[LoaderInterface]] = {} + self.loader_directories = loader_directories + self.default_loader_priority = default_loader_priority + self.fallback_loader = fallback_loader + self.enable_dependency_validation = enable_dependency_validation + self.logger = get_logger(__name__) + + def register_loader(self, loader: LoaderInterface) -> bool: + """ + Register a loader with the engine. 
+ + Args: + loader: LoaderInterface implementation to register + + Returns: + True if loader was registered successfully, False otherwise + """ + # Validate dependencies if enabled + if self.enable_dependency_validation and not loader.validate_dependencies(): + self.logger.warning( + f"Skipping loader '{loader.loader_name}' - missing dependencies: " + f"{loader.get_dependencies()}" + ) + return False + + self._loaders[loader.loader_name] = loader + + # Map extensions to loaders + for ext in loader.supported_extensions: + ext_lower = ext.lower() + if ext_lower not in self._extension_map: + self._extension_map[ext_lower] = [] + self._extension_map[ext_lower].append(loader) + + # Map mime types to loaders + for mime_type in loader.supported_mime_types: + if mime_type not in self._mime_type_map: + self._mime_type_map[mime_type] = [] + self._mime_type_map[mime_type].append(loader) + + self.logger.info(f"Registered loader: {loader.loader_name}") + return True + + def get_loader( + self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None + ) -> Optional[LoaderInterface]: + """ + Get appropriate loader for a file. + + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + preferred_loaders: List of preferred loader names to try first + + Returns: + LoaderInterface that can handle the file, or None if not found + """ + ext = os.path.splitext(file_path)[1].lower() + + # Try preferred loaders first + if preferred_loaders: + for loader_name in preferred_loaders: + if loader_name in self._loaders: + loader = self._loaders[loader_name] + if loader.can_handle(file_path, mime_type): + return loader + + # Try priority order + for loader_name in self.default_loader_priority: + if loader_name in self._loaders: + loader = self._loaders[loader_name] + if loader.can_handle(file_path, mime_type): + return loader + + # Try mime type mapping + if mime_type and mime_type in self._mime_type_map: + for loader in self._mime_type_map[mime_type]: + if loader.can_handle(file_path, mime_type): + return loader + + # Try extension mapping + if ext in self._extension_map: + for loader in self._extension_map[ext]: + if loader.can_handle(file_path, mime_type): + return loader + + # Fallback loader + if self.fallback_loader in self._loaders: + fallback = self._loaders[self.fallback_loader] + if fallback.can_handle(file_path, mime_type): + return fallback + + return None + + async def load_file( + self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None, **kwargs + ) -> LoaderResult: + """ + Load file using appropriate loader. + + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + preferred_loaders: List of preferred loader names to try first + **kwargs: Additional loader-specific configuration + + Returns: + LoaderResult containing processed content and metadata + + Raises: + ValueError: If no suitable loader is found + Exception: If file processing fails + """ + loader = self.get_loader(file_path, mime_type, preferred_loaders) + if not loader: + raise ValueError(f"No loader found for file: {file_path}") + + self.logger.debug(f"Loading {file_path} with {loader.loader_name}") + return await loader.load(file_path, **kwargs) + + def discover_loaders(self): + """ + Auto-discover loaders from configured directories. + + Scans loader directories for Python modules containing + LoaderInterface implementations and registers them. 
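For orientation, here is a minimal sketch (not part of this diff) of how the engine above is typically driven. `get_loader_engine` and `LoaderResult` are introduced elsewhere in this PR; the file path is hypothetical, and `pypdf_loader` only participates if pypdf is installed, otherwise resolution falls through to the priority list and fallback loader.

```python
import asyncio

from cognee.infrastructure.loaders import get_loader_engine


async def preview(path: str) -> None:
    engine = get_loader_engine()  # cached engine built from LoaderConfig

    # Resolution order implemented above: preferred loaders, then default
    # priority, then MIME-type map, then extension map, then the fallback loader.
    loader = engine.get_loader(path, preferred_loaders=["pypdf_loader"])
    print("selected:", loader.loader_name if loader else None)

    # load_file raises ValueError when no registered loader can handle the path.
    result = await engine.load_file(path, preferred_loaders=["pypdf_loader"])
    print(result.content_type, result.metadata.get("name"), len(result.content))


asyncio.run(preview("/tmp/report.pdf"))
```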
+ """ + for directory in self.loader_directories: + if os.path.exists(directory): + self._discover_in_directory(directory) + + def _discover_in_directory(self, directory: str): + """ + Discover loaders in a specific directory. + + Args: + directory: Directory path to scan for loader implementations + """ + try: + for file_name in os.listdir(directory): + if file_name.endswith(".py") and not file_name.startswith("_"): + module_name = file_name[:-3] + file_path = os.path.join(directory, file_name) + + try: + spec = importlib.util.spec_from_file_location(module_name, file_path) + if spec and spec.loader: + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Look for loader classes + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, LoaderInterface) + and attr != LoaderInterface + ): + # Instantiate and register the loader + try: + loader_instance = attr() + self.register_loader(loader_instance) + except Exception as e: + self.logger.warning( + f"Failed to instantiate loader {attr_name}: {e}" + ) + + except Exception as e: + self.logger.warning(f"Failed to load module {module_name}: {e}") + + except OSError as e: + self.logger.warning(f"Failed to scan directory {directory}: {e}") + + def get_available_loaders(self) -> List[str]: + """ + Get list of available loader names. + + Returns: + List of registered loader names + """ + return list(self._loaders.keys()) + + def get_loader_info(self, loader_name: str) -> Dict[str, any]: + """ + Get information about a specific loader. + + Args: + loader_name: Name of the loader to inspect + + Returns: + Dictionary containing loader information + """ + if loader_name not in self._loaders: + return {} + + loader = self._loaders[loader_name] + return { + "name": loader.loader_name, + "extensions": loader.supported_extensions, + "mime_types": loader.supported_mime_types, + "dependencies": loader.get_dependencies(), + "available": loader.validate_dependencies(), + } diff --git a/cognee/infrastructure/loaders/LoaderInterface.py b/cognee/infrastructure/loaders/LoaderInterface.py new file mode 100644 index 000000000..66d8ede40 --- /dev/null +++ b/cognee/infrastructure/loaders/LoaderInterface.py @@ -0,0 +1,101 @@ +from abc import ABC, abstractmethod +from typing import List +from .models.LoaderResult import LoaderResult + + +class LoaderInterface(ABC): + """ + Base interface for all file loaders in cognee. + + This interface follows cognee's established pattern for database adapters, + ensuring consistent behavior across all loader implementations. + """ + + @property + @abstractmethod + def supported_extensions(self) -> List[str]: + """ + List of file extensions this loader supports. + + Returns: + List of extensions including the dot (e.g., ['.txt', '.md']) + """ + pass + + @property + @abstractmethod + def supported_mime_types(self) -> List[str]: + """ + List of MIME types this loader supports. + + Returns: + List of MIME type strings (e.g., ['text/plain', 'application/pdf']) + """ + pass + + @property + @abstractmethod + def loader_name(self) -> str: + """ + Unique name identifier for this loader. + + Returns: + String identifier used for registration and configuration + """ + pass + + @abstractmethod + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """ + Check if this loader can handle the given file. 
+ + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + + Returns: + True if this loader can process the file, False otherwise + """ + pass + + @abstractmethod + async def load(self, file_path: str, **kwargs) -> LoaderResult: + """ + Load and process the file, returning standardized result. + + Args: + file_path: Path to the file to be processed + **kwargs: Additional loader-specific configuration + + Returns: + LoaderResult containing processed content and metadata + + Raises: + Exception: If file cannot be processed + """ + pass + + def get_dependencies(self) -> List[str]: + """ + Optional: Return list of required dependencies for this loader. + + Returns: + List of package names with optional version specifications + """ + return [] + + def validate_dependencies(self) -> bool: + """ + Check if all required dependencies are available. + + Returns: + True if all dependencies are installed, False otherwise + """ + for dep in self.get_dependencies(): + # Extract package name from version specification + package_name = dep.split(">=")[0].split("==")[0].split("<")[0] + try: + __import__(package_name) + except ImportError: + return False + return True diff --git a/cognee/infrastructure/loaders/__init__.py b/cognee/infrastructure/loaders/__init__.py new file mode 100644 index 000000000..5374c2afc --- /dev/null +++ b/cognee/infrastructure/loaders/__init__.py @@ -0,0 +1,19 @@ +""" +File loader infrastructure for cognee. + +This package provides a plugin-based system for loading different file formats +into cognee, following the same patterns as database adapters. + +Main exports: +- get_loader_engine(): Factory function to get configured loader engine +- use_loader(): Register custom loaders at runtime +- LoaderInterface: Base interface for implementing loaders +- LoaderResult, ContentType: Data models for loader results +""" + +from .get_loader_engine import get_loader_engine +from .use_loader import use_loader +from .LoaderInterface import LoaderInterface +from .models.LoaderResult import LoaderResult, ContentType + +__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"] diff --git a/cognee/infrastructure/loaders/config.py b/cognee/infrastructure/loaders/config.py new file mode 100644 index 000000000..c14a30e2e --- /dev/null +++ b/cognee/infrastructure/loaders/config.py @@ -0,0 +1,57 @@ +from functools import lru_cache +from typing import List, Optional, Dict, Any +from pydantic_settings import BaseSettings, SettingsConfigDict +from cognee.root_dir import get_absolute_path + + +class LoaderConfig(BaseSettings): + """ + Configuration for file loader system. + + Follows cognee's pattern using pydantic_settings.BaseSettings for + environment variable support and validation. + """ + + loader_directories: List[str] = [ + get_absolute_path("infrastructure/loaders/core"), + get_absolute_path("infrastructure/loaders/external"), + ] + default_loader_priority: List[str] = [ + "text_loader", + "pypdf_loader", + "unstructured_loader", + "dlt_loader", + ] + auto_discover: bool = True + fallback_loader: str = "text_loader" + enable_dependency_validation: bool = True + + model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_") + + def to_dict(self) -> Dict[str, Any]: + """ + Convert configuration to dictionary format. 
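Because the config class uses `env_prefix="LOADER_"`, individual settings can be overridden through the environment. A small sketch under that assumption (not part of the diff); note that `get_loader_config` is cached, so variables must be set before the first call.

```python
import os

# Hypothetical overrides picked up by pydantic-settings via the LOADER_ prefix.
os.environ["LOADER_FALLBACK_LOADER"] = "text_loader"
os.environ["LOADER_AUTO_DISCOVER"] = "false"

from cognee.infrastructure.loaders.config import get_loader_config

config = get_loader_config()   # cached after the first call
print(config.fallback_loader)  # "text_loader"
print(config.auto_discover)    # False
```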
+ + Returns: + Dict containing all loader configuration settings + """ + return { + "loader_directories": self.loader_directories, + "default_loader_priority": self.default_loader_priority, + "auto_discover": self.auto_discover, + "fallback_loader": self.fallback_loader, + "enable_dependency_validation": self.enable_dependency_validation, + } + + +@lru_cache +def get_loader_config() -> LoaderConfig: + """ + Get cached loader configuration. + + Uses LRU cache following cognee's pattern for configuration objects. + + Returns: + LoaderConfig instance with current settings + """ + return LoaderConfig() diff --git a/cognee/infrastructure/loaders/core/__init__.py b/cognee/infrastructure/loaders/core/__init__.py new file mode 100644 index 000000000..d21282a52 --- /dev/null +++ b/cognee/infrastructure/loaders/core/__init__.py @@ -0,0 +1,5 @@ +"""Core loader implementations that are always available.""" + +from .text_loader import TextLoader + +__all__ = ["TextLoader"] diff --git a/cognee/infrastructure/loaders/core/text_loader.py b/cognee/infrastructure/loaders/core/text_loader.py new file mode 100644 index 000000000..b2529b08f --- /dev/null +++ b/cognee/infrastructure/loaders/core/text_loader.py @@ -0,0 +1,128 @@ +import os +from typing import List +from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType + + +class TextLoader(LoaderInterface): + """ + Core text file loader that handles basic text file formats. + + This loader is always available and serves as the fallback for + text-based files when no specialized loader is available. + """ + + @property + def supported_extensions(self) -> List[str]: + """Supported text file extensions.""" + return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"] + + @property + def supported_mime_types(self) -> List[str]: + """Supported MIME types for text content.""" + return [ + "text/plain", + "text/markdown", + "text/csv", + "application/json", + "text/xml", + "application/xml", + "text/yaml", + "application/yaml", + ] + + @property + def loader_name(self) -> str: + """Unique identifier for this loader.""" + return "text_loader" + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """ + Check if this loader can handle the given file. + + Args: + file_path: Path to the file + mime_type: Optional MIME type + + Returns: + True if file can be handled, False otherwise + """ + # Check by extension + ext = os.path.splitext(file_path)[1].lower() + if ext in self.supported_extensions: + return True + + # Check by MIME type + if mime_type and mime_type in self.supported_mime_types: + return True + + # As fallback loader, can attempt to handle any text-like file + # This is useful when other loaders fail + try: + # Quick check if file appears to be text + with open(file_path, "rb") as f: + sample = f.read(512) + # Simple heuristic: if most bytes are printable, consider it text + if sample: + try: + sample.decode("utf-8") + return True + except UnicodeDecodeError: + try: + sample.decode("latin-1") + return True + except UnicodeDecodeError: + pass + except (OSError, IOError): + pass + + return False + + async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult: + """ + Load and process the text file. 
+ + Args: + file_path: Path to the file to load + encoding: Text encoding to use (default: utf-8) + **kwargs: Additional configuration (unused) + + Returns: + LoaderResult containing the file content and metadata + + Raises: + FileNotFoundError: If file doesn't exist + UnicodeDecodeError: If file cannot be decoded with specified encoding + OSError: If file cannot be read + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + try: + with open(file_path, "r", encoding=encoding) as f: + content = f.read() + except UnicodeDecodeError: + # Try with fallback encoding + if encoding == "utf-8": + return await self.load(file_path, encoding="latin-1", **kwargs) + else: + raise + + # Extract basic metadata + file_stat = os.stat(file_path) + metadata = { + "name": os.path.basename(file_path), + "size": file_stat.st_size, + "extension": os.path.splitext(file_path)[1], + "encoding": encoding, + "loader": self.loader_name, + "lines": len(content.splitlines()) if content else 0, + "characters": len(content), + } + + return LoaderResult( + content=content, + metadata=metadata, + content_type=ContentType.TEXT, + source_info={"file_path": file_path, "encoding": encoding}, + ) diff --git a/cognee/infrastructure/loaders/create_loader_engine.py b/cognee/infrastructure/loaders/create_loader_engine.py new file mode 100644 index 000000000..fa56e069a --- /dev/null +++ b/cognee/infrastructure/loaders/create_loader_engine.py @@ -0,0 +1,49 @@ +from typing import List +from .LoaderEngine import LoaderEngine +from .supported_loaders import supported_loaders + + +def create_loader_engine( + loader_directories: List[str], + default_loader_priority: List[str], + auto_discover: bool = True, + fallback_loader: str = "text_loader", + enable_dependency_validation: bool = True, +) -> LoaderEngine: + """ + Create loader engine with given configuration. + + Follows cognee's pattern for engine creation functions used + in database adapters. + + Args: + loader_directories: Directories to search for loader implementations + default_loader_priority: Priority order for loader selection + auto_discover: Whether to auto-discover loaders from directories + fallback_loader: Default loader to use when no other matches + enable_dependency_validation: Whether to validate loader dependencies + + Returns: + Configured LoaderEngine instance + """ + engine = LoaderEngine( + loader_directories=loader_directories, + default_loader_priority=default_loader_priority, + fallback_loader=fallback_loader, + enable_dependency_validation=enable_dependency_validation, + ) + + # Register supported loaders from registry + for loader_name, loader_class in supported_loaders.items(): + try: + loader_instance = loader_class() + engine.register_loader(loader_instance) + except Exception as e: + # Log but don't fail - allow engine to continue with other loaders + engine.logger.warning(f"Failed to register loader {loader_name}: {e}") + + # Auto-discover loaders if enabled + if auto_discover: + engine.discover_loaders() + + return engine diff --git a/cognee/infrastructure/loaders/external/__init__.py b/cognee/infrastructure/loaders/external/__init__.py new file mode 100644 index 000000000..d96dffb4e --- /dev/null +++ b/cognee/infrastructure/loaders/external/__init__.py @@ -0,0 +1,34 @@ +""" +External loader implementations for cognee. 
+ +This module contains loaders that depend on external libraries: +- pypdf_loader: PDF processing using pypdf +- unstructured_loader: Document processing using unstructured +- dlt_loader: Data lake/warehouse integration using DLT + +These loaders are optional and only available if their dependencies are installed. +""" + +__all__ = [] + +# Conditional imports based on dependency availability +try: + from .pypdf_loader import PyPdfLoader + + __all__.append("PyPdfLoader") +except ImportError: + pass + +try: + from .unstructured_loader import UnstructuredLoader + + __all__.append("UnstructuredLoader") +except ImportError: + pass + +try: + from .dlt_loader import DltLoader + + __all__.append("DltLoader") +except ImportError: + pass diff --git a/cognee/infrastructure/loaders/external/dlt_loader.py b/cognee/infrastructure/loaders/external/dlt_loader.py new file mode 100644 index 000000000..1bfbab006 --- /dev/null +++ b/cognee/infrastructure/loaders/external/dlt_loader.py @@ -0,0 +1,203 @@ +import os +from typing import List, Dict, Any, Optional +from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType +from cognee.shared.logging_utils import get_logger + + +class DltLoader(LoaderInterface): + """ + Data loader using DLT (Data Load Tool) for various data sources. + + Supports loading data from REST APIs, databases, cloud storage, + and other data sources through DLT pipelines. + """ + + def __init__(self): + self.logger = get_logger(__name__) + + @property + def supported_extensions(self) -> List[str]: + return [ + ".dlt", # DLT pipeline configuration + ".json", # JSON data + ".jsonl", # JSON Lines + ".csv", # CSV data + ".parquet", # Parquet files + ".yaml", # YAML configuration + ".yml", # YAML configuration + ] + + @property + def supported_mime_types(self) -> List[str]: + return [ + "application/json", + "application/x-ndjson", # JSON Lines + "text/csv", + "application/x-parquet", + "application/yaml", + "text/yaml", + ] + + @property + def loader_name(self) -> str: + return "dlt_loader" + + def get_dependencies(self) -> List[str]: + return ["dlt>=0.4.0"] + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """Check if file can be handled by this loader.""" + # Check file extension + file_ext = os.path.splitext(file_path)[1].lower() + if file_ext not in self.supported_extensions: + return False + + # Check MIME type if provided + if mime_type and mime_type not in self.supported_mime_types: + return False + + # Validate dependencies + return self.validate_dependencies() + + async def load(self, file_path: str, source_type: str = "auto", **kwargs) -> LoaderResult: + """ + Load data using DLT pipeline. + + Args: + file_path: Path to the data file or DLT configuration + source_type: Type of data source ("auto", "json", "csv", "parquet", "api") + **kwargs: Additional DLT pipeline configuration + + Returns: + LoaderResult with loaded data and metadata + + Raises: + ImportError: If DLT is not installed + Exception: If data loading fails + """ + try: + import dlt + except ImportError as e: + raise ImportError( + "dlt is required for data loading. 
Install with: pip install dlt" + ) from e + + try: + self.logger.info(f"Loading data with DLT: {file_path}") + + file_ext = os.path.splitext(file_path)[1].lower() + file_name = os.path.basename(file_path) + file_size = os.path.getsize(file_path) + + # Determine source type if auto + if source_type == "auto": + if file_ext == ".json": + source_type = "json" + elif file_ext == ".jsonl": + source_type = "jsonl" + elif file_ext == ".csv": + source_type = "csv" + elif file_ext == ".parquet": + source_type = "parquet" + elif file_ext in [".yaml", ".yml"]: + source_type = "yaml" + else: + source_type = "file" + + # Load data based on source type + if source_type == "json": + content = self._load_json(file_path) + elif source_type == "jsonl": + content = self._load_jsonl(file_path) + elif source_type == "csv": + content = self._load_csv(file_path) + elif source_type == "parquet": + content = self._load_parquet(file_path) + elif source_type == "yaml": + content = self._load_yaml(file_path) + else: + # Default: read as text + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + # Determine content type + if isinstance(content, (dict, list)): + content_type = ContentType.STRUCTURED + text_content = str(content) + else: + content_type = ContentType.TEXT + text_content = content + + # Gather metadata + metadata = { + "name": file_name, + "size": file_size, + "extension": file_ext, + "loader": self.loader_name, + "source_type": source_type, + "dlt_version": dlt.__version__, + } + + # Add data-specific metadata + if isinstance(content, list): + metadata["records_count"] = len(content) + elif isinstance(content, dict): + metadata["keys_count"] = len(content) + + return LoaderResult( + content=text_content, + metadata=metadata, + content_type=content_type, + chunks=[text_content], # Single chunk for now + source_info={ + "file_path": file_path, + "source_type": source_type, + "raw_data": content if isinstance(content, (dict, list)) else None, + }, + ) + + except Exception as e: + self.logger.error(f"Failed to load data with DLT from {file_path}: {e}") + raise Exception(f"DLT data loading failed: {e}") from e + + def _load_json(self, file_path: str) -> Dict[str, Any]: + """Load JSON file.""" + import json + + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + + def _load_jsonl(self, file_path: str) -> List[Dict[str, Any]]: + """Load JSON Lines file.""" + import json + + data = [] + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + data.append(json.loads(line)) + return data + + def _load_csv(self, file_path: str) -> str: + """Load CSV file as text.""" + with open(file_path, "r", encoding="utf-8") as f: + return f.read() + + def _load_parquet(self, file_path: str) -> str: + """Load Parquet file (requires pandas).""" + try: + import pandas as pd + + df = pd.read_parquet(file_path) + return df.to_string() + except ImportError: + # Fallback: pandas unavailable, return a short string description instead of the data + with open(file_path, "rb") as f: + data = f.read() + return f"[parquet file: {os.path.basename(file_path)}, {len(data)} bytes; install pandas to read contents]" + + def _load_yaml(self, file_path: str) -> str: + """Load YAML file as text.""" + with open(file_path, "r", encoding="utf-8") as f: + return f.read() diff --git a/cognee/infrastructure/loaders/external/pypdf_loader.py b/cognee/infrastructure/loaders/external/pypdf_loader.py new file mode 100644 index 000000000..8b35f29bd --- /dev/null +++ b/cognee/infrastructure/loaders/external/pypdf_loader.py @@ -0,0 +1,127 @@ +import os +from typing import List +from
cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType +from cognee.shared.logging_utils import get_logger + + +class PyPdfLoader(LoaderInterface): + """ + PDF loader using pypdf library. + + Extracts text content from PDF files page by page, providing + structured page information and handling PDF-specific errors. + """ + + def __init__(self): + self.logger = get_logger(__name__) + + @property + def supported_extensions(self) -> List[str]: + return [".pdf"] + + @property + def supported_mime_types(self) -> List[str]: + return ["application/pdf"] + + @property + def loader_name(self) -> str: + return "pypdf_loader" + + def get_dependencies(self) -> List[str]: + return ["pypdf>=4.0.0"] + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """Check if file can be handled by this loader.""" + # Check file extension + if not file_path.lower().endswith(".pdf"): + return False + + # Check MIME type if provided + if mime_type and mime_type != "application/pdf": + return False + + # Validate dependencies + return self.validate_dependencies() + + async def load(self, file_path: str, strict: bool = False, **kwargs) -> LoaderResult: + """ + Load PDF file and extract text content. + + Args: + file_path: Path to the PDF file + strict: Whether to use strict mode for PDF reading + **kwargs: Additional arguments + + Returns: + LoaderResult with extracted text content and metadata + + Raises: + ImportError: If pypdf is not installed + Exception: If PDF processing fails + """ + try: + from pypdf import PdfReader + except ImportError as e: + raise ImportError( + "pypdf is required for PDF processing. Install with: pip install pypdf" + ) from e + + try: + with open(file_path, "rb") as file: + self.logger.info(f"Reading PDF: {file_path}") + reader = PdfReader(file, strict=strict) + + content_parts = [] + page_texts = [] + + for page_num, page in enumerate(reader.pages, 1): + try: + page_text = page.extract_text() + if page_text.strip(): # Only add non-empty pages + page_texts.append(page_text) + content_parts.append(f"Page {page_num}:\n{page_text}\n") + except Exception as e: + self.logger.warning(f"Failed to extract text from page {page_num}: {e}") + continue + + # Combine all content + full_content = "\n".join(content_parts) + + # Gather metadata + metadata = { + "name": os.path.basename(file_path), + "size": os.path.getsize(file_path), + "extension": ".pdf", + "pages": len(reader.pages), + "pages_with_text": len(page_texts), + "loader": self.loader_name, + } + + # Add PDF metadata if available + if reader.metadata: + metadata["pdf_metadata"] = { + "title": reader.metadata.get("/Title", ""), + "author": reader.metadata.get("/Author", ""), + "subject": reader.metadata.get("/Subject", ""), + "creator": reader.metadata.get("/Creator", ""), + "producer": reader.metadata.get("/Producer", ""), + "creation_date": str(reader.metadata.get("/CreationDate", "")), + "modification_date": str(reader.metadata.get("/ModDate", "")), + } + + return LoaderResult( + content=full_content, + metadata=metadata, + content_type=ContentType.TEXT, + chunks=page_texts, # Pre-chunked by page + source_info={ + "file_path": file_path, + "pages": len(reader.pages), + "strict_mode": strict, + }, + ) + + except Exception as e: + self.logger.error(f"Failed to process PDF {file_path}: {e}") + raise Exception(f"PDF processing failed: {e}") from e diff --git a/cognee/infrastructure/loaders/external/unstructured_loader.py 
b/cognee/infrastructure/loaders/external/unstructured_loader.py new file mode 100644 index 000000000..b60691018 --- /dev/null +++ b/cognee/infrastructure/loaders/external/unstructured_loader.py @@ -0,0 +1,169 @@ +import os +from typing import List +from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType +from cognee.shared.logging_utils import get_logger + + +class UnstructuredLoader(LoaderInterface): + """ + Document loader using the unstructured library. + + Handles various document formats including docx, pptx, xlsx, odt, etc. + Uses the unstructured library's auto-partition functionality. + """ + + def __init__(self): + self.logger = get_logger(__name__) + + @property + def supported_extensions(self) -> List[str]: + return [ + ".docx", + ".doc", + ".odt", # Word documents + ".xlsx", + ".xls", + ".ods", # Spreadsheets + ".pptx", + ".ppt", + ".odp", # Presentations + ".rtf", + ".html", + ".htm", # Rich text and HTML + ".eml", + ".msg", # Email formats + ".epub", # eBooks + ] + + @property + def supported_mime_types(self) -> List[str]: + return [ + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", # docx + "application/msword", # doc + "application/vnd.oasis.opendocument.text", # odt + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # xlsx + "application/vnd.ms-excel", # xls + "application/vnd.oasis.opendocument.spreadsheet", # ods + "application/vnd.openxmlformats-officedocument.presentationml.presentation", # pptx + "application/vnd.ms-powerpoint", # ppt + "application/vnd.oasis.opendocument.presentation", # odp + "application/rtf", # rtf + "text/html", # html + "message/rfc822", # eml + "application/epub+zip", # epub + ] + + @property + def loader_name(self) -> str: + return "unstructured_loader" + + def get_dependencies(self) -> List[str]: + return ["unstructured>=0.10.0"] + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """Check if file can be handled by this loader.""" + # Check file extension + file_ext = os.path.splitext(file_path)[1].lower() + if file_ext not in self.supported_extensions: + return False + + # Check MIME type if provided + if mime_type and mime_type not in self.supported_mime_types: + return False + + # Validate dependencies + return self.validate_dependencies() + + async def load(self, file_path: str, strategy: str = "auto", **kwargs) -> LoaderResult: + """ + Load document using unstructured library. + + Args: + file_path: Path to the document file + strategy: Partitioning strategy ("auto", "fast", "hi_res", "ocr_only") + **kwargs: Additional arguments passed to unstructured partition + + Returns: + LoaderResult with extracted text content and metadata + + Raises: + ImportError: If unstructured is not installed + Exception: If document processing fails + """ + try: + from unstructured.partition.auto import partition + except ImportError as e: + raise ImportError( + "unstructured is required for document processing. 
" + "Install with: pip install unstructured" + ) from e + + try: + self.logger.info(f"Processing document: {file_path}") + + # Determine content type from file extension + file_ext = os.path.splitext(file_path)[1].lower() + content_type_hint = None + + # Get file size and basic info + file_size = os.path.getsize(file_path) + file_name = os.path.basename(file_path) + + # Set partitioning parameters + partition_kwargs = {"filename": file_path, "strategy": strategy, **kwargs} + + # Use partition to extract elements + elements = partition(**partition_kwargs) + + # Process elements into text content + text_parts = [] + element_info = [] + + for element in elements: + element_text = str(element).strip() + if element_text: + text_parts.append(element_text) + element_info.append( + { + "type": type(element).__name__, + "text": element_text[:100] + "..." + if len(element_text) > 100 + else element_text, + } + ) + + # Combine all text content + full_content = "\n\n".join(text_parts) + + # Determine content type based on structure + content_type = ContentType.STRUCTURED if len(element_info) > 1 else ContentType.TEXT + + # Gather metadata + metadata = { + "name": file_name, + "size": file_size, + "extension": file_ext, + "loader": self.loader_name, + "elements_count": len(elements), + "text_elements_count": len(text_parts), + "strategy": strategy, + "element_types": list(set(info["type"] for info in element_info)), + } + + return LoaderResult( + content=full_content, + metadata=metadata, + content_type=content_type, + chunks=text_parts, # Pre-chunked by elements + source_info={ + "file_path": file_path, + "strategy": strategy, + "elements": element_info[:10], # First 10 elements for debugging + "total_elements": len(elements), + }, + ) + + except Exception as e: + self.logger.error(f"Failed to process document {file_path}: {e}") + raise Exception(f"Document processing failed: {e}") from e diff --git a/cognee/infrastructure/loaders/get_loader_engine.py b/cognee/infrastructure/loaders/get_loader_engine.py new file mode 100644 index 000000000..8f249874c --- /dev/null +++ b/cognee/infrastructure/loaders/get_loader_engine.py @@ -0,0 +1,20 @@ +from functools import lru_cache +from .config import get_loader_config +from .LoaderEngine import LoaderEngine +from .create_loader_engine import create_loader_engine + + +@lru_cache +def get_loader_engine() -> LoaderEngine: + """ + Factory function to get loader engine. + + Follows cognee's pattern with @lru_cache for efficient reuse + of engine instances. Configuration is loaded from environment + variables and settings. + + Returns: + Cached LoaderEngine instance configured with current settings + """ + config = get_loader_config() + return create_loader_engine(**config.to_dict()) diff --git a/cognee/infrastructure/loaders/models/LoaderResult.py b/cognee/infrastructure/loaders/models/LoaderResult.py new file mode 100644 index 000000000..bf8cb77d3 --- /dev/null +++ b/cognee/infrastructure/loaders/models/LoaderResult.py @@ -0,0 +1,47 @@ +from pydantic import BaseModel +from typing import Optional, Dict, Any, List +from enum import Enum + + +class ContentType(Enum): + """Content type classification for loaded files""" + + TEXT = "text" + STRUCTURED = "structured" + BINARY = "binary" + + +class LoaderResult(BaseModel): + """ + Standardized output format for all file loaders. + + This model ensures consistent data structure across all loader implementations, + following cognee's pattern of using Pydantic models for data validation. 
+ """ + + content: str # Primary text content extracted from file + metadata: Dict[str, Any] # File metadata (name, size, type, loader info, etc.) + content_type: ContentType # Content classification + chunks: Optional[List[str]] = None # Pre-chunked content if available + source_info: Optional[Dict[str, Any]] = None # Source-specific information + + def to_dict(self) -> Dict[str, Any]: + """ + Convert the loader result to a dictionary format. + + Returns: + Dict containing all loader result data with string-serialized content_type + """ + return { + "content": self.content, + "metadata": self.metadata, + "content_type": self.content_type.value, + "source_info": self.source_info or {}, + "chunks": self.chunks, + } + + class Config: + """Pydantic configuration following cognee patterns""" + + use_enum_values = True + validate_assignment = True diff --git a/cognee/infrastructure/loaders/models/__init__.py b/cognee/infrastructure/loaders/models/__init__.py new file mode 100644 index 000000000..53ebae0fb --- /dev/null +++ b/cognee/infrastructure/loaders/models/__init__.py @@ -0,0 +1,3 @@ +from .LoaderResult import LoaderResult, ContentType + +__all__ = ["LoaderResult", "ContentType"] diff --git a/cognee/infrastructure/loaders/supported_loaders.py b/cognee/infrastructure/loaders/supported_loaders.py new file mode 100644 index 000000000..6c7ebec3b --- /dev/null +++ b/cognee/infrastructure/loaders/supported_loaders.py @@ -0,0 +1,3 @@ +# Registry for loader implementations +# Follows cognee's pattern used in databases/vector/supported_databases.py +supported_loaders = {} diff --git a/cognee/infrastructure/loaders/use_loader.py b/cognee/infrastructure/loaders/use_loader.py new file mode 100644 index 000000000..4c6c28080 --- /dev/null +++ b/cognee/infrastructure/loaders/use_loader.py @@ -0,0 +1,22 @@ +from .supported_loaders import supported_loaders + + +def use_loader(loader_name: str, loader_class): + """ + Register a loader at runtime. + + Follows cognee's pattern used in databases for adapter registration. + This allows external packages and custom loaders to be registered + into the loader system. 
+ + Args: + loader_name: Unique name for the loader + loader_class: Loader class implementing LoaderInterface + + Example: + from cognee.infrastructure.loaders import use_loader + from my_package import MyCustomLoader + + use_loader("my_custom_loader", MyCustomLoader) + """ + supported_loaders[loader_name] = loader_class diff --git a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py index 9f0b0bd12..d3a0a8522 100644 --- a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py +++ b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py @@ -22,16 +22,16 @@ def _create_edge_key(source_id: str, target_id: str, relationship_name: str) -> def _process_ontology_nodes( - ontology_nodes: list, - data_chunk: DocumentChunk, - added_nodes_map: dict, - added_ontology_nodes_map: dict + ontology_nodes: list, + data_chunk: DocumentChunk, + added_nodes_map: dict, + added_ontology_nodes_map: dict, ) -> None: """Process and store ontology nodes""" for ontology_node in ontology_nodes: ont_node_id = generate_node_id(ontology_node.name) ont_node_name = generate_node_name(ontology_node.name) - + if ontology_node.category == "classes": ont_node_key = _create_node_key(ont_node_id, "type") if ont_node_key not in added_nodes_map and ont_node_key not in added_ontology_nodes_map: @@ -41,7 +41,7 @@ def _process_ontology_nodes( description=ont_node_name, ontology_valid=True, ) - + elif ontology_node.category == "individuals": ont_node_key = _create_node_key(ont_node_id, "entity") if ont_node_key not in added_nodes_map and ont_node_key not in added_ontology_nodes_map: @@ -55,9 +55,7 @@ def _process_ontology_nodes( def _process_ontology_edges( - ontology_edges: list, - existing_edges_map: dict, - ontology_relationships: list + ontology_edges: list, existing_edges_map: dict, ontology_relationships: list ) -> None: """Process ontology edges and add them if new""" for source, relation, target in ontology_edges: @@ -65,7 +63,7 @@ def _process_ontology_edges( target_node_id = generate_node_id(target) relationship_name = generate_edge_name(relation) edge_key = _create_edge_key(source_node_id, target_node_id, relationship_name) - + if edge_key not in existing_edges_map: ontology_relationships.append( ( @@ -84,41 +82,43 @@ def _process_ontology_edges( def _create_type_node( - node_type: str, - ontology_resolver: OntologyResolver, - added_nodes_map: dict, + node_type: str, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, added_ontology_nodes_map: dict, - name_mapping: dict, - key_mapping: dict, - data_chunk: DocumentChunk, - existing_edges_map: dict, - ontology_relationships: list + name_mapping: dict, + key_mapping: dict, + data_chunk: DocumentChunk, + existing_edges_map: dict, + ontology_relationships: list, ) -> EntityType: """Create or retrieve a type node with ontology validation""" node_id = generate_node_id(node_type) node_name = generate_node_name(node_type) type_node_key = _create_node_key(node_id, "type") - + if type_node_key in added_nodes_map or type_node_key in key_mapping: - return added_nodes_map.get(type_node_key) or added_nodes_map.get(key_mapping.get(type_node_key)) - + return added_nodes_map.get(type_node_key) or added_nodes_map.get( + key_mapping.get(type_node_key) + ) + # Get ontology validation ontology_nodes, ontology_edges, closest_class = ontology_resolver.get_subgraph( node_name=node_name, node_type="classes" ) - + ontology_validated = bool(closest_class) - + if ontology_validated: old_key = type_node_key node_id = 
generate_node_id(closest_class.name) type_node_key = _create_node_key(node_id, "type") new_node_name = generate_node_name(closest_class.name) - + name_mapping[node_name] = closest_class.name key_mapping[old_key] = type_node_key node_name = new_node_name - + type_node = EntityType( id=node_id, name=node_name, @@ -126,55 +126,57 @@ def _create_type_node( description=node_name, ontology_valid=ontology_validated, ) - + added_nodes_map[type_node_key] = type_node - + # Process ontology nodes and edges _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) _process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships) - + return type_node def _create_entity_node( - node_id: str, - node_name: str, - node_description: str, - type_node: EntityType, - ontology_resolver: OntologyResolver, - added_nodes_map: dict, + node_id: str, + node_name: str, + node_description: str, + type_node: EntityType, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, added_ontology_nodes_map: dict, - name_mapping: dict, - key_mapping: dict, - data_chunk: DocumentChunk, - existing_edges_map: dict, - ontology_relationships: list + name_mapping: dict, + key_mapping: dict, + data_chunk: DocumentChunk, + existing_edges_map: dict, + ontology_relationships: list, ) -> Entity: """Create or retrieve an entity node with ontology validation""" generated_node_id = generate_node_id(node_id) generated_node_name = generate_node_name(node_name) entity_node_key = _create_node_key(generated_node_id, "entity") - + if entity_node_key in added_nodes_map or entity_node_key in key_mapping: - return added_nodes_map.get(entity_node_key) or added_nodes_map.get(key_mapping.get(entity_node_key)) - + return added_nodes_map.get(entity_node_key) or added_nodes_map.get( + key_mapping.get(entity_node_key) + ) + # Get ontology validation ontology_nodes, ontology_edges, start_ent_ont = ontology_resolver.get_subgraph( node_name=generated_node_name, node_type="individuals" ) - + ontology_validated = bool(start_ent_ont) - + if ontology_validated: old_key = entity_node_key generated_node_id = generate_node_id(start_ent_ont.name) entity_node_key = _create_node_key(generated_node_id, "entity") new_node_name = generate_node_name(start_ent_ont.name) - + name_mapping[generated_node_name] = start_ent_ont.name key_mapping[old_key] = entity_node_key generated_node_name = new_node_name - + entity_node = Entity( id=generated_node_id, name=generated_node_name, @@ -183,42 +185,58 @@ def _create_entity_node( ontology_valid=ontology_validated, belongs_to_set=data_chunk.belongs_to_set, ) - + added_nodes_map[entity_node_key] = entity_node - + # Process ontology nodes and edges _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map) _process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships) - + return entity_node def _process_graph_nodes( - data_chunk: DocumentChunk, - graph: KnowledgeGraph, - ontology_resolver: OntologyResolver, - added_nodes_map: dict, + data_chunk: DocumentChunk, + graph: KnowledgeGraph, + ontology_resolver: OntologyResolver, + added_nodes_map: dict, added_ontology_nodes_map: dict, - name_mapping: dict, - key_mapping: dict, - existing_edges_map: dict, - ontology_relationships: list + name_mapping: dict, + key_mapping: dict, + existing_edges_map: dict, + ontology_relationships: list, ) -> None: """Process nodes in a knowledge graph""" for node in graph.nodes: # Create type node type_node = _create_type_node( - node.type, 
ontology_resolver, added_nodes_map, added_ontology_nodes_map, - name_mapping, key_mapping, data_chunk, existing_edges_map, ontology_relationships + node.type, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + data_chunk, + existing_edges_map, + ontology_relationships, ) - + # Create entity node entity_node = _create_entity_node( - node.id, node.name, node.description, type_node, ontology_resolver, - added_nodes_map, added_ontology_nodes_map, name_mapping, key_mapping, - data_chunk, existing_edges_map, ontology_relationships + node.id, + node.name, + node.description, + type_node, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + data_chunk, + existing_edges_map, + ontology_relationships, ) - + # Add entity to data chunk if data_chunk.contains is None: data_chunk.contains = [] @@ -226,22 +244,19 @@ def _process_graph_nodes( def _process_graph_edges( - graph: KnowledgeGraph, - name_mapping: dict, - existing_edges_map: dict, - relationships: list + graph: KnowledgeGraph, name_mapping: dict, existing_edges_map: dict, relationships: list ) -> None: """Process edges in a knowledge graph""" for edge in graph.edges: # Apply name mapping if exists source_id = name_mapping.get(edge.source_node_id, edge.source_node_id) target_id = name_mapping.get(edge.target_node_id, edge.target_node_id) - + source_node_id = generate_node_id(source_id) target_node_id = generate_node_id(target_id) relationship_name = generate_edge_name(edge.relationship_name) edge_key = _create_edge_key(source_node_id, target_node_id, relationship_name) - + if edge_key not in existing_edges_map: relationships.append( ( @@ -270,33 +285,40 @@ def expand_with_nodes_and_edges( """ if existing_edges_map is None: existing_edges_map = {} - + if ontology_resolver is None: ontology_resolver = OntologyResolver() - + added_nodes_map = {} added_ontology_nodes_map = {} relationships = [] ontology_relationships = [] name_mapping = {} key_mapping = {} - + # Process each chunk and its corresponding graph for data_chunk, graph in zip(data_chunks, chunk_graphs): if not graph: continue - + # Process nodes first _process_graph_nodes( - data_chunk, graph, ontology_resolver, added_nodes_map, added_ontology_nodes_map, - name_mapping, key_mapping, existing_edges_map, ontology_relationships + data_chunk, + graph, + ontology_resolver, + added_nodes_map, + added_ontology_nodes_map, + name_mapping, + key_mapping, + existing_edges_map, + ontology_relationships, ) - + # Then process edges _process_graph_edges(graph, name_mapping, existing_edges_map, relationships) - + # Return combined results graph_nodes = list(added_ontology_nodes_map.values()) graph_edges = relationships + ontology_relationships - + return graph_nodes, graph_edges diff --git a/cognee/tasks/ingestion/adapters/__init__.py b/cognee/tasks/ingestion/adapters/__init__.py new file mode 100644 index 000000000..51c7875c1 --- /dev/null +++ b/cognee/tasks/ingestion/adapters/__init__.py @@ -0,0 +1,11 @@ +""" +Adapters for bridging the new loader system with existing ingestion pipeline. + +This module provides compatibility layers to integrate the plugin-based loader +system with cognee's existing data processing pipeline while maintaining +backward compatibility and preserving permission logic. 
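A rough sketch (not part of this diff) of how this bridge is expected to be called from ingestion code; the file path is hypothetical.

```python
import asyncio

from cognee.tasks.ingestion.adapters import LoaderToIngestionAdapter


async def ingest_one(path: str):
    adapter = LoaderToIngestionAdapter()

    # Runs the plugin loader system first and falls back to the legacy
    # ingestion.classify() path if no loader can process the file.
    ingestion_data = await adapter.process_file_with_loaders(
        path,
        preferred_loaders=["pypdf_loader", "text_loader"],
        loader_config={"pypdf_loader": {"strict": False}},
    )

    # The wrapper exposes the interface the existing pipeline already expects.
    print(ingestion_data.get_identifier())
    print(ingestion_data.get_metadata().get("mime_type"))


asyncio.run(ingest_one("/tmp/report.pdf"))
```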
+""" + +from .loader_to_ingestion_adapter import LoaderToIngestionAdapter + +__all__ = ["LoaderToIngestionAdapter"] diff --git a/cognee/tasks/ingestion/adapters/loader_to_ingestion_adapter.py b/cognee/tasks/ingestion/adapters/loader_to_ingestion_adapter.py new file mode 100644 index 000000000..912337659 --- /dev/null +++ b/cognee/tasks/ingestion/adapters/loader_to_ingestion_adapter.py @@ -0,0 +1,240 @@ +import os +import tempfile +from typing import BinaryIO, Union, Optional, Any +from io import StringIO, BytesIO + +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType +from cognee.modules.ingestion.data_types import IngestionData, TextData, BinaryData +from cognee.infrastructure.files import get_file_metadata +from cognee.shared.logging_utils import get_logger + + +class LoaderResultToIngestionData(IngestionData): + """ + Adapter class that wraps LoaderResult to be compatible with IngestionData interface. + + This maintains backward compatibility with existing cognee ingestion pipeline + while enabling the new loader system. + """ + + def __init__(self, loader_result: LoaderResult, original_file_path: str = None): + self.loader_result = loader_result + self.original_file_path = original_file_path + self._cached_metadata = None + self.logger = get_logger(__name__) + + def get_identifier(self) -> str: + """ + Get content identifier for deduplication. + + Uses the loader result's source info or generates hash from content. + """ + # Try to get file hash from metadata first + if "content_hash" in self.loader_result.metadata: + return self.loader_result.metadata["content_hash"] + + # Fallback: generate hash from content + import hashlib + + content_bytes = self.loader_result.content.encode("utf-8") + content_hash = hashlib.md5(content_bytes).hexdigest() + + # Add content type prefix for better identification + content_type = self.loader_result.content_type.value + return f"{content_type}_{content_hash}" + + def get_metadata(self) -> dict: + """ + Get file metadata in the format expected by existing pipeline. + + Converts LoaderResult metadata to the format used by IngestionData. 
+ """ + if self._cached_metadata is not None: + return self._cached_metadata + + # Start with loader result metadata + metadata = self.loader_result.metadata.copy() + + # Ensure required fields are present + if "name" not in metadata: + if self.original_file_path: + metadata["name"] = os.path.basename(self.original_file_path) + else: + # Generate name from content hash + content_hash = self.get_identifier().split("_")[-1][:8] + ext = metadata.get("extension", ".txt") + metadata["name"] = f"content_{content_hash}{ext}" + + if "content_hash" not in metadata: + metadata["content_hash"] = self.get_identifier() + + if "file_path" not in metadata and self.original_file_path: + metadata["file_path"] = self.original_file_path + + # Add mime type if not present + if "mime_type" not in metadata: + ext = metadata.get("extension", "").lower() + mime_type_map = { + ".txt": "text/plain", + ".md": "text/markdown", + ".csv": "text/csv", + ".json": "application/json", + ".pdf": "application/pdf", + ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation", + } + metadata["mime_type"] = mime_type_map.get(ext, "application/octet-stream") + + self._cached_metadata = metadata + return metadata + + def get_data(self) -> Union[str, BinaryIO]: + """ + Get data content in format expected by existing pipeline. + + Returns content as string for text data or creates a file-like object + for binary data to maintain compatibility. + """ + if self.loader_result.content_type == ContentType.TEXT: + return self.loader_result.content + + # For structured or binary content, return as string for now + # The existing pipeline expects text content for processing + return self.loader_result.content + + +class LoaderToIngestionAdapter: + """ + Adapter that bridges the new loader system with existing ingestion pipeline. + + This class provides methods to process files using the loader system + while maintaining compatibility with the existing IngestionData interface. + """ + + def __init__(self): + self.logger = get_logger(__name__) + + async def process_file_with_loaders( + self, + file_path: str, + s3fs: Optional[Any] = None, + preferred_loaders: Optional[list] = None, + loader_config: Optional[dict] = None, + ) -> IngestionData: + """ + Process a file using the loader system and return IngestionData. 
+ + Args: + file_path: Path to the file to process + s3fs: S3 filesystem (for compatibility with existing code) + preferred_loaders: List of preferred loader names + loader_config: Configuration for specific loaders + + Returns: + IngestionData compatible object + + Raises: + Exception: If no loader can handle the file + """ + from cognee.infrastructure.loaders import get_loader_engine + + try: + # Get the loader engine + engine = get_loader_engine() + + # Determine MIME type if possible + mime_type = None + try: + import mimetypes + + mime_type, _ = mimetypes.guess_type(file_path) + except Exception: + pass + + # Load file using loader system + self.logger.info(f"Processing file with loaders: {file_path}") + + # Extract loader-specific config if provided + kwargs = {} + if loader_config: + # Find the first available loader that matches our preferred loaders + loader = engine.get_loader(file_path, mime_type, preferred_loaders) + if loader and loader.loader_name in loader_config: + kwargs = loader_config[loader.loader_name] + + loader_result = await engine.load_file( + file_path, mime_type=mime_type, preferred_loaders=preferred_loaders, **kwargs + ) + + # Convert to IngestionData compatible format + return LoaderResultToIngestionData(loader_result, file_path) + + except Exception as e: + self.logger.warning(f"Loader system failed for {file_path}: {e}") + # Fallback to existing classification system + return await self._fallback_to_existing_system(file_path, s3fs) + + async def _fallback_to_existing_system( + self, file_path: str, s3fs: Optional[Any] = None + ) -> IngestionData: + """ + Fallback to existing ingestion.classify() system for backward compatibility. + + This ensures that even if the loader system fails, we can still process + files using the original classification method. + """ + from cognee.modules.ingestion import classify + + self.logger.info(f"Falling back to existing classification system for: {file_path}") + + # Open file and classify using existing system + if file_path.startswith("s3://"): + if s3fs: + with s3fs.open(file_path, "rb") as file: + return classify(file, s3fs=s3fs) + else: + raise ValueError("S3 file path provided but no s3fs available") + else: + # Handle local files and file:// URLs + local_path = file_path.replace("file://", "") + with open(local_path, "rb") as file: + return classify(file) + + def is_text_content(self, data: Union[str, Any]) -> bool: + """ + Check if the provided data is text content (not a file path). + + Args: + data: The data to check + + Returns: + True if data is text content, False if it's a file path + """ + if not isinstance(data, str): + return False + + # Check if it's a file path + if ( + data.startswith("/") + or data.startswith("file://") + or data.startswith("s3://") + or (len(data) > 1 and data[1] == ":") + ): # Windows drive paths + return False + + return True + + def create_text_ingestion_data(self, content: str) -> IngestionData: + """ + Create IngestionData for text content. 
+ + Args: + content: Text content to wrap + + Returns: + IngestionData compatible object + """ + from cognee.modules.ingestion.data_types import TextData + + return TextData(content) diff --git a/cognee/tasks/ingestion/plugin_ingest_data.py b/cognee/tasks/ingestion/plugin_ingest_data.py new file mode 100644 index 000000000..1b3101a2d --- /dev/null +++ b/cognee/tasks/ingestion/plugin_ingest_data.py @@ -0,0 +1,223 @@ +import json +import inspect +from uuid import UUID +from typing import Union, BinaryIO, Any, List, Optional + +import cognee.modules.ingestion as ingestion +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.modules.data.models import Data +from cognee.modules.users.models import User +from cognee.modules.users.methods import get_default_user +from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets +from cognee.modules.data.methods import ( + get_authorized_existing_datasets, + get_dataset_data, + load_or_create_datasets, +) +from cognee.shared.logging_utils import get_logger + +from .save_data_item_to_storage import save_data_item_to_storage +from .adapters import LoaderToIngestionAdapter +from cognee.api.v1.add.config import get_s3_config + + +logger = get_logger(__name__) + + +async def plugin_ingest_data( + data: Any, + dataset_name: str, + user: User, + node_set: Optional[List[str]] = None, + dataset_id: UUID = None, + preferred_loaders: Optional[List[str]] = None, + loader_config: Optional[dict] = None, +): + """ + Plugin-based data ingestion using the loader system. + + This function maintains full backward compatibility with the existing + ingest_data function while adding support for the new loader system. + + Args: + data: The data to ingest + dataset_name: Name of the dataset + user: User object for permissions + node_set: Optional node set for organization + dataset_id: Optional specific dataset ID + preferred_loaders: List of preferred loader names to try first + loader_config: Configuration for specific loaders + + Returns: + List of Data objects that were ingested + """ + if not user: + user = await get_default_user() + + # Initialize S3 support (maintain existing behavior) + s3_config = get_s3_config() + fs = None + if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None: + import s3fs + + fs = s3fs.S3FileSystem( + key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False + ) + + # Initialize the loader adapter + loader_adapter = LoaderToIngestionAdapter() + + def open_data_file(file_path: str): + """Open file with S3 support (preserves existing behavior).""" + if file_path.startswith("s3://"): + return fs.open(file_path, mode="rb") + else: + local_path = file_path.replace("file://", "") + return open(local_path, mode="rb") + + def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]: + """Get external metadata (preserves existing behavior).""" + if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): + return {"metadata": data_item.dict(), "origin": str(type(data_item))} + else: + return {} + + async def store_data_to_dataset( + data: Any, + dataset_name: str, + user: User, + node_set: Optional[List[str]] = None, + dataset_id: UUID = None, + ): + """ + Core data storage logic with plugin-based file processing. + + This function preserves all existing permission and database logic + while using the new loader system for file processing. 
+ """ + logger.info(f"Plugin-based ingestion starting for dataset: {dataset_name}") + + # Preserve existing dataset creation and permission logic + user_datasets = await get_specific_user_permission_datasets(user.id, ["write"]) + existing_datasets = await get_authorized_existing_datasets(user.id, dataset_name, ["write"]) + + datasets = await load_or_create_datasets( + user_datasets, existing_datasets, dataset_name, user, dataset_id + ) + + dataset = datasets[0] + + new_datapoints = [] + existing_data_points = [] + dataset_new_data_points = [] + + # Get existing dataset data for deduplication (preserve existing logic) + dataset_data: list[Data] = await get_dataset_data(dataset.id) + dataset_data_map = {str(data.id): True for data in dataset_data} + + for data_item in data: + file_path = await save_data_item_to_storage(data_item, dataset_name) + + # NEW: Use loader system or existing classification based on data type + try: + if loader_adapter.is_text_content(data_item): + # Handle text content (preserve existing behavior) + logger.info("Processing text content with existing system") + classified_data = ingestion.classify(data_item) + else: + # Use loader system for file paths + logger.info(f"Processing file with loader system: {file_path}") + classified_data = await loader_adapter.process_file_with_loaders( + file_path, + s3fs=fs, + preferred_loaders=preferred_loaders, + loader_config=loader_config, + ) + + except Exception as e: + logger.warning(f"Plugin system failed for {file_path}, falling back: {e}") + # Fallback to existing system for full backward compatibility + with open_data_file(file_path) as file: + classified_data = ingestion.classify(file, s3fs=fs) + + # Preserve all existing data processing logic + data_id = ingestion.identify(classified_data, user) + file_metadata = classified_data.get_metadata() + + from sqlalchemy import select + + db_engine = get_relational_engine() + + # Check if data should be updated (preserve existing logic) + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + + ext_metadata = get_external_metadata_dict(data_item) + + if node_set: + ext_metadata["node_set"] = node_set + + # Preserve existing data point creation/update logic + if data_point is not None: + data_point.name = file_metadata["name"] + data_point.raw_data_location = file_metadata["file_path"] + data_point.extension = file_metadata["extension"] + data_point.mime_type = file_metadata["mime_type"] + data_point.owner_id = user.id + data_point.content_hash = file_metadata["content_hash"] + data_point.external_metadata = ext_metadata + data_point.node_set = json.dumps(node_set) if node_set else None + + if str(data_point.id) in dataset_data_map: + existing_data_points.append(data_point) + else: + dataset_new_data_points.append(data_point) + dataset_data_map[str(data_point.id)] = True + else: + if str(data_id) in dataset_data_map: + continue + + data_point = Data( + id=data_id, + name=file_metadata["name"], + raw_data_location=file_metadata["file_path"], + extension=file_metadata["extension"], + mime_type=file_metadata["mime_type"], + owner_id=user.id, + content_hash=file_metadata["content_hash"], + external_metadata=ext_metadata, + node_set=json.dumps(node_set) if node_set else None, + token_count=-1, + ) + + new_datapoints.append(data_point) + dataset_data_map[str(data_point.id)] = True + + # Preserve existing database operations + async with db_engine.get_async_session() as session: + 
if dataset not in session: + session.add(dataset) + + if len(new_datapoints) > 0: + dataset.data.extend(new_datapoints) + + if len(existing_data_points) > 0: + for data_point in existing_data_points: + await session.merge(data_point) + + if len(dataset_new_data_points) > 0: + dataset.data.extend(dataset_new_data_points) + + await session.merge(dataset) + await session.commit() + + logger.info( + f"Plugin-based ingestion completed. New: {len(new_datapoints)}, " + + f"Updated: {len(existing_data_points)}, Dataset new: {len(dataset_new_data_points)}" + ) + + return existing_data_points + dataset_new_data_points + new_datapoints + + return await store_data_to_dataset(data, dataset_name, user, node_set, dataset_id) diff --git a/infrastructure/loaders/LoaderEngine.py b/infrastructure/loaders/LoaderEngine.py new file mode 100644 index 000000000..cd75da6cb --- /dev/null +++ b/infrastructure/loaders/LoaderEngine.py @@ -0,0 +1,237 @@ +import os +import importlib.util +from typing import Dict, List, Optional +from .LoaderInterface import LoaderInterface +from .models.LoaderResult import LoaderResult +from cognee.shared.logging_utils import get_logger + + +class LoaderEngine: + """ + Main loader engine for managing file loaders. + + Follows cognee's adapter pattern similar to database engines, + providing a centralized system for file loading operations. + """ + + def __init__( + self, + loader_directories: List[str], + default_loader_priority: List[str], + fallback_loader: str = "text_loader", + enable_dependency_validation: bool = True, + ): + """ + Initialize the loader engine. + + Args: + loader_directories: Directories to search for loader implementations + default_loader_priority: Priority order for loader selection + fallback_loader: Default loader to use when no other matches + enable_dependency_validation: Whether to validate loader dependencies + """ + self._loaders: Dict[str, LoaderInterface] = {} + self._extension_map: Dict[str, List[LoaderInterface]] = {} + self._mime_type_map: Dict[str, List[LoaderInterface]] = {} + self.loader_directories = loader_directories + self.default_loader_priority = default_loader_priority + self.fallback_loader = fallback_loader + self.enable_dependency_validation = enable_dependency_validation + self.logger = get_logger(__name__) + + def register_loader(self, loader: LoaderInterface) -> bool: + """ + Register a loader with the engine. 
+ + Args: + loader: LoaderInterface implementation to register + + Returns: + True if loader was registered successfully, False otherwise + """ + # Validate dependencies if enabled + if self.enable_dependency_validation and not loader.validate_dependencies(): + self.logger.warning( + f"Skipping loader '{loader.loader_name}' - missing dependencies: " + f"{loader.get_dependencies()}" + ) + return False + + self._loaders[loader.loader_name] = loader + + # Map extensions to loaders + for ext in loader.supported_extensions: + ext_lower = ext.lower() + if ext_lower not in self._extension_map: + self._extension_map[ext_lower] = [] + self._extension_map[ext_lower].append(loader) + + # Map mime types to loaders + for mime_type in loader.supported_mime_types: + if mime_type not in self._mime_type_map: + self._mime_type_map[mime_type] = [] + self._mime_type_map[mime_type].append(loader) + + self.logger.info(f"Registered loader: {loader.loader_name}") + return True + + def get_loader( + self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None + ) -> Optional[LoaderInterface]: + """ + Get appropriate loader for a file. + + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + preferred_loaders: List of preferred loader names to try first + + Returns: + LoaderInterface that can handle the file, or None if not found + """ + ext = os.path.splitext(file_path)[1].lower() + + # Try preferred loaders first + if preferred_loaders: + for loader_name in preferred_loaders: + if loader_name in self._loaders: + loader = self._loaders[loader_name] + if loader.can_handle(file_path, mime_type): + return loader + + # Try priority order + for loader_name in self.default_loader_priority: + if loader_name in self._loaders: + loader = self._loaders[loader_name] + if loader.can_handle(file_path, mime_type): + return loader + + # Try mime type mapping + if mime_type and mime_type in self._mime_type_map: + for loader in self._mime_type_map[mime_type]: + if loader.can_handle(file_path, mime_type): + return loader + + # Try extension mapping + if ext in self._extension_map: + for loader in self._extension_map[ext]: + if loader.can_handle(file_path, mime_type): + return loader + + # Fallback loader + if self.fallback_loader in self._loaders: + fallback = self._loaders[self.fallback_loader] + if fallback.can_handle(file_path, mime_type): + return fallback + + return None + + async def load_file( + self, file_path: str, mime_type: str = None, preferred_loaders: List[str] = None, **kwargs + ) -> LoaderResult: + """ + Load file using appropriate loader. + + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + preferred_loaders: List of preferred loader names to try first + **kwargs: Additional loader-specific configuration + + Returns: + LoaderResult containing processed content and metadata + + Raises: + ValueError: If no suitable loader is found + Exception: If file processing fails + """ + loader = self.get_loader(file_path, mime_type, preferred_loaders) + if not loader: + raise ValueError(f"No loader found for file: {file_path}") + + self.logger.debug(f"Loading {file_path} with {loader.loader_name}") + return await loader.load(file_path, **kwargs) + + def discover_loaders(self): + """ + Auto-discover loaders from configured directories. + + Scans loader directories for Python modules containing + LoaderInterface implementations and registers them. 
+ """ + for directory in self.loader_directories: + if os.path.exists(directory): + self._discover_in_directory(directory) + + def _discover_in_directory(self, directory: str): + """ + Discover loaders in a specific directory. + + Args: + directory: Directory path to scan for loader implementations + """ + try: + for file_name in os.listdir(directory): + if file_name.endswith(".py") and not file_name.startswith("_"): + module_name = file_name[:-3] + file_path = os.path.join(directory, file_name) + + try: + spec = importlib.util.spec_from_file_location(module_name, file_path) + if spec and spec.loader: + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Look for loader classes + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, LoaderInterface) + and attr != LoaderInterface + ): + # Instantiate and register the loader + try: + loader_instance = attr() + self.register_loader(loader_instance) + except Exception as e: + self.logger.warning( + f"Failed to instantiate loader {attr_name}: {e}" + ) + + except Exception as e: + self.logger.warning(f"Failed to load module {module_name}: {e}") + + except OSError as e: + self.logger.warning(f"Failed to scan directory {directory}: {e}") + + def get_available_loaders(self) -> List[str]: + """ + Get list of available loader names. + + Returns: + List of registered loader names + """ + return list(self._loaders.keys()) + + def get_loader_info(self, loader_name: str) -> Dict[str, any]: + """ + Get information about a specific loader. + + Args: + loader_name: Name of the loader to inspect + + Returns: + Dictionary containing loader information + """ + if loader_name not in self._loaders: + return {} + + loader = self._loaders[loader_name] + return { + "name": loader.loader_name, + "extensions": loader.supported_extensions, + "mime_types": loader.supported_mime_types, + "dependencies": loader.get_dependencies(), + "available": loader.validate_dependencies(), + } diff --git a/infrastructure/loaders/LoaderInterface.py b/infrastructure/loaders/LoaderInterface.py new file mode 100644 index 000000000..66d8ede40 --- /dev/null +++ b/infrastructure/loaders/LoaderInterface.py @@ -0,0 +1,101 @@ +from abc import ABC, abstractmethod +from typing import List +from .models.LoaderResult import LoaderResult + + +class LoaderInterface(ABC): + """ + Base interface for all file loaders in cognee. + + This interface follows cognee's established pattern for database adapters, + ensuring consistent behavior across all loader implementations. + """ + + @property + @abstractmethod + def supported_extensions(self) -> List[str]: + """ + List of file extensions this loader supports. + + Returns: + List of extensions including the dot (e.g., ['.txt', '.md']) + """ + pass + + @property + @abstractmethod + def supported_mime_types(self) -> List[str]: + """ + List of MIME types this loader supports. + + Returns: + List of MIME type strings (e.g., ['text/plain', 'application/pdf']) + """ + pass + + @property + @abstractmethod + def loader_name(self) -> str: + """ + Unique name identifier for this loader. + + Returns: + String identifier used for registration and configuration + """ + pass + + @abstractmethod + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """ + Check if this loader can handle the given file. 
+ + Args: + file_path: Path to the file to be processed + mime_type: Optional MIME type of the file + + Returns: + True if this loader can process the file, False otherwise + """ + pass + + @abstractmethod + async def load(self, file_path: str, **kwargs) -> LoaderResult: + """ + Load and process the file, returning standardized result. + + Args: + file_path: Path to the file to be processed + **kwargs: Additional loader-specific configuration + + Returns: + LoaderResult containing processed content and metadata + + Raises: + Exception: If file cannot be processed + """ + pass + + def get_dependencies(self) -> List[str]: + """ + Optional: Return list of required dependencies for this loader. + + Returns: + List of package names with optional version specifications + """ + return [] + + def validate_dependencies(self) -> bool: + """ + Check if all required dependencies are available. + + Returns: + True if all dependencies are installed, False otherwise + """ + for dep in self.get_dependencies(): + # Extract package name from version specification + package_name = dep.split(">=")[0].split("==")[0].split("<")[0] + try: + __import__(package_name) + except ImportError: + return False + return True diff --git a/infrastructure/loaders/__init__.py b/infrastructure/loaders/__init__.py new file mode 100644 index 000000000..5374c2afc --- /dev/null +++ b/infrastructure/loaders/__init__.py @@ -0,0 +1,19 @@ +""" +File loader infrastructure for cognee. + +This package provides a plugin-based system for loading different file formats +into cognee, following the same patterns as database adapters. + +Main exports: +- get_loader_engine(): Factory function to get configured loader engine +- use_loader(): Register custom loaders at runtime +- LoaderInterface: Base interface for implementing loaders +- LoaderResult, ContentType: Data models for loader results +""" + +from .get_loader_engine import get_loader_engine +from .use_loader import use_loader +from .LoaderInterface import LoaderInterface +from .models.LoaderResult import LoaderResult, ContentType + +__all__ = ["get_loader_engine", "use_loader", "LoaderInterface", "LoaderResult", "ContentType"] diff --git a/infrastructure/loaders/config.py b/infrastructure/loaders/config.py new file mode 100644 index 000000000..efae0b138 --- /dev/null +++ b/infrastructure/loaders/config.py @@ -0,0 +1,57 @@ +from functools import lru_cache +from typing import List, Optional, Dict, Any +from pydantic_settings import BaseSettings, SettingsConfigDict +from cognee.root_dir import get_absolute_path + + +class LoaderConfig(BaseSettings): + """ + Configuration for file loader system. + + Follows cognee's pattern using pydantic_settings.BaseSettings for + environment variable support and validation. + """ + + loader_directories: List[str] = [ + get_absolute_path("cognee/infrastructure/loaders/core"), + get_absolute_path("cognee/infrastructure/loaders/external"), + ] + default_loader_priority: List[str] = [ + "text_loader", + "pypdf_loader", + "unstructured_loader", + "dlt_loader", + ] + auto_discover: bool = True + fallback_loader: str = "text_loader" + enable_dependency_validation: bool = True + + model_config = SettingsConfigDict(env_file=".env", extra="allow", env_prefix="LOADER_") + + def to_dict(self) -> Dict[str, Any]: + """ + Convert configuration to dictionary format. 
+ + Returns: + Dict containing all loader configuration settings + """ + return { + "loader_directories": self.loader_directories, + "default_loader_priority": self.default_loader_priority, + "auto_discover": self.auto_discover, + "fallback_loader": self.fallback_loader, + "enable_dependency_validation": self.enable_dependency_validation, + } + + +@lru_cache +def get_loader_config() -> LoaderConfig: + """ + Get cached loader configuration. + + Uses LRU cache following cognee's pattern for configuration objects. + + Returns: + LoaderConfig instance with current settings + """ + return LoaderConfig() diff --git a/infrastructure/loaders/core/__init__.py b/infrastructure/loaders/core/__init__.py new file mode 100644 index 000000000..d21282a52 --- /dev/null +++ b/infrastructure/loaders/core/__init__.py @@ -0,0 +1,5 @@ +"""Core loader implementations that are always available.""" + +from .text_loader import TextLoader + +__all__ = ["TextLoader"] diff --git a/infrastructure/loaders/core/text_loader.py b/infrastructure/loaders/core/text_loader.py new file mode 100644 index 000000000..c85ea9781 --- /dev/null +++ b/infrastructure/loaders/core/text_loader.py @@ -0,0 +1,128 @@ +import os +from typing import List +from ..LoaderInterface import LoaderInterface +from ..models.LoaderResult import LoaderResult, ContentType + + +class TextLoader(LoaderInterface): + """ + Core text file loader that handles basic text file formats. + + This loader is always available and serves as the fallback for + text-based files when no specialized loader is available. + """ + + @property + def supported_extensions(self) -> List[str]: + """Supported text file extensions.""" + return [".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log"] + + @property + def supported_mime_types(self) -> List[str]: + """Supported MIME types for text content.""" + return [ + "text/plain", + "text/markdown", + "text/csv", + "application/json", + "text/xml", + "application/xml", + "text/yaml", + "application/yaml", + ] + + @property + def loader_name(self) -> str: + """Unique identifier for this loader.""" + return "text_loader" + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + """ + Check if this loader can handle the given file. + + Args: + file_path: Path to the file + mime_type: Optional MIME type + + Returns: + True if file can be handled, False otherwise + """ + # Check by extension + ext = os.path.splitext(file_path)[1].lower() + if ext in self.supported_extensions: + return True + + # Check by MIME type + if mime_type and mime_type in self.supported_mime_types: + return True + + # As fallback loader, can attempt to handle any text-like file + # This is useful when other loaders fail + try: + # Quick check if file appears to be text + with open(file_path, "rb") as f: + sample = f.read(512) + # Simple heuristic: if most bytes are printable, consider it text + if sample: + try: + sample.decode("utf-8") + return True + except UnicodeDecodeError: + try: + sample.decode("latin-1") + return True + except UnicodeDecodeError: + pass + except (OSError, IOError): + pass + + return False + + async def load(self, file_path: str, encoding: str = "utf-8", **kwargs) -> LoaderResult: + """ + Load and process the text file. 
+ + Args: + file_path: Path to the file to load + encoding: Text encoding to use (default: utf-8) + **kwargs: Additional configuration (unused) + + Returns: + LoaderResult containing the file content and metadata + + Raises: + FileNotFoundError: If file doesn't exist + UnicodeDecodeError: If file cannot be decoded with specified encoding + OSError: If file cannot be read + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + try: + with open(file_path, "r", encoding=encoding) as f: + content = f.read() + except UnicodeDecodeError: + # Try with fallback encoding + if encoding == "utf-8": + return await self.load(file_path, encoding="latin-1", **kwargs) + else: + raise + + # Extract basic metadata + file_stat = os.stat(file_path) + metadata = { + "name": os.path.basename(file_path), + "size": file_stat.st_size, + "extension": os.path.splitext(file_path)[1], + "encoding": encoding, + "loader": self.loader_name, + "lines": len(content.splitlines()) if content else 0, + "characters": len(content), + } + + return LoaderResult( + content=content, + metadata=metadata, + content_type=ContentType.TEXT, + source_info={"file_path": file_path, "encoding": encoding}, + ) diff --git a/infrastructure/loaders/create_loader_engine.py b/infrastructure/loaders/create_loader_engine.py new file mode 100644 index 000000000..fa56e069a --- /dev/null +++ b/infrastructure/loaders/create_loader_engine.py @@ -0,0 +1,49 @@ +from typing import List +from .LoaderEngine import LoaderEngine +from .supported_loaders import supported_loaders + + +def create_loader_engine( + loader_directories: List[str], + default_loader_priority: List[str], + auto_discover: bool = True, + fallback_loader: str = "text_loader", + enable_dependency_validation: bool = True, +) -> LoaderEngine: + """ + Create loader engine with given configuration. + + Follows cognee's pattern for engine creation functions used + in database adapters. + + Args: + loader_directories: Directories to search for loader implementations + default_loader_priority: Priority order for loader selection + auto_discover: Whether to auto-discover loaders from directories + fallback_loader: Default loader to use when no other matches + enable_dependency_validation: Whether to validate loader dependencies + + Returns: + Configured LoaderEngine instance + """ + engine = LoaderEngine( + loader_directories=loader_directories, + default_loader_priority=default_loader_priority, + fallback_loader=fallback_loader, + enable_dependency_validation=enable_dependency_validation, + ) + + # Register supported loaders from registry + for loader_name, loader_class in supported_loaders.items(): + try: + loader_instance = loader_class() + engine.register_loader(loader_instance) + except Exception as e: + # Log but don't fail - allow engine to continue with other loaders + engine.logger.warning(f"Failed to register loader {loader_name}: {e}") + + # Auto-discover loaders if enabled + if auto_discover: + engine.discover_loaders() + + return engine diff --git a/infrastructure/loaders/get_loader_engine.py b/infrastructure/loaders/get_loader_engine.py new file mode 100644 index 000000000..8f249874c --- /dev/null +++ b/infrastructure/loaders/get_loader_engine.py @@ -0,0 +1,20 @@ +from functools import lru_cache +from .config import get_loader_config +from .LoaderEngine import LoaderEngine +from .create_loader_engine import create_loader_engine + + +@lru_cache +def get_loader_engine() -> LoaderEngine: + """ + Factory function to get loader engine. 
+
+    Follows cognee's pattern with @lru_cache for efficient reuse
+    of engine instances. Configuration is loaded from environment
+    variables and settings.
+
+    Returns:
+        Cached LoaderEngine instance configured with current settings
+    """
+    config = get_loader_config()
+    return create_loader_engine(**config.to_dict())
diff --git a/infrastructure/loaders/models/LoaderResult.py b/infrastructure/loaders/models/LoaderResult.py
new file mode 100644
index 000000000..bf8cb77d3
--- /dev/null
+++ b/infrastructure/loaders/models/LoaderResult.py
@@ -0,0 +1,47 @@
+from pydantic import BaseModel
+from typing import Optional, Dict, Any, List
+from enum import Enum
+
+
+class ContentType(Enum):
+    """Content type classification for loaded files"""
+
+    TEXT = "text"
+    STRUCTURED = "structured"
+    BINARY = "binary"
+
+
+class LoaderResult(BaseModel):
+    """
+    Standardized output format for all file loaders.
+
+    This model ensures consistent data structure across all loader implementations,
+    following cognee's pattern of using Pydantic models for data validation.
+    """
+
+    content: str  # Primary text content extracted from file
+    metadata: Dict[str, Any]  # File metadata (name, size, type, loader info, etc.)
+    content_type: ContentType  # Content classification
+    chunks: Optional[List[str]] = None  # Pre-chunked content if available
+    source_info: Optional[Dict[str, Any]] = None  # Source-specific information
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Convert the loader result to a dictionary format.
+
+        Returns:
+            Dict containing all loader result data with string-serialized content_type
+        """
+        return {
+            "content": self.content,
+            "metadata": self.metadata,
+            "content_type": self.content_type.value,
+            "source_info": self.source_info or {},
+            "chunks": self.chunks,
+        }
+
+    class Config:
+        """Pydantic configuration following cognee patterns"""
+
+        # Keep content_type as a real ContentType enum so .value in to_dict() works
+        validate_assignment = True
diff --git a/infrastructure/loaders/models/__init__.py b/infrastructure/loaders/models/__init__.py
new file mode 100644
index 000000000..53ebae0fb
--- /dev/null
+++ b/infrastructure/loaders/models/__init__.py
@@ -0,0 +1,3 @@
+from .LoaderResult import LoaderResult, ContentType
+
+__all__ = ["LoaderResult", "ContentType"]
diff --git a/infrastructure/loaders/supported_loaders.py b/infrastructure/loaders/supported_loaders.py
new file mode 100644
index 000000000..6c7ebec3b
--- /dev/null
+++ b/infrastructure/loaders/supported_loaders.py
@@ -0,0 +1,3 @@
+# Registry for loader implementations
+# Follows cognee's pattern used in databases/vector/supported_databases.py
+supported_loaders = {}
diff --git a/infrastructure/loaders/use_loader.py b/infrastructure/loaders/use_loader.py
new file mode 100644
index 000000000..4c6c28080
--- /dev/null
+++ b/infrastructure/loaders/use_loader.py
@@ -0,0 +1,22 @@
+from .supported_loaders import supported_loaders
+
+
+def use_loader(loader_name: str, loader_class):
+    """
+    Register a loader at runtime.
+
+    Follows cognee's pattern used in databases for adapter registration.
+    This allows external packages and custom loaders to be registered
+    into the loader system.
+ + Args: + loader_name: Unique name for the loader + loader_class: Loader class implementing LoaderInterface + + Example: + from cognee.infrastructure.loaders import use_loader + from my_package import MyCustomLoader + + use_loader("my_custom_loader", MyCustomLoader) + """ + supported_loaders[loader_name] = loader_class diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..d4839a6b1 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Tests package diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 000000000..4a5d26360 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1 @@ +# Unit tests package diff --git a/tests/unit/infrastructure/__init__.py b/tests/unit/infrastructure/__init__.py new file mode 100644 index 000000000..3bbd49948 --- /dev/null +++ b/tests/unit/infrastructure/__init__.py @@ -0,0 +1 @@ +# Infrastructure tests package diff --git a/tests/unit/infrastructure/loaders/__init__.py b/tests/unit/infrastructure/loaders/__init__.py new file mode 100644 index 000000000..e53cf9c8a --- /dev/null +++ b/tests/unit/infrastructure/loaders/__init__.py @@ -0,0 +1 @@ +# Loaders tests package diff --git a/tests/unit/infrastructure/loaders/test_loader_engine.py b/tests/unit/infrastructure/loaders/test_loader_engine.py new file mode 100644 index 000000000..5d606e8f3 --- /dev/null +++ b/tests/unit/infrastructure/loaders/test_loader_engine.py @@ -0,0 +1,252 @@ +import pytest +import tempfile +import os +from unittest.mock import Mock, AsyncMock + +from cognee.infrastructure.loaders.LoaderEngine import LoaderEngine +from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType + + +class MockLoader(LoaderInterface): + """Mock loader for testing.""" + + def __init__(self, name="mock_loader", extensions=None, mime_types=None, fail_deps=False): + self._name = name + self._extensions = extensions or [".mock"] + self._mime_types = mime_types or ["application/mock"] + self._fail_deps = fail_deps + + @property + def supported_extensions(self): + return self._extensions + + @property + def supported_mime_types(self): + return self._mime_types + + @property + def loader_name(self): + return self._name + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + ext = os.path.splitext(file_path)[1].lower() + return ext in self._extensions or mime_type in self._mime_types + + async def load(self, file_path: str, **kwargs) -> LoaderResult: + return LoaderResult( + content=f"Mock content from {self._name}", + metadata={"loader": self._name, "name": os.path.basename(file_path)}, + content_type=ContentType.TEXT, + ) + + def validate_dependencies(self) -> bool: + return not self._fail_deps + + +class TestLoaderEngine: + """Test the LoaderEngine class.""" + + @pytest.fixture + def engine(self): + """Create a LoaderEngine instance for testing.""" + return LoaderEngine( + loader_directories=[], + default_loader_priority=["loader1", "loader2"], + fallback_loader="fallback", + enable_dependency_validation=True, + ) + + def test_engine_initialization(self, engine): + """Test LoaderEngine initialization.""" + assert engine.loader_directories == [] + assert engine.default_loader_priority == ["loader1", "loader2"] + assert engine.fallback_loader == "fallback" + assert engine.enable_dependency_validation is True + assert len(engine.get_available_loaders()) == 0 + + def test_register_loader_success(self, engine): + """Test successful 
loader registration.""" + loader = MockLoader("test_loader", [".test"]) + + success = engine.register_loader(loader) + + assert success is True + assert "test_loader" in engine.get_available_loaders() + assert engine._loaders["test_loader"] == loader + assert ".test" in engine._extension_map + assert "application/mock" in engine._mime_type_map + + def test_register_loader_with_failed_dependencies(self, engine): + """Test loader registration with failed dependency validation.""" + loader = MockLoader("test_loader", [".test"], fail_deps=True) + + success = engine.register_loader(loader) + + assert success is False + assert "test_loader" not in engine.get_available_loaders() + + def test_register_loader_without_dependency_validation(self): + """Test loader registration without dependency validation.""" + engine = LoaderEngine( + loader_directories=[], default_loader_priority=[], enable_dependency_validation=False + ) + loader = MockLoader("test_loader", [".test"], fail_deps=True) + + success = engine.register_loader(loader) + + assert success is True + assert "test_loader" in engine.get_available_loaders() + + def test_get_loader_by_extension(self, engine): + """Test getting loader by file extension.""" + loader1 = MockLoader("loader1", [".txt"]) + loader2 = MockLoader("loader2", [".pdf"]) + + engine.register_loader(loader1) + engine.register_loader(loader2) + + result = engine.get_loader("test.txt") + assert result == loader1 + + result = engine.get_loader("test.pdf") + assert result == loader2 + + result = engine.get_loader("test.unknown") + assert result is None + + def test_get_loader_by_mime_type(self, engine): + """Test getting loader by MIME type.""" + loader = MockLoader("loader", [".txt"], ["text/plain"]) + engine.register_loader(loader) + + result = engine.get_loader("test.unknown", mime_type="text/plain") + assert result == loader + + result = engine.get_loader("test.unknown", mime_type="application/pdf") + assert result is None + + def test_get_loader_with_preferences(self, engine): + """Test getting loader with preferred loaders.""" + loader1 = MockLoader("loader1", [".txt"]) + loader2 = MockLoader("loader2", [".txt"]) + + engine.register_loader(loader1) + engine.register_loader(loader2) + + # Should get preferred loader + result = engine.get_loader("test.txt", preferred_loaders=["loader2"]) + assert result == loader2 + + # Should fallback to first available if preferred not found + result = engine.get_loader("test.txt", preferred_loaders=["nonexistent"]) + assert result in [loader1, loader2] # One of them should be returned + + def test_get_loader_with_priority(self, engine): + """Test loader selection with priority order.""" + engine.default_loader_priority = ["priority_loader", "other_loader"] + + priority_loader = MockLoader("priority_loader", [".txt"]) + other_loader = MockLoader("other_loader", [".txt"]) + + # Register in reverse order + engine.register_loader(other_loader) + engine.register_loader(priority_loader) + + # Should get priority loader even though other was registered first + result = engine.get_loader("test.txt") + assert result == priority_loader + + def test_get_loader_fallback(self, engine): + """Test fallback loader selection.""" + fallback_loader = MockLoader("fallback", [".txt"]) + other_loader = MockLoader("other", [".pdf"]) + + engine.register_loader(fallback_loader) + engine.register_loader(other_loader) + engine.fallback_loader = "fallback" + + # For .txt file, fallback should be considered + result = engine.get_loader("test.txt") + assert result == 
fallback_loader + + # For unknown extension, should still get fallback if it can handle + result = engine.get_loader("test.unknown") + assert result == fallback_loader + + @pytest.mark.asyncio + async def test_load_file_success(self, engine): + """Test successful file loading.""" + loader = MockLoader("test_loader", [".txt"]) + engine.register_loader(loader) + + with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f: + f.write(b"test content") + temp_path = f.name + + try: + result = await engine.load_file(temp_path) + assert result.content == "Mock content from test_loader" + assert result.metadata["loader"] == "test_loader" + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + @pytest.mark.asyncio + async def test_load_file_no_loader(self, engine): + """Test file loading when no suitable loader is found.""" + with pytest.raises(ValueError, match="No loader found for file"): + await engine.load_file("test.unknown") + + @pytest.mark.asyncio + async def test_load_file_with_preferences(self, engine): + """Test file loading with preferred loaders.""" + loader1 = MockLoader("loader1", [".txt"]) + loader2 = MockLoader("loader2", [".txt"]) + + engine.register_loader(loader1) + engine.register_loader(loader2) + + with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f: + f.write(b"test content") + temp_path = f.name + + try: + result = await engine.load_file(temp_path, preferred_loaders=["loader2"]) + assert result.metadata["loader"] == "loader2" + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_get_loader_info(self, engine): + """Test getting loader information.""" + loader = MockLoader("test_loader", [".txt"], ["text/plain"]) + engine.register_loader(loader) + + info = engine.get_loader_info("test_loader") + + assert info["name"] == "test_loader" + assert info["extensions"] == [".txt"] + assert info["mime_types"] == ["text/plain"] + assert info["available"] is True + + # Test non-existent loader + info = engine.get_loader_info("nonexistent") + assert info == {} + + def test_discover_loaders_empty_directory(self, engine): + """Test loader discovery with empty directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + engine.loader_directories = [temp_dir] + engine.discover_loaders() + + # Should not find any loaders in empty directory + assert len(engine.get_available_loaders()) == 0 + + def test_discover_loaders_nonexistent_directory(self, engine): + """Test loader discovery with non-existent directory.""" + engine.loader_directories = ["/nonexistent/directory"] + + # Should not raise exception, just log warning + engine.discover_loaders() + assert len(engine.get_available_loaders()) == 0 diff --git a/tests/unit/infrastructure/loaders/test_loader_interface.py b/tests/unit/infrastructure/loaders/test_loader_interface.py new file mode 100644 index 000000000..7284518ad --- /dev/null +++ b/tests/unit/infrastructure/loaders/test_loader_interface.py @@ -0,0 +1,99 @@ +import pytest +import tempfile +import os +from unittest.mock import AsyncMock + +from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface +from cognee.infrastructure.loaders.models.LoaderResult import LoaderResult, ContentType + + +class TestLoaderInterface: + """Test the LoaderInterface abstract base class.""" + + def test_loader_interface_is_abstract(self): + """Test that LoaderInterface cannot be instantiated directly.""" + with pytest.raises(TypeError): + LoaderInterface() + + def test_dependency_validation_with_no_dependencies(self): + 
"""Test dependency validation when no dependencies are required.""" + + class MockLoader(LoaderInterface): + @property + def supported_extensions(self): + return [".txt"] + + @property + def supported_mime_types(self): + return ["text/plain"] + + @property + def loader_name(self): + return "mock_loader" + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + return True + + async def load(self, file_path: str, **kwargs) -> LoaderResult: + return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT) + + loader = MockLoader() + assert loader.validate_dependencies() is True + assert loader.get_dependencies() == [] + + def test_dependency_validation_with_missing_dependencies(self): + """Test dependency validation with missing dependencies.""" + + class MockLoaderWithDeps(LoaderInterface): + @property + def supported_extensions(self): + return [".txt"] + + @property + def supported_mime_types(self): + return ["text/plain"] + + @property + def loader_name(self): + return "mock_loader_deps" + + def get_dependencies(self): + return ["non_existent_package>=1.0.0"] + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + return True + + async def load(self, file_path: str, **kwargs) -> LoaderResult: + return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT) + + loader = MockLoaderWithDeps() + assert loader.validate_dependencies() is False + assert "non_existent_package>=1.0.0" in loader.get_dependencies() + + def test_dependency_validation_with_existing_dependencies(self): + """Test dependency validation with existing dependencies.""" + + class MockLoaderWithExistingDeps(LoaderInterface): + @property + def supported_extensions(self): + return [".txt"] + + @property + def supported_mime_types(self): + return ["text/plain"] + + @property + def loader_name(self): + return "mock_loader_existing" + + def get_dependencies(self): + return ["os"] # Built-in module that always exists + + def can_handle(self, file_path: str, mime_type: str = None) -> bool: + return True + + async def load(self, file_path: str, **kwargs) -> LoaderResult: + return LoaderResult(content="test", metadata={}, content_type=ContentType.TEXT) + + loader = MockLoaderWithExistingDeps() + assert loader.validate_dependencies() is True diff --git a/tests/unit/infrastructure/loaders/test_text_loader.py b/tests/unit/infrastructure/loaders/test_text_loader.py new file mode 100644 index 000000000..c0b6e8ccb --- /dev/null +++ b/tests/unit/infrastructure/loaders/test_text_loader.py @@ -0,0 +1,157 @@ +import pytest +import tempfile +import os +from pathlib import Path + +from cognee.infrastructure.loaders.core.text_loader import TextLoader +from cognee.infrastructure.loaders.models.LoaderResult import ContentType + + +class TestTextLoader: + """Test the TextLoader implementation.""" + + @pytest.fixture + def text_loader(self): + """Create a TextLoader instance for testing.""" + return TextLoader() + + @pytest.fixture + def temp_text_file(self): + """Create a temporary text file for testing.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write("This is a test file.\nIt has multiple lines.\n") + temp_path = f.name + + yield temp_path + + # Cleanup + if os.path.exists(temp_path): + os.unlink(temp_path) + + @pytest.fixture + def temp_binary_file(self): + """Create a temporary binary file for testing.""" + with tempfile.NamedTemporaryFile(mode="wb", suffix=".bin", delete=False) as f: + f.write(b"\x00\x01\x02\x03\x04\x05") + temp_path 
= f.name + + yield temp_path + + # Cleanup + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_loader_properties(self, text_loader): + """Test basic loader properties.""" + assert text_loader.loader_name == "text_loader" + assert ".txt" in text_loader.supported_extensions + assert ".md" in text_loader.supported_extensions + assert "text/plain" in text_loader.supported_mime_types + assert "application/json" in text_loader.supported_mime_types + + def test_can_handle_by_extension(self, text_loader): + """Test file handling by extension.""" + assert text_loader.can_handle("test.txt") + assert text_loader.can_handle("test.md") + assert text_loader.can_handle("test.json") + assert text_loader.can_handle("test.TXT") # Case insensitive + assert not text_loader.can_handle("test.pdf") + + def test_can_handle_by_mime_type(self, text_loader): + """Test file handling by MIME type.""" + assert text_loader.can_handle("test.unknown", mime_type="text/plain") + assert text_loader.can_handle("test.unknown", mime_type="application/json") + assert not text_loader.can_handle("test.unknown", mime_type="application/pdf") + + def test_can_handle_text_file_heuristic(self, text_loader, temp_text_file): + """Test handling of text files by content heuristic.""" + # Remove extension to force heuristic check + no_ext_path = temp_text_file.replace(".txt", "") + os.rename(temp_text_file, no_ext_path) + + try: + assert text_loader.can_handle(no_ext_path) + finally: + if os.path.exists(no_ext_path): + os.unlink(no_ext_path) + + def test_cannot_handle_binary_file(self, text_loader, temp_binary_file): + """Test that binary files are not handled.""" + assert not text_loader.can_handle(temp_binary_file) + + @pytest.mark.asyncio + async def test_load_text_file(self, text_loader, temp_text_file): + """Test loading a text file.""" + result = await text_loader.load(temp_text_file) + + assert isinstance(result.content, str) + assert "This is a test file." 
in result.content + assert result.content_type == ContentType.TEXT + assert result.metadata["loader"] == "text_loader" + assert result.metadata["name"] == os.path.basename(temp_text_file) + assert result.metadata["lines"] == 2 + assert result.metadata["encoding"] == "utf-8" + assert result.source_info["file_path"] == temp_text_file + + @pytest.mark.asyncio + async def test_load_with_custom_encoding(self, text_loader): + """Test loading with custom encoding.""" + # Create a file with latin-1 encoding + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False, encoding="latin-1" + ) as f: + f.write("Test with åéîøü characters") + temp_path = f.name + + try: + result = await text_loader.load(temp_path, encoding="latin-1") + assert "åéîøü" in result.content + assert result.metadata["encoding"] == "latin-1" + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + @pytest.mark.asyncio + async def test_load_with_fallback_encoding(self, text_loader): + """Test automatic fallback to latin-1 encoding.""" + # Create a file with latin-1 content but try to read as utf-8 + with tempfile.NamedTemporaryFile(mode="wb", suffix=".txt", delete=False) as f: + # Write latin-1 encoded bytes that are invalid in utf-8 + f.write(b"Test with \xe5\xe9\xee\xf8\xfc characters") + temp_path = f.name + + try: + # Should automatically fallback to latin-1 + result = await text_loader.load(temp_path) + assert result.metadata["encoding"] == "latin-1" + assert len(result.content) > 0 + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + @pytest.mark.asyncio + async def test_load_nonexistent_file(self, text_loader): + """Test loading a file that doesn't exist.""" + with pytest.raises(FileNotFoundError): + await text_loader.load("/nonexistent/file.txt") + + @pytest.mark.asyncio + async def test_load_empty_file(self, text_loader): + """Test loading an empty file.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + # Create empty file + temp_path = f.name + + try: + result = await text_loader.load(temp_path) + assert result.content == "" + assert result.metadata["lines"] == 0 + assert result.metadata["characters"] == 0 + finally: + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_no_dependencies(self, text_loader): + """Test that TextLoader has no external dependencies.""" + assert text_loader.get_dependencies() == [] + assert text_loader.validate_dependencies() is True
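
The patch defines the extension points (`LoaderInterface`, the `supported_loaders` registry, `use_loader`, and the cached `get_loader_engine` factory) but does not include an end-to-end example of wiring in a custom loader. The sketch below illustrates that path under a few assumptions: the package is importable as `cognee.infrastructure.loaders`, and `MarkdownFrontmatterLoader` with its front-matter stripping is purely hypothetical.

```python
import os
from typing import List

from cognee.infrastructure.loaders import (
    ContentType,
    LoaderInterface,
    LoaderResult,
    get_loader_engine,
    use_loader,
)


class MarkdownFrontmatterLoader(LoaderInterface):
    """Hypothetical loader that strips YAML front matter from Markdown files."""

    @property
    def supported_extensions(self) -> List[str]:
        return [".md"]

    @property
    def supported_mime_types(self) -> List[str]:
        return ["text/markdown"]

    @property
    def loader_name(self) -> str:
        return "markdown_frontmatter_loader"

    def can_handle(self, file_path: str, mime_type: str = None) -> bool:
        return file_path.lower().endswith(".md") or mime_type in self.supported_mime_types

    async def load(self, file_path: str, **kwargs) -> LoaderResult:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        # Drop a leading "--- ... ---" front-matter block if one is present.
        if content.startswith("---"):
            parts = content.split("---", 2)
            if len(parts) == 3:
                content = parts[2].lstrip("\n")
        return LoaderResult(
            content=content,
            metadata={"name": os.path.basename(file_path), "loader": self.loader_name},
            content_type=ContentType.TEXT,
        )


# Put the class into the supported_loaders registry *before* the engine is built;
# create_loader_engine() instantiates everything in that registry, and
# get_loader_engine() caches the resulting engine with @lru_cache.
use_loader("markdown_frontmatter_loader", MarkdownFrontmatterLoader)

engine = get_loader_engine()
print(engine.get_loader_info("markdown_frontmatter_loader"))
```

Because `text_loader` sits ahead of everything else in the default priority list and also claims `.md`, callers would still pass `preferred_loaders=["markdown_frontmatter_loader"]` for the custom loader to win on Markdown files.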
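A second, smaller sketch shows how a `loader_config` entry reaches an individual loader: the adapter resolves the loader first, looks up the entry keyed by its `loader_name`, and unpacks it into keyword arguments for `LoaderEngine.load_file`, which forwards them to the selected loader's `load()`. Here `notes.txt` is a placeholder path and the example assumes the bundled `text_loader` has been registered.

```python
import asyncio

from cognee.infrastructure.loaders import get_loader_engine


async def main():
    engine = get_loader_engine()

    # Keyed by loader_name, as in the add()/adapter path.
    loader_config = {"text_loader": {"encoding": "latin-1"}}

    # Mirror the adapter: resolve the loader first, then look up its config entry.
    loader = engine.get_loader("notes.txt", preferred_loaders=["text_loader"])
    kwargs = loader_config.get(loader.loader_name, {}) if loader else {}

    # The kwargs are forwarded by load_file() to TextLoader.load(file_path, encoding=...).
    result = await engine.load_file(
        "notes.txt", preferred_loaders=["text_loader"], **kwargs
    )
    print(result.metadata["encoding"], result.metadata["lines"])


asyncio.run(main())
```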