Merge branch 'dev' into feature/cog-2717-add-better-error-management-to-cognee

This commit is contained in:
hajdul88 2025-08-15 08:15:05 +02:00 committed by GitHub
commit d884cc46e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 4259 additions and 2889 deletions

View file

@@ -117,6 +117,9 @@ ACCEPT_LOCAL_FILE_PATH=True
# This protects against Server Side Request Forgery when proper infrastructure is not in place.
ALLOW_HTTP_REQUESTS=True
# When set to False, errors during data processing will be reported as info instead of being raised, so that faulty documents can be handled
RAISE_INCREMENTAL_LOADING_ERRORS=True
# Set this variable to True to enforce usage of backend access control for Cognee
# Note: This is only currently supported by the following databases:
# Relational: SQLite, Postgres
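
The RAISE_INCREMENTAL_LOADING_ERRORS flag introduced above is read in the run_tasks hunk further down with a default of "true"; a minimal sketch of that parsing, with the helper name being illustrative only:

import os

def raise_incremental_loading_errors() -> bool:
    # Mirrors the check added in run_tasks: errors during incremental loading
    # are raised unless the variable is explicitly set to something other than
    # "true" (case-insensitive).
    return os.getenv("RAISE_INCREMENTAL_LOADING_ERRORS", "true").lower() == "true"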

View file

@@ -1,6 +1,14 @@
name: community | Greetings
on: [pull_request, issues]
on:
issues:
types: [opened]
pull_request_target:
types: [opened]
permissions:
issues: write
pull-requests: write
jobs:
greeting:

View file

@@ -148,10 +148,8 @@ jobs:
- name: Run Deduplication Example
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }} # Test needs OpenAI endpoint to handle multimedia
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
@@ -175,10 +173,8 @@ jobs:
- name: Run Deletion Tests
env:
ENV: 'dev'
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }} # Test needs OpenAI endpoint to handle multimedia
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}

View file

@@ -0,0 +1,104 @@
"""loader_separation
Revision ID: 9e7a3cb85175
Revises: 1daae0df1866
Create Date: 2025-08-14 19:18:11.406907
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "9e7a3cb85175"
down_revision: Union[str, None] = "1daae0df1866"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def upgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
# Define table with all necessary columns including primary key
data = sa.table(
"data",
sa.Column("id", sa.UUID, primary_key=True), # Critical for SQLite
sa.Column("original_extension", sa.String()),
sa.Column("original_mime_type", sa.String()),
sa.Column("original_data_location", sa.String()),
sa.Column("extension", sa.String()),
sa.Column("mime_type", sa.String()),
sa.Column("raw_data_location", sa.String()),
)
original_extension_column = _get_column(insp, "data", "original_extension")
if not original_extension_column:
op.add_column("data", sa.Column("original_extension", sa.String(), nullable=True))
if op.get_context().dialect.name == "sqlite":
# If the column did not exist, backfill the new original_extension column from the existing extension column
with op.batch_alter_table("data") as batch_op:
batch_op.execute(
data.update().values(
original_extension=data.c.extension,
)
)
else:
conn = op.get_bind()
conn.execute(data.update().values(original_extension=data.c.extension))
original_mime_type = _get_column(insp, "data", "original_mime_type")
if not original_mime_type:
# If the column did not exist, create original_mime_type and backfill it from the existing mime_type column
op.add_column("data", sa.Column("original_mime_type", sa.String(), nullable=True))
if op.get_context().dialect.name == "sqlite":
with op.batch_alter_table("data") as batch_op:
batch_op.execute(
data.update().values(
original_mime_type=data.c.mime_type,
)
)
else:
conn = op.get_bind()
conn.execute(data.update().values(original_mime_type=data.c.mime_type))
loader_engine = _get_column(insp, "data", "loader_engine")
if not loader_engine:
op.add_column("data", sa.Column("loader_engine", sa.String(), nullable=True))
original_data_location = _get_column(insp, "data", "original_data_location")
if not original_data_location:
# If the column did not exist, create original_data_location and backfill it from the existing raw_data_location column
op.add_column("data", sa.Column("original_data_location", sa.String(), nullable=True))
if op.get_context().dialect.name == "sqlite":
with op.batch_alter_table("data") as batch_op:
batch_op.execute(
data.update().values(
original_data_location=data.c.raw_data_location,
)
)
else:
conn = op.get_bind()
conn.execute(data.update().values(original_data_location=data.c.raw_data_location))
raw_content_hash = _get_column(insp, "data", "raw_content_hash")
if not raw_content_hash:
op.add_column("data", sa.Column("raw_content_hash", sa.String(), nullable=True))
def downgrade() -> None:
op.drop_column("data", "raw_content_hash")
op.drop_column("data", "original_data_location")
op.drop_column("data", "loader_engine")
op.drop_column("data", "original_mime_type")
op.drop_column("data", "original_extension")

View file

@@ -57,7 +57,7 @@ class CogneeTestClient:
print(" Some tests may fail without proper LLM API configuration.")
print(" Set OPENAI_API_KEY environment variable for full functionality.")
else:
print(f"✅ API key configured (key ending in: ...{api_key[-4:]})")
print("✅ API key configured.")
# Create temporary test files
self.test_data_dir = tempfile.mkdtemp(prefix="cognee_test_")

View file

@@ -15,6 +15,7 @@ async def add(
vector_db_config: dict = None,
graph_db_config: dict = None,
dataset_id: Optional[UUID] = None,
preferred_loaders: List[str] = None,
incremental_loading: bool = True,
):
"""
@@ -136,7 +137,7 @@ async def add(
"""
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id),
Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
]
pipeline_run_info = None
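
Callers can now steer loader selection per ingestion call. A hypothetical invocation, assuming the usual leading data and dataset_name arguments of add() that sit outside this hunk:

import asyncio
import cognee

async def main():
    # preferred_loaders is consulted before the engine's default priority order;
    # names must match registered loaders, e.g. "pypdf_loader" or "text_loader".
    await cognee.add(
        "/tmp/quarterly-report.pdf",         # assumed local file path
        dataset_name="reports",              # assumed dataset name
        preferred_loaders=["pypdf_loader"],
    )

asyncio.run(main())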

View file

@@ -74,7 +74,7 @@ def read_kuzu_storage_version(kuzu_db_path: str) -> int:
if kuzu_version_mapping.get(version_code):
return kuzu_version_mapping[version_code]
else:
ValueError("Could not map version_code to proper Kuzu version.")
raise ValueError("Could not map version_code to proper Kuzu version.")
def ensure_env(version: str, export_dir) -> str:

View file

@@ -0,0 +1,39 @@
import os
from urllib.parse import urlparse
def get_data_file_path(file_path: str):
# Check if this is a file URI BEFORE normalizing (which corrupts URIs)
if file_path.startswith("file://"):
# Normalize the file URI for Windows - replace backslashes with forward slashes
normalized_file_uri = os.path.normpath(file_path)
parsed_url = urlparse(normalized_file_uri)
# Convert URI path to file system path
if os.name == "nt": # Windows
# Handle Windows drive letters correctly
fs_path = parsed_url.path
if fs_path.startswith("/") and len(fs_path) > 1 and fs_path[2] == ":":
fs_path = fs_path[1:] # Remove leading slash for Windows drive paths
else: # Unix-like systems
fs_path = parsed_url.path
# Now split the actual filesystem path
actual_fs_path = os.path.normpath(fs_path)
return actual_fs_path
elif file_path.startswith("s3://"):
# Handle S3 URLs without normalization (which corrupts them)
parsed_url = urlparse(file_path)
normalized_url = (
f"s3://{parsed_url.netloc}{os.sep}{os.path.normpath(parsed_url.path).lstrip(os.sep)}"
)
return normalized_url
else:
# Regular file path - normalize separators
normalized_path = os.path.normpath(file_path)
return normalized_path
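
A quick sketch of the expected conversions on a POSIX system (the Windows branch additionally strips the leading slash in front of a drive letter):

from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path

# file:// URIs are converted into plain filesystem paths
print(get_data_file_path("file:///home/user/docs/report.txt"))
# -> /home/user/docs/report.txt

# s3:// URLs keep their scheme and bucket; only the key is normalized
print(get_data_file_path("s3://my-bucket/raw//report.txt"))
# -> s3://my-bucket/raw/report.txt

# plain paths are simply normalized
print(get_data_file_path("./data/../data/report.txt"))
# -> data/report.txt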

View file

@@ -109,8 +109,8 @@ def guess_file_type(file: BinaryIO) -> filetype.Type:
"""
Guess the file type from the given binary file stream.
If the file type cannot be determined, raise a FileTypeException with an appropriate
message.
If the file type cannot be determined from content, attempts to infer from extension.
If still unable to determine, raise a FileTypeException with an appropriate message.
Parameters:
-----------

View file

@@ -3,6 +3,7 @@ from os import path
from urllib.parse import urlparse
from contextlib import asynccontextmanager
from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path
from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage
from cognee.infrastructure.files.storage.LocalFileStorage import LocalFileStorage
@@ -11,22 +12,8 @@ from cognee.infrastructure.files.storage.LocalFileStorage import LocalFileStorag
async def open_data_file(file_path: str, mode: str = "rb", encoding: str = None, **kwargs):
# Check if this is a file URI BEFORE normalizing (which corrupts URIs)
if file_path.startswith("file://"):
# Normalize the file URI for Windows - replace backslashes with forward slashes
normalized_file_uri = os.path.normpath(file_path)
parsed_url = urlparse(normalized_file_uri)
# Convert URI path to file system path
if os.name == "nt": # Windows
# Handle Windows drive letters correctly
fs_path = parsed_url.path
if fs_path.startswith("/") and len(fs_path) > 1 and fs_path[2] == ":":
fs_path = fs_path[1:] # Remove leading slash for Windows drive paths
else: # Unix-like systems
fs_path = parsed_url.path
# Now split the actual filesystem path
actual_fs_path = os.path.normpath(fs_path)
actual_fs_path = get_data_file_path(file_path)
file_dir_path = path.dirname(actual_fs_path)
file_name = path.basename(actual_fs_path)
@@ -36,13 +23,7 @@ async def open_data_file(file_path: str, mode: str = "rb", encoding: str = None,
yield file
elif file_path.startswith("s3://"):
# Handle S3 URLs without normalization (which corrupts them)
parsed_url = urlparse(file_path)
normalized_url = (
f"s3://{parsed_url.netloc}{os.sep}{os.path.normpath(parsed_url.path).lstrip(os.sep)}"
)
normalized_url = get_data_file_path(file_path)
s3_dir_path = os.path.dirname(normalized_url)
s3_filename = os.path.basename(normalized_url)
@@ -66,7 +47,7 @@ async def open_data_file(file_path: str, mode: str = "rb", encoding: str = None,
else:
# Regular file path - normalize separators
normalized_path = os.path.normpath(file_path)
normalized_path = get_data_file_path(file_path)
file_dir_path = path.dirname(normalized_path)
file_name = path.basename(normalized_path)

View file

@@ -0,0 +1,156 @@
import filetype
from typing import Dict, List, Optional, Any
from .LoaderInterface import LoaderInterface
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
class LoaderEngine:
"""
Main loader engine for managing file loaders.
Follows cognee's adapter pattern similar to database engines,
providing a centralized system for file loading operations.
"""
def __init__(self):
"""
Initialize the loader engine.
"""
self._loaders: Dict[str, LoaderInterface] = {}
self._extension_map: Dict[str, List[LoaderInterface]] = {}
self._mime_type_map: Dict[str, List[LoaderInterface]] = {}
self.default_loader_priority = [
"text_loader",
"pypdf_loader",
"image_loader",
"audio_loader",
"unstructured_loader",
]
def register_loader(self, loader: LoaderInterface) -> bool:
"""
Register a loader with the engine.
Args:
loader: LoaderInterface implementation to register
Returns:
True if loader was registered successfully, False otherwise
"""
self._loaders[loader.loader_name] = loader
# Map extensions to loaders
for ext in loader.supported_extensions:
ext_lower = ext.lower()
if ext_lower not in self._extension_map:
self._extension_map[ext_lower] = []
self._extension_map[ext_lower].append(loader)
# Map mime types to loaders
for mime_type in loader.supported_mime_types:
if mime_type not in self._mime_type_map:
self._mime_type_map[mime_type] = []
self._mime_type_map[mime_type].append(loader)
logger.info(f"Registered loader: {loader.loader_name}")
return True
def get_loader(
self, file_path: str, preferred_loaders: List[str] = None
) -> Optional[LoaderInterface]:
"""
Get appropriate loader for a file.
Args:
file_path: Path to the file to be processed
preferred_loaders: List of preferred loader names to try first
Returns:
LoaderInterface that can handle the file, or None if not found
"""
file_info = filetype.guess(file_path)
# Try preferred loaders first
if preferred_loaders:
for loader_name in preferred_loaders:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime):
return loader
else:
raise ValueError(f"Loader does not exist: {loader_name}")
# Try default priority order
for loader_name in self.default_loader_priority:
if loader_name in self._loaders:
loader = self._loaders[loader_name]
if loader.can_handle(extension=file_info.extension, mime_type=file_info.mime):
return loader
else:
raise ValueError(f"Loader does not exist: {loader_name}")
return None
async def load_file(
self,
file_path: str,
file_stream: Optional[Any],
preferred_loaders: Optional[List[str]] = None,
**kwargs,
):
"""
Load file using appropriate loader.
Args:
file_path: Path to the file to be processed
file_stream: Optional file stream (currently unused; see the TODO below)
preferred_loaders: List of preferred loader names to try first
**kwargs: Additional loader-specific configuration
Raises:
ValueError: If no suitable loader is found
Exception: If file processing fails
"""
loader = self.get_loader(file_path, preferred_loaders)
if not loader:
raise ValueError(f"No loader found for file: {file_path}")
logger.debug(f"Loading {file_path} with {loader.loader_name}")
# TODO: loading needs to be reworked to work with both file streams and file locations
return await loader.load(file_path, **kwargs)
def get_available_loaders(self) -> List[str]:
"""
Get list of available loader names.
Returns:
List of registered loader names
"""
return list(self._loaders.keys())
def get_loader_info(self, loader_name: str) -> Dict[str, Any]:
"""
Get information about a specific loader.
Args:
loader_name: Name of the loader to inspect
Returns:
Dictionary containing loader information
"""
if loader_name not in self._loaders:
return {}
loader = self._loaders[loader_name]
return {
"name": loader.loader_name,
"extensions": loader.supported_extensions,
"mime_types": loader.supported_mime_types,
}
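
A sketch of how the engine is meant to be driven once the core loaders are registered (file paths here are illustrative; in practice the cached get_loader_engine() factory further down builds this object):

import asyncio
from cognee.infrastructure.loaders import get_loader_engine

async def main():
    engine = get_loader_engine()               # LoaderEngine pre-populated with supported loaders
    print(engine.get_available_loaders())      # e.g. ['pypdf_loader', 'text_loader', ...]

    # Resolve a loader by sniffing the file type, preferring pypdf for PDFs.
    loader = engine.get_loader("/tmp/report.pdf", preferred_loaders=["pypdf_loader"])
    print(loader.loader_name)                  # 'pypdf_loader'

    # load_file extracts the text, stores it in cognee's data storage and
    # returns the path of the stored text file.
    stored_path = await engine.load_file("/tmp/report.pdf", None)
    print(stored_path)

asyncio.run(main())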

View file

@@ -0,0 +1,73 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Any
class LoaderInterface(ABC):
"""
Base interface for all file loaders in cognee.
This interface follows cognee's established pattern for database adapters,
ensuring consistent behavior across all loader implementations.
"""
@property
@abstractmethod
def supported_extensions(self) -> List[str]:
"""
List of file extensions this loader supports.
Returns:
List of extensions including the dot (e.g., ['.txt', '.md'])
"""
pass
@property
@abstractmethod
def supported_mime_types(self) -> List[str]:
"""
List of MIME types this loader supports.
Returns:
List of MIME type strings (e.g., ['text/plain', 'application/pdf'])
"""
pass
@property
@abstractmethod
def loader_name(self) -> str:
"""
Unique name identifier for this loader.
Returns:
String identifier used for registration and configuration
"""
pass
@abstractmethod
def can_handle(self, extension: str, mime_type: str) -> bool:
"""
Check if this loader can handle the given file.
Args:
extension: File extension
mime_type: MIME type of the file
Returns:
True if this loader can process the file, False otherwise
"""
pass
@abstractmethod
async def load(self, file_path: str, file_stream: Optional[Any] = None, **kwargs):
"""
Load and process the file, returning standardized result.
Args:
file_path: Path to the file to be processed
file_stream: If file stream is provided it will be used to process file instead
**kwargs: Additional loader-specific configuration
Raises:
Exception: If file cannot be processed
"""
pass
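
As a sketch, a minimal loader that satisfies this interface and registers itself through the runtime registry added in this commit (the class and file format are hypothetical):

from typing import Any, List, Optional

from cognee.infrastructure.loaders import use_loader
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface

class CsvSnippetLoader(LoaderInterface):
    """Hypothetical loader that treats .csv files as plain text."""

    @property
    def supported_extensions(self) -> List[str]:
        return ["csv"]

    @property
    def supported_mime_types(self) -> List[str]:
        return ["text/csv"]

    @property
    def loader_name(self) -> str:
        return "csv_snippet_loader"

    def can_handle(self, extension: str, mime_type: str) -> bool:
        return extension in self.supported_extensions and mime_type in self.supported_mime_types

    async def load(self, file_path: str, file_stream: Optional[Any] = None, **kwargs):
        # The real loaders in this commit store the extracted text via
        # get_file_storage and return the stored path; returning the raw
        # text keeps this sketch short.
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

# Make the loader available to freshly created engines.
use_loader("csv_snippet_loader", CsvSnippetLoader)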

View file

@@ -0,0 +1,18 @@
"""
File loader infrastructure for cognee.
This package provides a plugin-based system for loading different file formats
into cognee, following the same patterns as database adapters.
Main exports:
- get_loader_engine(): Factory function to get configured loader engine
- use_loader(): Register custom loaders at runtime
- LoaderInterface: Base interface for implementing loaders
"""
from .get_loader_engine import get_loader_engine
from .use_loader import use_loader
from .LoaderInterface import LoaderInterface
__all__ = ["get_loader_engine", "use_loader", "LoaderInterface"]

View file

@@ -0,0 +1,7 @@
"""Core loader implementations that are always available."""
from .text_loader import TextLoader
from .audio_loader import AudioLoader
from .image_loader import ImageLoader
__all__ = ["TextLoader", "AudioLoader", "ImageLoader"]

View file

@@ -0,0 +1,98 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
class AudioLoader(LoaderInterface):
"""
Core audio file loader that transcribes common audio formats.
It converts audio files into text transcripts via the LLM gateway
and stores the result in cognee's data storage.
"""
@property
def supported_extensions(self) -> List[str]:
"""Supported text file extensions."""
return [
"aac", # Audio documents
"mid",
"mp3",
"m4a",
"ogg",
"flac",
"wav",
"amr",
"aiff",
]
@property
def supported_mime_types(self) -> List[str]:
"""Supported MIME types for text content."""
return [
"audio/aac",
"audio/midi",
"audio/mpeg",
"audio/mp4",
"audio/ogg",
"audio/flac",
"audio/wav",
"audio/amr",
"audio/aiff",
]
@property
def loader_name(self) -> str:
"""Unique identifier for this loader."""
return "audio_loader"
def can_handle(self, extension: str, mime_type: str) -> bool:
"""
Check if this loader can handle the given file.
Args:
extension: File extension
mime_type: Optional MIME type
Returns:
True if file can be handled, False otherwise
"""
if extension in self.supported_extensions and mime_type in self.supported_mime_types:
return True
return False
async def load(self, file_path: str, **kwargs):
"""
Load and process the audio file.
Args:
file_path: Path to the file to load
**kwargs: Additional configuration (unused)
Returns:
Path of the stored transcript text file in cognee data storage
Raises:
FileNotFoundError: If file doesn't exist
OSError: If file cannot be read
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, "rb") as f:
file_metadata = await get_file_metadata(f)
# Name the stored text file after the original file's content hash
storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"
result = await LLMGateway.create_transcript(file_path)
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
storage = get_file_storage(data_root_directory)
full_file_path = await storage.store(storage_file_name, result.text)
return full_file_path
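
A hypothetical direct use of the loader outside the engine, assuming LLM and storage configuration are already in place:

import asyncio
from cognee.infrastructure.loaders.core import AudioLoader

async def main():
    loader = AudioLoader()
    if loader.can_handle("mp3", "audio/mpeg"):
        # Transcribes the audio via the LLM gateway, writes the transcript
        # into cognee's data storage and returns the stored file path.
        stored_path = await loader.load("/tmp/interview.mp3")  # assumed path
        print(stored_path)

asyncio.run(main())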

View file

@@ -0,0 +1,114 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
class ImageLoader(LoaderInterface):
"""
Core image file loader that handles basic image file formats.
"""
@property
def supported_extensions(self) -> List[str]:
"""Supported text file extensions."""
return [
"png",
"dwg",
"xcf",
"jpg",
".jpe",
".jpeg",
"jpx",
"apng",
"gif",
"webp",
"cr2",
"tif",
"tiff",
"bmp",
"jxr",
"psd",
"ico",
"heic",
"avif",
]
@property
def supported_mime_types(self) -> List[str]:
"""Supported MIME types for text content."""
return [
"image/png",
"image/vnd.dwg",
"image/x-xcf",
"image/jpeg",
"image/jpx",
"image/apng",
"image/gif",
"image/webp",
"image/x-canon-cr2",
"image/tiff",
"image/bmp",
"image/jxr",
"image/vnd.adobe.photoshop",
"image/vnd.microsoft.icon",
"image/heic",
"image/avif",
]
@property
def loader_name(self) -> str:
"""Unique identifier for this loader."""
return "image_loader"
def can_handle(self, extension: str, mime_type: str) -> bool:
"""
Check if this loader can handle the given file.
Args:
extension: File extension
mime_type: Optional MIME type
Returns:
True if file can be handled, False otherwise
"""
if extension in self.supported_extensions and mime_type in self.supported_mime_types:
return True
return False
async def load(self, file_path: str, **kwargs):
"""
Load and process the image file.
Args:
file_path: Path to the file to load
**kwargs: Additional configuration (unused)
Returns:
Path of the stored transcription text file in cognee data storage
Raises:
FileNotFoundError: If file doesn't exist
OSError: If file cannot be read
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, "rb") as f:
file_metadata = await get_file_metadata(f)
# Name the stored text file after the original file's content hash
storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"
result = await LLMGateway.transcribe_image(file_path)
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
storage = get_file_storage(data_root_directory)
full_file_path = await storage.store(storage_file_name, result.choices[0].message.content)
return full_file_path

View file

@@ -0,0 +1,90 @@
import os
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
class TextLoader(LoaderInterface):
"""
Core text file loader that handles basic text file formats.
This loader is always available and serves as the fallback for
text-based files when no specialized loader is available.
"""
@property
def supported_extensions(self) -> List[str]:
"""Supported text file extensions."""
return ["txt", "md", "csv", "json", "xml", "yaml", "yml", "log"]
@property
def supported_mime_types(self) -> List[str]:
"""Supported MIME types for text content."""
return [
"text/plain",
"text/markdown",
"text/csv",
"application/json",
"text/xml",
"application/xml",
"text/yaml",
"application/yaml",
]
@property
def loader_name(self) -> str:
"""Unique identifier for this loader."""
return "text_loader"
def can_handle(self, extension: str, mime_type: str) -> bool:
"""
Check if this loader can handle the given file.
Args:
extension: File extension
mime_type: Optional MIME type
Returns:
True if file can be handled, False otherwise
"""
if extension in self.supported_extensions and mime_type in self.supported_mime_types:
return True
return False
async def load(self, file_path: str, encoding: str = "utf-8", **kwargs):
"""
Load and process the text file.
Args:
file_path: Path to the file to load
encoding: Text encoding to use (default: utf-8)
**kwargs: Additional configuration (unused)
Returns:
Path of the stored text file in cognee data storage
Raises:
FileNotFoundError: If file doesn't exist
UnicodeDecodeError: If file cannot be decoded with specified encoding
OSError: If file cannot be read
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, "rb") as f:
file_metadata = await get_file_metadata(f)
# Name the stored text file after the original file's content hash
storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"
with open(file_path, "r", encoding=encoding) as f:
content = f.read()
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
storage = get_file_storage(data_root_directory)
full_file_path = await storage.store(storage_file_name, content)
return full_file_path

View file

@@ -0,0 +1,32 @@
from .LoaderEngine import LoaderEngine
from .supported_loaders import supported_loaders
from cognee.shared.logging_utils import get_logger
logger = get_logger(__name__)
def create_loader_engine() -> LoaderEngine:
"""
Create a loader engine and register all supported loaders.
Follows cognee's pattern for engine creation functions used
in database adapters.
Returns:
Configured LoaderEngine instance
"""
engine = LoaderEngine()
# Register supported loaders from registry
for loader_name, loader_class in supported_loaders.items():
try:
loader_instance = loader_class()
engine.register_loader(loader_instance)
except Exception as e:
# Log but don't fail - allow engine to continue with other loaders
logger.warning(f"Failed to register loader {loader_name}: {e}")
return engine

View file

@@ -0,0 +1,22 @@
"""
External loader implementations for cognee.
This module contains loaders that depend on external libraries:
- pypdf_loader: PDF processing using pypdf
- unstructured_loader: Document processing using unstructured
These loaders are optional and only available if their dependencies are installed.
"""
from .pypdf_loader import PyPdfLoader
__all__ = ["PyPdfLoader"]
# Conditional imports based on dependency availability
try:
from .unstructured_loader import UnstructuredLoader
__all__.append("UnstructuredLoader")
except ImportError:
pass

View file

@@ -0,0 +1,96 @@
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
logger = get_logger(__name__)
class PyPdfLoader(LoaderInterface):
"""
PDF loader using pypdf library.
Extracts text content from PDF files page by page, providing
structured page information and handling PDF-specific errors.
"""
@property
def supported_extensions(self) -> List[str]:
return ["pdf"]
@property
def supported_mime_types(self) -> List[str]:
return ["application/pdf"]
@property
def loader_name(self) -> str:
return "pypdf_loader"
def can_handle(self, extension: str, mime_type: str) -> bool:
"""Check if file can be handled by this loader."""
# Check file extension
if extension in self.supported_extensions and mime_type in self.supported_mime_types:
return True
return False
async def load(self, file_path: str, strict: bool = False, **kwargs) -> str:
"""
Load PDF file and extract text content.
Args:
file_path: Path to the PDF file
strict: Whether to use strict mode for PDF reading
**kwargs: Additional arguments
Returns:
Path of the stored text file containing the extracted PDF text
Raises:
ImportError: If pypdf is not installed
Exception: If PDF processing fails
"""
try:
from pypdf import PdfReader
except ImportError as e:
raise ImportError(
"pypdf is required for PDF processing. Install with: pip install pypdf"
) from e
try:
with open(file_path, "rb") as file:
file_metadata = await get_file_metadata(file)
# Name the stored text file after the original file's content hash
storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"
logger.info(f"Reading PDF: {file_path}")
reader = PdfReader(file, strict=strict)
content_parts = []
page_texts = []
for page_num, page in enumerate(reader.pages, 1):
try:
page_text = page.extract_text()
if page_text.strip(): # Only add non-empty pages
page_texts.append(page_text)
content_parts.append(f"Page {page_num}:\n{page_text}\n")
except Exception as e:
logger.warning(f"Failed to extract text from page {page_num}: {e}")
continue
# Combine all content
full_content = "\n".join(content_parts)
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
storage = get_file_storage(data_root_directory)
full_file_path = await storage.store(storage_file_name, full_content)
return full_file_path
except Exception as e:
logger.error(f"Failed to process PDF {file_path}: {e}")
raise Exception(f"PDF processing failed: {e}") from e

View file

@@ -0,0 +1,127 @@
from typing import List
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from cognee.infrastructure.files.utils.get_file_metadata import get_file_metadata
logger = get_logger(__name__)
class UnstructuredLoader(LoaderInterface):
"""
Document loader using the unstructured library.
Handles various document formats including docx, pptx, xlsx, odt, etc.
Uses the unstructured library's auto-partition functionality.
"""
@property
def supported_extensions(self) -> List[str]:
return [
"docx",
"doc",
"odt", # Word documents
"xlsx",
"xls",
"ods", # Spreadsheets
"pptx",
"ppt",
"odp", # Presentations
"rtf",
"html",
"htm", # Rich text and HTML
"eml",
"msg", # Email formats
"epub", # eBooks
]
@property
def supported_mime_types(self) -> List[str]:
return [
"application/vnd.openxmlformats-officedocument.wordprocessingml.document", # docx
"application/msword", # doc
"application/vnd.oasis.opendocument.text", # odt
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # xlsx
"application/vnd.ms-excel", # xls
"application/vnd.oasis.opendocument.spreadsheet", # ods
"application/vnd.openxmlformats-officedocument.presentationml.presentation", # pptx
"application/vnd.ms-powerpoint", # ppt
"application/vnd.oasis.opendocument.presentation", # odp
"application/rtf", # rtf
"text/html", # html
"message/rfc822", # eml
"application/epub+zip", # epub
]
@property
def loader_name(self) -> str:
return "unstructured_loader"
def can_handle(self, extension: str, mime_type: str) -> bool:
"""Check if file can be handled by this loader."""
# Check file extension
if extension in self.supported_extensions and mime_type in self.supported_mime_types:
return True
return False
async def load(self, file_path: str, strategy: str = "auto", **kwargs):
"""
Load document using unstructured library.
Args:
file_path: Path to the document file
strategy: Partitioning strategy ("auto", "fast", "hi_res", "ocr_only")
**kwargs: Additional arguments passed to unstructured partition
Returns:
Path of the stored text file containing the extracted document text
Raises:
ImportError: If unstructured is not installed
Exception: If document processing fails
"""
try:
from unstructured.partition.auto import partition
except ImportError as e:
raise ImportError(
"unstructured is required for document processing. "
"Install with: pip install unstructured"
) from e
try:
logger.info(f"Processing document: {file_path}")
with open(file_path, "rb") as f:
file_metadata = await get_file_metadata(f)
# Name the stored text file after the original file's content hash
storage_file_name = "text_" + file_metadata["content_hash"] + ".txt"
# Set partitioning parameters
partition_kwargs = {"filename": file_path, "strategy": strategy, **kwargs}
# Use partition to extract elements
elements = partition(**partition_kwargs)
# Process elements into text content
text_parts = []
for element in elements:
element_text = str(element).strip()
if element_text:
text_parts.append(element_text)
# Combine all text content
full_content = "\n\n".join(text_parts)
storage_config = get_storage_config()
data_root_directory = storage_config["data_root_directory"]
storage = get_file_storage(data_root_directory)
full_file_path = await storage.store(storage_file_name, full_content)
return full_file_path
except Exception as e:
logger.error(f"Failed to process document {file_path}: {e}")
raise Exception(f"Document processing failed: {e}") from e

View file

@@ -0,0 +1,18 @@
from functools import lru_cache
from .LoaderEngine import LoaderEngine
from .create_loader_engine import create_loader_engine
@lru_cache
def get_loader_engine() -> LoaderEngine:
"""
Factory function to get loader engine.
Follows cognee's pattern with @lru_cache for efficient reuse
of engine instances. Configuration is loaded from environment
variables and settings.
Returns:
Cached LoaderEngine instance configured with current settings
"""
return create_loader_engine()
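
Because the factory takes no arguments, the @lru_cache decorator effectively turns it into a process-wide singleton accessor; a tiny illustration:

from cognee.infrastructure.loaders import get_loader_engine

engine_a = get_loader_engine()
engine_b = get_loader_engine()

assert engine_a is engine_b          # the cached factory returns the same instance
print(engine_a.get_available_loaders())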

View file

@@ -0,0 +1,18 @@
from cognee.infrastructure.loaders.external import PyPdfLoader
from cognee.infrastructure.loaders.core import TextLoader, AudioLoader, ImageLoader
# Registry for loader implementations
supported_loaders = {
PyPdfLoader.loader_name: PyPdfLoader,
TextLoader.loader_name: TextLoader,
ImageLoader.loader_name: ImageLoader,
AudioLoader.loader_name: AudioLoader,
}
# Try adding optional loaders
try:
from cognee.infrastructure.loaders.external import UnstructuredLoader
supported_loaders[UnstructuredLoader.loader_name] = UnstructuredLoader
except ImportError:
pass

View file

@@ -0,0 +1,21 @@
from .supported_loaders import supported_loaders
def use_loader(loader_name: str, loader_class):
"""
Register a loader at runtime.
This allows external packages and custom loaders to be registered
into the loader system.
Args:
loader_name: Unique name for the loader
loader_class: Loader class implementing LoaderInterface
Example:
from cognee.infrastructure.loaders import use_loader
from my_package import MyCustomLoader
use_loader("my_custom_loader", MyCustomLoader)
"""
supported_loaders[loader_name] = loader_class

View file

@@ -17,10 +17,15 @@ class Data(Base):
name = Column(String)
extension = Column(String)
mime_type = Column(String)
original_extension = Column(String, nullable=True)
original_mime_type = Column(String, nullable=True)
loader_engine = Column(String)
raw_data_location = Column(String)
original_data_location = Column(String)
owner_id = Column(UUID, index=True)
tenant_id = Column(UUID, index=True, nullable=True)
content_hash = Column(String)
raw_content_hash = Column(String)
external_metadata = Column(JSON)
# Store NodeSet as JSON list of strings
node_set = Column(JSON, nullable=True)

View file

@@ -1,5 +1,6 @@
from typing import BinaryIO
from contextlib import asynccontextmanager
import hashlib
from cognee.infrastructure.data.utils.extract_keywords import extract_keywords
from .IngestionData import IngestionData
@@ -16,9 +17,9 @@ class TextData(IngestionData):
self.data = data
def get_identifier(self):
keywords = extract_keywords(self.data)
metadata = self.get_metadata()
return "text/plain" + "_" + "|".join(keywords)
return metadata["content_hash"]
def get_metadata(self):
self.ensure_metadata()
@@ -29,6 +30,11 @@
if self.metadata is None:
self.metadata = {}
data_contents = self.data.encode("utf-8")
hash_contents = hashlib.md5(data_contents).hexdigest()
self.metadata["name"] = "text_" + hash_contents + ".txt"
self.metadata["content_hash"] = hash_contents
@asynccontextmanager
async def get_data(self):
yield self.data

View file

@@ -1,7 +1,7 @@
import hashlib
from typing import BinaryIO, Union
from cognee.infrastructure.files.storage import get_file_storage, get_storage_config
from .classify import classify
import hashlib
async def save_data_to_file(data: Union[str, BinaryIO], filename: str = None):

View file

@@ -52,7 +52,7 @@ async def cognee_pipeline(
pipeline_name: str = "custom_pipeline",
vector_db_config: dict = None,
graph_db_config: dict = None,
incremental_loading: bool = True,
incremental_loading: bool = False,
):
# Note: These context variables allow different value assignment for databases in Cognee
# per async task, thread, process and etc.
@@ -122,7 +122,7 @@ async def run_pipeline(
data=None,
pipeline_name: str = "custom_pipeline",
context: dict = None,
incremental_loading=True,
incremental_loading=False,
):
check_dataset_name(dataset.name)

View file

@@ -66,7 +66,7 @@ async def run_tasks(
user: User = None,
pipeline_name: str = "unknown_pipeline",
context: dict = None,
incremental_loading: bool = True,
incremental_loading: bool = False,
):
async def _run_tasks_data_item_incremental(
data_item,
@@ -163,6 +163,9 @@ async def run_tasks(
"data_id": data_id,
}
if os.getenv("RAISE_INCREMENTAL_LOADING_ERRORS", "true").lower() == "true":
raise error
async def _run_tasks_data_item_regular(
data_item,
dataset,

View file

@@ -90,4 +90,4 @@ class CompletionRetriever(BaseRetriever):
completion = await generate_completion(
query, context, self.user_prompt_path, self.system_prompt_path
)
return completion
return [completion]

View file

@@ -175,17 +175,13 @@ def log_database_configuration(logger):
try:
# Log relational database configuration
relational_config = get_relational_config()
logger.info(f"Relational database: {relational_config.db_provider}")
if relational_config.db_provider == "postgres":
logger.info(f"Postgres host: {relational_config.db_host}:{relational_config.db_port}")
logger.info(f"Postgres database: {relational_config.db_name}")
elif relational_config.db_provider == "sqlite":
logger.info(f"SQLite path: {relational_config.db_path}")
logger.info(f"SQLite database: {relational_config.db_name}")
# Log vector database configuration
vector_config = get_vectordb_config()
logger.info(f"Vector database: {vector_config.vector_db_provider}")
if vector_config.vector_db_provider == "lancedb":
logger.info(f"Vector database path: {vector_config.vector_db_url}")
else:
@@ -193,7 +189,6 @@ def log_database_configuration(logger):
# Log graph database configuration
graph_config = get_graph_config()
logger.info(f"Graph database: {graph_config.graph_database_provider}")
if graph_config.graph_database_provider == "kuzu":
logger.info(f"Graph database path: {graph_config.graph_file_path}")
else:

View file

@@ -0,0 +1,79 @@
import os
from urllib.parse import urlparse
from typing import List, Tuple
from pathlib import Path
import tempfile
from cognee.infrastructure.loaders.LoaderInterface import LoaderInterface
from cognee.modules.ingestion.exceptions import IngestionError
from cognee.infrastructure.loaders import get_loader_engine
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from pydantic_settings import BaseSettings, SettingsConfigDict
logger = get_logger(__name__)
class SaveDataSettings(BaseSettings):
accept_local_file_path: bool = True
model_config = SettingsConfigDict(env_file=".env", extra="allow")
settings = SaveDataSettings()
async def pull_from_s3(file_path, destination_file) -> None:
async with open_data_file(file_path) as file:
while True:
chunk = file.read(8192)
if not chunk:
break
destination_file.write(chunk)
async def data_item_to_text_file(
data_item_path: str, preferred_loaders: List[str]
) -> Tuple[str, LoaderInterface]:
if isinstance(data_item_path, str):
parsed_url = urlparse(data_item_path)
# data is s3 file path
if parsed_url.scheme == "s3":
# TODO: Rework this to work with file streams and not saving data to temp storage
# Note: proper suffix information is needed for OpenAI to handle mp3 files
path_info = Path(parsed_url.path)
with tempfile.NamedTemporaryFile(mode="wb", suffix=path_info.suffix) as temp_file:
await pull_from_s3(data_item_path, temp_file)
temp_file.flush() # Data needs to be saved to local storage
loader = get_loader_engine()
return await loader.load_file(temp_file.name, None, preferred_loaders), loader.get_loader(
temp_file.name, preferred_loaders
)
# data is local file path
elif parsed_url.scheme == "file":
if settings.accept_local_file_path:
loader = get_loader_engine()
return await loader.load_file(data_item_path, None, preferred_loaders), loader.get_loader(
data_item_path, preferred_loaders
)
else:
raise IngestionError(message="Local files are not accepted.")
# data is an absolute file path
elif data_item_path.startswith("/") or (
os.name == "nt" and len(data_item_path) > 1 and data_item_path[1] == ":"
):
# Handle both Unix absolute paths (/path) and Windows absolute paths (C:\path)
if settings.accept_local_file_path:
loader = get_loader_engine()
return await loader.load_file(data_item_path, None, preferred_loaders), loader.get_loader(
data_item_path, preferred_loaders
)
else:
raise IngestionError(message="Local files are not accepted.")
# data is not a supported type
raise IngestionError(message=f"Data type not supported: {type(data_item_path)}")
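
A hypothetical call showing how the helper resolves a local absolute path (the module path and file name below are assumptions):

import asyncio

# Assumed import path; the helper lives next to ingest_data in the ingestion task package.
from cognee.tasks.ingestion.data_item_to_text_file import data_item_to_text_file

async def main():
    storage_path, used_loader = await data_item_to_text_file(
        "/tmp/quarterly-report.pdf",            # assumed absolute local path
        preferred_loaders=["pypdf_loader"],
    )
    print(storage_path)             # text file written into cognee data storage
    print(used_loader.loader_name)  # "pypdf_loader"

asyncio.run(main())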

View file

@@ -1,6 +1,5 @@
import json
import inspect
from os import path
from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
@@ -11,6 +10,7 @@ from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path
from cognee.modules.data.methods import (
get_authorized_existing_datasets,
get_dataset_data,
@@ -18,6 +18,7 @@ from cognee.modules.data.methods import (
)
from .save_data_item_to_storage import save_data_item_to_storage
from .data_item_to_text_file import data_item_to_text_file
async def ingest_data(
@@ -26,6 +27,7 @@ async def ingest_data(
user: User,
node_set: Optional[List[str]] = None,
dataset_id: UUID = None,
preferred_loaders: List[str] = None,
):
if not user:
user = await get_default_user()
@@ -42,6 +44,7 @@
user: User,
node_set: Optional[List[str]] = None,
dataset_id: UUID = None,
preferred_loaders: List[str] = None,
):
new_datapoints = []
existing_data_points = []
@@ -74,72 +77,96 @@ async def ingest_data(
dataset_data_map = {str(data.id): True for data in dataset_data}
for data_item in data:
file_path = await save_data_item_to_storage(data_item)
# Get the file path of the data item, creating a file if one doesn't exist
original_file_path = await save_data_item_to_storage(data_item)
# Ingest data and add metadata
async with open_data_file(file_path) as file:
# Transform the file path into an OS-usable filesystem path
actual_file_path = get_data_file_path(original_file_path)
# Store all input data as text files in Cognee data storage
cognee_storage_file_path, loader_engine = await data_item_to_text_file(
actual_file_path, preferred_loaders
)
# Find metadata from original file
async with open_data_file(original_file_path) as file:
classified_data = ingestion.classify(file)
# data_id is the hash of file contents + owner id to avoid duplicate data
# data_id is the hash of original file contents + owner id to avoid duplicate data
data_id = ingestion.identify(classified_data, user)
original_file_metadata = classified_data.get_metadata()
file_metadata = classified_data.get_metadata()
# Find metadata from Cognee data storage text file
async with open_data_file(cognee_storage_file_path) as file:
classified_data = ingestion.classify(file)
storage_file_metadata = classified_data.get_metadata()
from sqlalchemy import select
from sqlalchemy import select
db_engine = get_relational_engine()
db_engine = get_relational_engine()
# Check to see if data should be updated
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
# Check to see if data should be updated
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
ext_metadata = get_external_metadata_dict(data_item)
# TODO: Maybe allow getting of external metadata through ingestion loader?
ext_metadata = get_external_metadata_dict(data_item)
if node_set:
ext_metadata["node_set"] = node_set
if node_set:
ext_metadata["node_set"] = node_set
if data_point is not None:
data_point.name = file_metadata["name"]
data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"]
data_point.owner_id = user.id
data_point.content_hash = file_metadata["content_hash"]
data_point.file_size = file_metadata["file_size"]
data_point.external_metadata = ext_metadata
data_point.node_set = json.dumps(node_set) if node_set else None
data_point.tenant_id = user.tenant_id if user.tenant_id else None
if data_point is not None:
data_point.name = original_file_metadata["name"]
data_point.raw_data_location = cognee_storage_file_path
data_point.original_data_location = original_file_metadata["file_path"]
data_point.extension = storage_file_metadata["extension"]
data_point.mime_type = storage_file_metadata["mime_type"]
data_point.original_extension = original_file_metadata["extension"]
data_point.original_mime_type = original_file_metadata["mime_type"]
data_point.loader_engine = loader_engine.loader_name
data_point.owner_id = user.id
data_point.content_hash = original_file_metadata["content_hash"]
data_point.raw_content_hash = storage_file_metadata["content_hash"]
data_point.file_size = original_file_metadata["file_size"]
data_point.external_metadata = ext_metadata
data_point.node_set = json.dumps(node_set) if node_set else None
data_point.tenant_id = user.tenant_id if user.tenant_id else None
# Check if data is already in dataset
if str(data_point.id) in dataset_data_map:
existing_data_points.append(data_point)
else:
dataset_new_data_points.append(data_point)
dataset_data_map[str(data_point.id)] = True
# Check if data is already in dataset
if str(data_point.id) in dataset_data_map:
existing_data_points.append(data_point)
else:
if str(data_id) in dataset_data_map:
continue
data_point = Data(
id=data_id,
name=file_metadata["name"],
raw_data_location=file_metadata["file_path"],
extension=file_metadata["extension"],
mime_type=file_metadata["mime_type"],
owner_id=user.id,
content_hash=file_metadata["content_hash"],
external_metadata=ext_metadata,
node_set=json.dumps(node_set) if node_set else None,
data_size=file_metadata["file_size"],
tenant_id=user.tenant_id if user.tenant_id else None,
pipeline_status={},
token_count=-1,
)
new_datapoints.append(data_point)
dataset_new_data_points.append(data_point)
dataset_data_map[str(data_point.id)] = True
else:
if str(data_id) in dataset_data_map:
continue
data_point = Data(
id=data_id,
name=original_file_metadata["name"],
raw_data_location=cognee_storage_file_path,
original_data_location=original_file_metadata["file_path"],
extension=storage_file_metadata["extension"],
mime_type=storage_file_metadata["mime_type"],
original_extension=original_file_metadata["extension"],
original_mime_type=original_file_metadata["mime_type"],
loader_engine=loader_engine.loader_name,
owner_id=user.id,
content_hash=original_file_metadata["content_hash"],
raw_content_hash=storage_file_metadata["content_hash"],
external_metadata=ext_metadata,
node_set=json.dumps(node_set) if node_set else None,
data_size=original_file_metadata["file_size"],
tenant_id=user.tenant_id if user.tenant_id else None,
pipeline_status={},
token_count=-1,
)
new_datapoints.append(data_point)
dataset_data_map[str(data_point.id)] = True
async with db_engine.get_async_session() as session:
if dataset not in session:
@@ -161,4 +188,6 @@ async def ingest_data(
return existing_data_points + dataset_new_data_points + new_datapoints
return await store_data_to_dataset(data, dataset_name, user, node_set, dataset_id)
return await store_data_to_dataset(
data, dataset_name, user, node_set, dataset_id, preferred_loaders
)

View file

@@ -37,16 +37,16 @@ async def test_local_file_deletion(data_text, file_location):
# Get data entry from database based on file path
data = (
await session.scalars(
select(Data).where(Data.raw_data_location == "file://" + file_location)
select(Data).where(Data.original_data_location == "file://" + file_location)
)
).one()
assert os.path.isfile(data.raw_data_location.replace("file://", "")), (
f"Data location doesn't exist: {data.raw_data_location}"
assert os.path.isfile(data.original_data_location.replace("file://", "")), (
f"Data location doesn't exist: {data.original_data_location}"
)
# Test local files not created by cognee won't get deleted
await engine.delete_data_entity(data.id)
assert os.path.exists(data.raw_data_location.replace("file://", "")), (
f"Data location doesn't exists: {data.raw_data_location}"
assert os.path.exists(data.original_data_location.replace("file://", "")), (
f"Data location doesn't exists: {data.original_data_location}"
)

View file

@@ -28,13 +28,8 @@ async def main():
logging.info(type_counts)
logging.info(edge_type_counts)
# Assert there is exactly one PdfDocument.
assert type_counts.get("PdfDocument", 0) == 1, (
f"Expected exactly one PdfDocument, but found {type_counts.get('PdfDocument', 0)}"
)
# Assert there is exactly one TextDocument.
assert type_counts.get("TextDocument", 0) == 1, (
assert type_counts.get("TextDocument", 0) == 2, (
f"Expected exactly one TextDocument, but found {type_counts.get('TextDocument', 0)}"
)

poetry.lock (generated), 3874 changed lines

File diff suppressed because it is too large.

View file

@@ -1,7 +1,7 @@
[project]
name = "cognee"
version = "0.2.2.dev0"
version = "0.2.2"
description = "Cognee - is a library for enriching LLM context with a semantic layer for better understanding and reasoning."
authors = [
{ name = "Vasilije Markovic" },

uv.lock (generated), 1910 changed lines

File diff suppressed because it is too large.