Compare commits
7 commits
main
...
process_in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
efcc35592e | ||
|
|
1c727c73bb | ||
|
|
07f2afa69d | ||
|
|
1ebeeac61d | ||
|
|
7c670b454b | ||
|
|
41f8eaf28d | ||
|
|
d14e5149c1 |
47 changed files with 1598 additions and 251 deletions
|
|
@ -102,7 +102,7 @@ handlers =
|
||||||
qualname = sqlalchemy.engine
|
qualname = sqlalchemy.engine
|
||||||
|
|
||||||
[logger_alembic]
|
[logger_alembic]
|
||||||
level = INFO
|
level = WARN
|
||||||
handlers =
|
handlers =
|
||||||
qualname = alembic
|
qualname = alembic
|
||||||
|
|
||||||
|
|
|
||||||
53
alembic/versions/incremental_file_signatures.py
Normal file
53
alembic/versions/incremental_file_signatures.py
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
"""Add file_signatures table for incremental loading
|
||||||
|
|
||||||
|
Revision ID: incremental_file_signatures
|
||||||
|
Revises: 1d0bb7fede17
|
||||||
|
Create Date: 2025-01-27 12:00:00.000000
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = "incremental_file_signatures"
|
||||||
|
down_revision = "1d0bb7fede17"
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table(
|
||||||
|
"file_signatures",
|
||||||
|
sa.Column("id", sa.UUID(), nullable=False, default=uuid4),
|
||||||
|
sa.Column("data_id", sa.UUID(), nullable=True),
|
||||||
|
sa.Column("file_path", sa.String(), nullable=True),
|
||||||
|
sa.Column("file_size", sa.Integer(), nullable=True),
|
||||||
|
sa.Column("content_hash", sa.String(), nullable=True),
|
||||||
|
sa.Column("total_blocks", sa.Integer(), nullable=True),
|
||||||
|
sa.Column("block_size", sa.Integer(), nullable=True),
|
||||||
|
sa.Column("strong_len", sa.Integer(), nullable=True),
|
||||||
|
sa.Column("signature_data", sa.LargeBinary(), nullable=True),
|
||||||
|
sa.Column("blocks_info", sa.JSON(), nullable=True),
|
||||||
|
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
|
||||||
|
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
|
||||||
|
sa.PrimaryKeyConstraint("id"),
|
||||||
|
)
|
||||||
|
op.create_index(
|
||||||
|
op.f("ix_file_signatures_data_id"), "file_signatures", ["data_id"], unique=False
|
||||||
|
)
|
||||||
|
op.create_index(
|
||||||
|
op.f("ix_file_signatures_content_hash"), "file_signatures", ["content_hash"], unique=False
|
||||||
|
)
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_index(op.f("ix_file_signatures_content_hash"), table_name="file_signatures")
|
||||||
|
op.drop_index(op.f("ix_file_signatures_data_id"), table_name="file_signatures")
|
||||||
|
op.drop_table("file_signatures")
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
@ -4,6 +4,10 @@ set -e # Exit on error
|
||||||
echo "Debug mode: $DEBUG"
|
echo "Debug mode: $DEBUG"
|
||||||
echo "Environment: $ENVIRONMENT"
|
echo "Environment: $ENVIRONMENT"
|
||||||
|
|
||||||
|
# Set default transport mode if not specified
|
||||||
|
TRANSPORT_MODE=${TRANSPORT_MODE:-"stdio"}
|
||||||
|
echo "Transport mode: $TRANSPORT_MODE"
|
||||||
|
|
||||||
# Run Alembic migrations with proper error handling.
|
# Run Alembic migrations with proper error handling.
|
||||||
# Note on UserAlreadyExists error handling:
|
# Note on UserAlreadyExists error handling:
|
||||||
# During database migrations, we attempt to create a default user. If this user
|
# During database migrations, we attempt to create a default user. If this user
|
||||||
|
|
@ -28,19 +32,31 @@ fi
|
||||||
|
|
||||||
echo "Database migrations done."
|
echo "Database migrations done."
|
||||||
|
|
||||||
echo "Starting Cognee MCP Server..."
|
echo "Starting Cognee MCP Server with transport mode: $TRANSPORT_MODE"
|
||||||
|
|
||||||
# Add startup delay to ensure DB is ready
|
# Add startup delay to ensure DB is ready
|
||||||
sleep 2
|
sleep 2
|
||||||
|
|
||||||
# Modified Gunicorn startup with error handling
|
# Modified startup with transport mode selection and error handling
|
||||||
if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
|
if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
|
||||||
if [ "$DEBUG" = "true" ]; then
|
if [ "$DEBUG" = "true" ]; then
|
||||||
echo "Waiting for the debugger to attach..."
|
echo "Waiting for the debugger to attach..."
|
||||||
exec python -m debugpy --wait-for-client --listen 0.0.0.0:5678 -m cognee
|
if [ "$TRANSPORT_MODE" = "sse" ]; then
|
||||||
|
exec python -m debugpy --wait-for-client --listen 0.0.0.0:5678 -m cognee --transport sse
|
||||||
|
else
|
||||||
|
exec python -m debugpy --wait-for-client --listen 0.0.0.0:5678 -m cognee --transport stdio
|
||||||
|
fi
|
||||||
else
|
else
|
||||||
exec cognee
|
if [ "$TRANSPORT_MODE" = "sse" ]; then
|
||||||
|
exec cognee --transport sse
|
||||||
|
else
|
||||||
|
exec cognee --transport stdio
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
exec cognee
|
if [ "$TRANSPORT_MODE" = "sse" ]; then
|
||||||
|
exec cognee --transport sse
|
||||||
|
else
|
||||||
|
exec cognee --transport stdio
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,11 @@ from cognee.modules.search.types import SearchType
|
||||||
from cognee.shared.data_models import KnowledgeGraph
|
from cognee.shared.data_models import KnowledgeGraph
|
||||||
from cognee.modules.storage.utils import JSONEncoder
|
from cognee.modules.storage.utils import JSONEncoder
|
||||||
|
|
||||||
|
# Import database configuration functions for logging
|
||||||
|
from cognee.infrastructure.databases.relational.config import get_relational_config
|
||||||
|
from cognee.infrastructure.databases.vector.config import get_vectordb_config
|
||||||
|
from cognee.infrastructure.databases.graph.config import get_graph_config
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from codingagents.coding_rule_associations import (
|
from codingagents.coding_rule_associations import (
|
||||||
add_rule_associations,
|
add_rule_associations,
|
||||||
|
|
@ -539,6 +544,39 @@ def load_class(model_file, model_name):
|
||||||
return model_class
|
return model_class
|
||||||
|
|
||||||
|
|
||||||
|
def log_database_configuration():
|
||||||
|
"""Log the current database configuration for all database types"""
|
||||||
|
try:
|
||||||
|
# Log relational database configuration
|
||||||
|
relational_config = get_relational_config()
|
||||||
|
logger.info(f"Relational database: {relational_config.db_provider}")
|
||||||
|
if relational_config.db_provider == "postgres":
|
||||||
|
logger.info(f"Postgres host: {relational_config.db_host}:{relational_config.db_port}")
|
||||||
|
logger.info(f"Postgres database: {relational_config.db_name}")
|
||||||
|
elif relational_config.db_provider == "sqlite":
|
||||||
|
logger.info(f"SQLite path: {relational_config.db_path}")
|
||||||
|
logger.info(f"SQLite database: {relational_config.db_name}")
|
||||||
|
|
||||||
|
# Log vector database configuration
|
||||||
|
vector_config = get_vectordb_config()
|
||||||
|
logger.info(f"Vector database: {vector_config.vector_db_provider}")
|
||||||
|
if vector_config.vector_db_provider == "lancedb":
|
||||||
|
logger.info(f"Vector database path: {vector_config.vector_db_url}")
|
||||||
|
elif vector_config.vector_db_provider in ["qdrant", "weaviate", "pgvector"]:
|
||||||
|
logger.info(f"Vector database URL: {vector_config.vector_db_url}")
|
||||||
|
|
||||||
|
# Log graph database configuration
|
||||||
|
graph_config = get_graph_config()
|
||||||
|
logger.info(f"Graph database: {graph_config.graph_database_provider}")
|
||||||
|
if graph_config.graph_database_provider == "kuzu":
|
||||||
|
logger.info(f"Graph database path: {graph_config.graph_file_path}")
|
||||||
|
elif graph_config.graph_database_provider in ["neo4j", "falkordb"]:
|
||||||
|
logger.info(f"Graph database URL: {graph_config.graph_database_url}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Could not retrieve database configuration: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
|
|
||||||
|
|
@ -551,6 +589,9 @@ async def main():
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Log database configurations
|
||||||
|
log_database_configuration()
|
||||||
|
|
||||||
logger.info(f"Starting MCP server with transport: {args.transport}")
|
logger.info(f"Starting MCP server with transport: {args.transport}")
|
||||||
if args.transport == "stdio":
|
if args.transport == "stdio":
|
||||||
await mcp.run_stdio_async()
|
await mcp.run_stdio_async()
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import litellm
|
import litellm
|
||||||
|
import logging
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Type, Optional
|
from typing import Type, Optional
|
||||||
from litellm import acompletion, JSONSchemaValidationError
|
from litellm import acompletion, JSONSchemaValidationError
|
||||||
|
|
@ -13,6 +14,13 @@ from cognee.infrastructure.llm.rate_limiter import (
|
||||||
sleep_and_retry_async,
|
sleep_and_retry_async,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Configure Litellm logging to reduce verbosity
|
||||||
|
litellm.set_verbose = False
|
||||||
|
|
||||||
|
# Suppress Litellm ERROR logging (using standard logging for external library configuration)
|
||||||
|
logging.getLogger("LiteLLM").setLevel(logging.CRITICAL)
|
||||||
|
logging.getLogger("litellm").setLevel(logging.CRITICAL)
|
||||||
|
|
||||||
logger = get_logger()
|
logger = get_logger()
|
||||||
observe = get_observe()
|
observe = get_observe()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
"""Adapter for Generic API LLM provider API"""
|
"""Adapter for Generic API LLM provider API"""
|
||||||
|
|
||||||
|
import logging
|
||||||
from typing import Type
|
from typing import Type
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
@ -7,8 +8,16 @@ import instructor
|
||||||
from cognee.infrastructure.llm.llm_interface import LLMInterface
|
from cognee.infrastructure.llm.llm_interface import LLMInterface
|
||||||
from cognee.infrastructure.llm.config import get_llm_config
|
from cognee.infrastructure.llm.config import get_llm_config
|
||||||
from cognee.infrastructure.llm.rate_limiter import rate_limit_async, sleep_and_retry_async
|
from cognee.infrastructure.llm.rate_limiter import rate_limit_async, sleep_and_retry_async
|
||||||
|
from cognee.shared.logging_utils import get_logger
|
||||||
import litellm
|
import litellm
|
||||||
|
|
||||||
|
# Configure Litellm logging to reduce verbosity
|
||||||
|
litellm.set_verbose = False
|
||||||
|
|
||||||
|
# Suppress Litellm ERROR logging (using standard logging for external library configuration)
|
||||||
|
logging.getLogger("LiteLLM").setLevel(logging.CRITICAL)
|
||||||
|
logging.getLogger("litellm").setLevel(logging.CRITICAL)
|
||||||
|
|
||||||
|
|
||||||
class GenericAPIAdapter(LLMInterface):
|
class GenericAPIAdapter(LLMInterface):
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,15 @@ from cognee.infrastructure.llm.rate_limiter import (
|
||||||
sleep_and_retry_sync,
|
sleep_and_retry_sync,
|
||||||
)
|
)
|
||||||
from cognee.modules.observability.get_observe import get_observe
|
from cognee.modules.observability.get_observe import get_observe
|
||||||
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# Configure Litellm logging to reduce verbosity
|
||||||
|
litellm.set_verbose = False
|
||||||
|
|
||||||
|
# Suppress Litellm ERROR logging using standard logging
|
||||||
|
logging.getLogger("LiteLLM").setLevel(logging.CRITICAL)
|
||||||
|
logging.getLogger("litellm").setLevel(logging.CRITICAL)
|
||||||
|
|
||||||
observe = get_observe()
|
observe = get_observe()
|
||||||
|
|
||||||
|
|
|
||||||
52
cognee/modules/data/models/FileSignature.py
Normal file
52
cognee/modules/data/models/FileSignature.py
Normal file
|
|
@ -0,0 +1,52 @@
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from uuid import uuid4
|
||||||
|
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer, LargeBinary, Text
|
||||||
|
from sqlalchemy.orm import relationship
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
||||||
|
|
||||||
|
class FileSignature(Base):
|
||||||
|
__tablename__ = "file_signatures"
|
||||||
|
|
||||||
|
id = Column(UUID, primary_key=True, default=uuid4)
|
||||||
|
|
||||||
|
# Reference to the original data entry
|
||||||
|
data_id = Column(UUID, index=True)
|
||||||
|
|
||||||
|
# File information
|
||||||
|
file_path = Column(String)
|
||||||
|
file_size = Column(Integer)
|
||||||
|
content_hash = Column(String, index=True) # Overall file hash for quick comparison
|
||||||
|
|
||||||
|
# Block information
|
||||||
|
total_blocks = Column(Integer)
|
||||||
|
block_size = Column(Integer)
|
||||||
|
strong_len = Column(Integer)
|
||||||
|
|
||||||
|
# Signature data (binary)
|
||||||
|
signature_data = Column(LargeBinary)
|
||||||
|
|
||||||
|
# Block details (JSON array of block info)
|
||||||
|
blocks_info = Column(
|
||||||
|
JSON
|
||||||
|
) # Array of {block_index, weak_checksum, strong_hash, block_size, file_offset}
|
||||||
|
|
||||||
|
# Timestamps
|
||||||
|
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||||
|
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
def to_json(self) -> dict:
|
||||||
|
return {
|
||||||
|
"id": str(self.id),
|
||||||
|
"data_id": str(self.data_id),
|
||||||
|
"file_path": self.file_path,
|
||||||
|
"file_size": self.file_size,
|
||||||
|
"content_hash": self.content_hash,
|
||||||
|
"total_blocks": self.total_blocks,
|
||||||
|
"block_size": self.block_size,
|
||||||
|
"strong_len": self.strong_len,
|
||||||
|
"blocks_info": self.blocks_info,
|
||||||
|
"created_at": self.created_at.isoformat(),
|
||||||
|
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
|
||||||
|
}
|
||||||
|
|
@ -2,3 +2,4 @@ from .Data import Data
|
||||||
from .Dataset import Dataset
|
from .Dataset import Dataset
|
||||||
from .DatasetData import DatasetData
|
from .DatasetData import DatasetData
|
||||||
from .GraphMetrics import GraphMetrics
|
from .GraphMetrics import GraphMetrics
|
||||||
|
from .FileSignature import FileSignature
|
||||||
|
|
|
||||||
4
cognee/modules/ingestion/incremental/__init__.py
Normal file
4
cognee/modules/ingestion/incremental/__init__.py
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
from .incremental_loader import IncrementalLoader
|
||||||
|
from .block_hash_service import BlockHashService
|
||||||
|
|
||||||
|
__all__ = ["IncrementalLoader", "BlockHashService"]
|
||||||
284
cognee/modules/ingestion/incremental/block_hash_service.py
Normal file
284
cognee/modules/ingestion/incremental/block_hash_service.py
Normal file
|
|
@ -0,0 +1,284 @@
|
||||||
|
"""
|
||||||
|
Block Hash Service for Incremental File Loading
|
||||||
|
|
||||||
|
This module implements the rsync algorithm for incremental file loading.
|
||||||
|
It splits files into fixed-size blocks, computes rolling weak checksums (Adler-32 variant)
|
||||||
|
and strong hashes per block, and generates deltas for changed content.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import hashlib
|
||||||
|
from io import BytesIO
|
||||||
|
from typing import BinaryIO, List, Dict, Tuple, Optional, Any
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pyrsync import signature, delta, patch, get_signature_args
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BlockInfo:
|
||||||
|
"""Information about a file block"""
|
||||||
|
|
||||||
|
block_index: int
|
||||||
|
weak_checksum: int
|
||||||
|
strong_hash: str
|
||||||
|
block_size: int
|
||||||
|
file_offset: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FileSignature:
|
||||||
|
"""File signature containing block information"""
|
||||||
|
|
||||||
|
file_path: str
|
||||||
|
file_size: int
|
||||||
|
total_blocks: int
|
||||||
|
block_size: int
|
||||||
|
strong_len: int
|
||||||
|
blocks: List[BlockInfo]
|
||||||
|
signature_data: bytes
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FileDelta:
|
||||||
|
"""Delta information for changed blocks"""
|
||||||
|
|
||||||
|
changed_blocks: List[int] # Block indices that changed
|
||||||
|
delta_data: bytes
|
||||||
|
old_signature: FileSignature
|
||||||
|
new_signature: FileSignature
|
||||||
|
|
||||||
|
|
||||||
|
class BlockHashService:
|
||||||
|
"""Service for block-based file hashing using librsync algorithm"""
|
||||||
|
|
||||||
|
DEFAULT_BLOCK_SIZE = 1024 # 1KB blocks
|
||||||
|
DEFAULT_STRONG_LEN = 8 # 8 bytes for strong hash
|
||||||
|
|
||||||
|
def __init__(self, block_size: int = None, strong_len: int = None):
|
||||||
|
"""
|
||||||
|
Initialize the BlockHashService
|
||||||
|
|
||||||
|
Args:
|
||||||
|
block_size: Size of blocks in bytes (default: 1024)
|
||||||
|
strong_len: Length of strong hash in bytes (default: 8)
|
||||||
|
"""
|
||||||
|
self.block_size = block_size or self.DEFAULT_BLOCK_SIZE
|
||||||
|
self.strong_len = strong_len or self.DEFAULT_STRONG_LEN
|
||||||
|
|
||||||
|
def generate_signature(self, file_obj: BinaryIO, file_path: str = None) -> FileSignature:
|
||||||
|
"""
|
||||||
|
Generate a signature for a file using librsync algorithm
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_obj: File object to generate signature for
|
||||||
|
file_path: Optional file path for metadata
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
FileSignature object containing block information
|
||||||
|
"""
|
||||||
|
file_obj.seek(0)
|
||||||
|
file_data = file_obj.read()
|
||||||
|
file_size = len(file_data)
|
||||||
|
|
||||||
|
# Calculate optimal signature parameters
|
||||||
|
magic, block_len, strong_len = get_signature_args(
|
||||||
|
file_size, block_len=self.block_size, strong_len=self.strong_len
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate signature using librsync
|
||||||
|
file_io = BytesIO(file_data)
|
||||||
|
sig_io = BytesIO()
|
||||||
|
|
||||||
|
signature(file_io, sig_io, strong_len, magic, block_len)
|
||||||
|
signature_data = sig_io.getvalue()
|
||||||
|
|
||||||
|
# Parse signature to extract block information
|
||||||
|
blocks = self._parse_signature(signature_data, file_data, block_len)
|
||||||
|
|
||||||
|
return FileSignature(
|
||||||
|
file_path=file_path or "",
|
||||||
|
file_size=file_size,
|
||||||
|
total_blocks=len(blocks),
|
||||||
|
block_size=block_len,
|
||||||
|
strong_len=strong_len,
|
||||||
|
blocks=blocks,
|
||||||
|
signature_data=signature_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _parse_signature(
|
||||||
|
self, signature_data: bytes, file_data: bytes, block_size: int
|
||||||
|
) -> List[BlockInfo]:
|
||||||
|
"""
|
||||||
|
Parse signature data to extract block information
|
||||||
|
|
||||||
|
Args:
|
||||||
|
signature_data: Raw signature data from librsync
|
||||||
|
file_data: Original file data
|
||||||
|
block_size: Size of blocks
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of BlockInfo objects
|
||||||
|
"""
|
||||||
|
blocks = []
|
||||||
|
total_blocks = (len(file_data) + block_size - 1) // block_size
|
||||||
|
|
||||||
|
for i in range(total_blocks):
|
||||||
|
start_offset = i * block_size
|
||||||
|
end_offset = min(start_offset + block_size, len(file_data))
|
||||||
|
block_data = file_data[start_offset:end_offset]
|
||||||
|
|
||||||
|
# Calculate weak checksum (simple Adler-32 variant)
|
||||||
|
weak_checksum = self._calculate_weak_checksum(block_data)
|
||||||
|
|
||||||
|
# Calculate strong hash (MD5)
|
||||||
|
strong_hash = hashlib.md5(block_data).hexdigest()
|
||||||
|
|
||||||
|
blocks.append(
|
||||||
|
BlockInfo(
|
||||||
|
block_index=i,
|
||||||
|
weak_checksum=weak_checksum,
|
||||||
|
strong_hash=strong_hash,
|
||||||
|
block_size=len(block_data),
|
||||||
|
file_offset=start_offset,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return blocks
|
||||||
|
|
||||||
|
def _calculate_weak_checksum(self, data: bytes) -> int:
|
||||||
|
"""
|
||||||
|
Calculate a weak checksum similar to Adler-32
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Block data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Weak checksum value
|
||||||
|
"""
|
||||||
|
a = 1
|
||||||
|
b = 0
|
||||||
|
for byte in data:
|
||||||
|
a = (a + byte) % 65521
|
||||||
|
b = (b + a) % 65521
|
||||||
|
return (b << 16) | a
|
||||||
|
|
||||||
|
def compare_signatures(self, old_sig: FileSignature, new_sig: FileSignature) -> List[int]:
|
||||||
|
"""
|
||||||
|
Compare two signatures to find changed blocks
|
||||||
|
|
||||||
|
Args:
|
||||||
|
old_sig: Previous file signature
|
||||||
|
new_sig: New file signature
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of block indices that have changed
|
||||||
|
"""
|
||||||
|
changed_blocks = []
|
||||||
|
|
||||||
|
# Create lookup tables for efficient comparison
|
||||||
|
old_blocks = {block.block_index: block for block in old_sig.blocks}
|
||||||
|
new_blocks = {block.block_index: block for block in new_sig.blocks}
|
||||||
|
|
||||||
|
# Find changed, added, or removed blocks
|
||||||
|
all_indices = set(old_blocks.keys()) | set(new_blocks.keys())
|
||||||
|
|
||||||
|
for block_idx in all_indices:
|
||||||
|
old_block = old_blocks.get(block_idx)
|
||||||
|
new_block = new_blocks.get(block_idx)
|
||||||
|
|
||||||
|
if old_block is None or new_block is None:
|
||||||
|
# Block was added or removed
|
||||||
|
changed_blocks.append(block_idx)
|
||||||
|
elif (
|
||||||
|
old_block.weak_checksum != new_block.weak_checksum
|
||||||
|
or old_block.strong_hash != new_block.strong_hash
|
||||||
|
):
|
||||||
|
# Block content changed
|
||||||
|
changed_blocks.append(block_idx)
|
||||||
|
|
||||||
|
return sorted(changed_blocks)
|
||||||
|
|
||||||
|
def generate_delta(
|
||||||
|
self, old_file: BinaryIO, new_file: BinaryIO, old_signature: FileSignature = None
|
||||||
|
) -> FileDelta:
|
||||||
|
"""
|
||||||
|
Generate a delta between two file versions
|
||||||
|
|
||||||
|
Args:
|
||||||
|
old_file: Previous version of the file
|
||||||
|
new_file: New version of the file
|
||||||
|
old_signature: Optional pre-computed signature of old file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
FileDelta object containing change information
|
||||||
|
"""
|
||||||
|
# Generate signatures if not provided
|
||||||
|
if old_signature is None:
|
||||||
|
old_signature = self.generate_signature(old_file)
|
||||||
|
|
||||||
|
new_signature = self.generate_signature(new_file)
|
||||||
|
|
||||||
|
# Generate delta using librsync
|
||||||
|
new_file.seek(0)
|
||||||
|
old_sig_io = BytesIO(old_signature.signature_data)
|
||||||
|
delta_io = BytesIO()
|
||||||
|
|
||||||
|
delta(new_file, old_sig_io, delta_io)
|
||||||
|
delta_data = delta_io.getvalue()
|
||||||
|
|
||||||
|
# Find changed blocks
|
||||||
|
changed_blocks = self.compare_signatures(old_signature, new_signature)
|
||||||
|
|
||||||
|
return FileDelta(
|
||||||
|
changed_blocks=changed_blocks,
|
||||||
|
delta_data=delta_data,
|
||||||
|
old_signature=old_signature,
|
||||||
|
new_signature=new_signature,
|
||||||
|
)
|
||||||
|
|
||||||
|
def apply_delta(self, old_file: BinaryIO, delta_obj: FileDelta) -> BytesIO:
|
||||||
|
"""
|
||||||
|
Apply a delta to reconstruct the new file
|
||||||
|
|
||||||
|
Args:
|
||||||
|
old_file: Original file
|
||||||
|
delta_obj: Delta information
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
BytesIO object containing the reconstructed file
|
||||||
|
"""
|
||||||
|
old_file.seek(0)
|
||||||
|
delta_io = BytesIO(delta_obj.delta_data)
|
||||||
|
result_io = BytesIO()
|
||||||
|
|
||||||
|
patch(old_file, delta_io, result_io)
|
||||||
|
result_io.seek(0)
|
||||||
|
|
||||||
|
return result_io
|
||||||
|
|
||||||
|
def calculate_block_changes(
|
||||||
|
self, old_sig: FileSignature, new_sig: FileSignature
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Calculate detailed statistics about block changes
|
||||||
|
|
||||||
|
Args:
|
||||||
|
old_sig: Previous file signature
|
||||||
|
new_sig: New file signature
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with change statistics
|
||||||
|
"""
|
||||||
|
changed_blocks = self.compare_signatures(old_sig, new_sig)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"total_old_blocks": len(old_sig.blocks),
|
||||||
|
"total_new_blocks": len(new_sig.blocks),
|
||||||
|
"changed_blocks": len(changed_blocks),
|
||||||
|
"changed_block_indices": changed_blocks,
|
||||||
|
"unchanged_blocks": len(old_sig.blocks) - len(changed_blocks),
|
||||||
|
"compression_ratio": 1.0 - (len(changed_blocks) / max(len(old_sig.blocks), 1)),
|
||||||
|
"old_file_size": old_sig.file_size,
|
||||||
|
"new_file_size": new_sig.file_size,
|
||||||
|
}
|
||||||
296
cognee/modules/ingestion/incremental/incremental_loader.py
Normal file
296
cognee/modules/ingestion/incremental/incremental_loader.py
Normal file
|
|
@ -0,0 +1,296 @@
|
||||||
|
"""
|
||||||
|
Incremental Loader for Cognee
|
||||||
|
|
||||||
|
This module implements incremental file loading using the rsync algorithm.
|
||||||
|
It integrates with the existing cognee ingestion pipeline to only process
|
||||||
|
changed blocks when a file is re-added.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from io import BytesIO
|
||||||
|
from typing import BinaryIO, List, Optional, Any, Dict, Tuple
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from cognee.modules.data.models import Data, FileSignature
|
||||||
|
from cognee.modules.users.models import User
|
||||||
|
from cognee.shared.utils import get_file_content_hash
|
||||||
|
|
||||||
|
from .block_hash_service import BlockHashService, FileSignature as ServiceFileSignature
|
||||||
|
|
||||||
|
|
||||||
|
class IncrementalLoader:
|
||||||
|
"""
|
||||||
|
Incremental file loader using rsync algorithm for efficient updates
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, block_size: int = 1024, strong_len: int = 8):
|
||||||
|
"""
|
||||||
|
Initialize the incremental loader
|
||||||
|
|
||||||
|
Args:
|
||||||
|
block_size: Size of blocks in bytes for rsync algorithm
|
||||||
|
strong_len: Length of strong hash in bytes
|
||||||
|
"""
|
||||||
|
self.block_service = BlockHashService(block_size, strong_len)
|
||||||
|
|
||||||
|
async def should_process_file(
|
||||||
|
self, file_obj: BinaryIO, data_id: str
|
||||||
|
) -> Tuple[bool, Optional[Dict]]:
|
||||||
|
"""
|
||||||
|
Determine if a file should be processed based on incremental changes
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_obj: File object to check
|
||||||
|
data_id: Data ID for the file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (should_process, change_info)
|
||||||
|
- should_process: True if file needs processing
|
||||||
|
- change_info: Dictionary with change details if applicable
|
||||||
|
"""
|
||||||
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
# Check if we have an existing signature for this file
|
||||||
|
existing_signature = await self._get_existing_signature(session, data_id)
|
||||||
|
|
||||||
|
if existing_signature is None:
|
||||||
|
# First time seeing this file, needs full processing
|
||||||
|
return True, {"type": "new_file", "full_processing": True}
|
||||||
|
|
||||||
|
# Generate signature for current file version
|
||||||
|
current_signature = self.block_service.generate_signature(file_obj)
|
||||||
|
|
||||||
|
# Quick check: if overall content hash is the same, no changes
|
||||||
|
file_obj.seek(0)
|
||||||
|
current_content_hash = get_file_content_hash(file_obj)
|
||||||
|
|
||||||
|
if current_content_hash == existing_signature.content_hash:
|
||||||
|
return False, {"type": "no_changes", "full_processing": False}
|
||||||
|
|
||||||
|
# Convert database signature to service signature for comparison
|
||||||
|
service_old_sig = self._db_signature_to_service(existing_signature)
|
||||||
|
|
||||||
|
# Compare signatures to find changed blocks
|
||||||
|
changed_blocks = self.block_service.compare_signatures(
|
||||||
|
service_old_sig, current_signature
|
||||||
|
)
|
||||||
|
|
||||||
|
if not changed_blocks:
|
||||||
|
# Signatures match, no processing needed
|
||||||
|
return False, {"type": "no_changes", "full_processing": False}
|
||||||
|
|
||||||
|
# Calculate change statistics
|
||||||
|
change_stats = self.block_service.calculate_block_changes(
|
||||||
|
service_old_sig, current_signature
|
||||||
|
)
|
||||||
|
|
||||||
|
change_info = {
|
||||||
|
"type": "incremental_changes",
|
||||||
|
"full_processing": len(changed_blocks)
|
||||||
|
> (len(service_old_sig.blocks) * 0.7), # >70% changed = full reprocess
|
||||||
|
"changed_blocks": changed_blocks,
|
||||||
|
"stats": change_stats,
|
||||||
|
"new_signature": current_signature,
|
||||||
|
"old_signature": service_old_sig,
|
||||||
|
}
|
||||||
|
|
||||||
|
return True, change_info
|
||||||
|
|
||||||
|
async def process_incremental_changes(
|
||||||
|
self, file_obj: BinaryIO, data_id: str, change_info: Dict
|
||||||
|
) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
Process only the changed blocks of a file
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_obj: File object to process
|
||||||
|
data_id: Data ID for the file
|
||||||
|
change_info: Change information from should_process_file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of block data that needs reprocessing
|
||||||
|
"""
|
||||||
|
if change_info["type"] != "incremental_changes":
|
||||||
|
raise ValueError("Invalid change_info type for incremental processing")
|
||||||
|
|
||||||
|
file_obj.seek(0)
|
||||||
|
file_data = file_obj.read()
|
||||||
|
|
||||||
|
changed_blocks = change_info["changed_blocks"]
|
||||||
|
new_signature = change_info["new_signature"]
|
||||||
|
|
||||||
|
# Extract data for changed blocks
|
||||||
|
changed_block_data = []
|
||||||
|
|
||||||
|
for block_idx in changed_blocks:
|
||||||
|
# Find the block info
|
||||||
|
block_info = None
|
||||||
|
for block in new_signature.blocks:
|
||||||
|
if block.block_index == block_idx:
|
||||||
|
block_info = block
|
||||||
|
break
|
||||||
|
|
||||||
|
if block_info is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Extract block data
|
||||||
|
start_offset = block_info.file_offset
|
||||||
|
end_offset = start_offset + block_info.block_size
|
||||||
|
block_data = file_data[start_offset:end_offset]
|
||||||
|
|
||||||
|
changed_block_data.append(
|
||||||
|
{
|
||||||
|
"block_index": block_idx,
|
||||||
|
"block_data": block_data,
|
||||||
|
"block_info": block_info,
|
||||||
|
"file_offset": start_offset,
|
||||||
|
"block_size": len(block_data),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return changed_block_data
|
||||||
|
|
||||||
|
async def save_file_signature(self, file_obj: BinaryIO, data_id: str) -> None:
    """
    Persist the file's block signature, creating or updating the DB row.

    Args:
        file_obj: File object
        data_id: Data ID for the file
    """
    # Build the rolling-block signature for the current file contents.
    signature = self.block_service.generate_signature(file_obj, str(data_id))

    # Rewind before hashing: signature generation has consumed the stream.
    file_obj.seek(0)
    content_hash = get_file_content_hash(file_obj)

    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        # Look for a previously stored signature for this data id.
        lookup = await session.execute(
            select(FileSignature).filter(FileSignature.data_id == data_id)
        )
        stored = lookup.scalar_one_or_none()

        # Serialize per-block metadata so it can live in a JSON column.
        blocks_info = [
            {
                "block_index": blk.block_index,
                "weak_checksum": blk.weak_checksum,
                "strong_hash": blk.strong_hash,
                "block_size": blk.block_size,
                "file_offset": blk.file_offset,
            }
            for blk in signature.blocks
        ]

        if stored:
            # Refresh the existing row in place.
            stored.file_path = signature.file_path
            stored.file_size = signature.file_size
            stored.content_hash = content_hash
            stored.total_blocks = signature.total_blocks
            stored.block_size = signature.block_size
            stored.strong_len = signature.strong_len
            stored.signature_data = signature.signature_data
            stored.blocks_info = blocks_info

            await session.merge(stored)
        else:
            # First time we see this data id: insert a fresh row.
            session.add(
                FileSignature(
                    data_id=data_id,
                    file_path=signature.file_path,
                    file_size=signature.file_size,
                    content_hash=content_hash,
                    total_blocks=signature.total_blocks,
                    block_size=signature.block_size,
                    strong_len=signature.strong_len,
                    signature_data=signature.signature_data,
                    blocks_info=blocks_info,
                )
            )

        await session.commit()
||||||
|
async def _get_existing_signature(
    self, session: AsyncSession, data_id: str
) -> Optional[FileSignature]:
    """
    Fetch the stored file signature for a data id, if one exists.

    Args:
        session: Database session
        data_id: Data ID to search for

    Returns:
        FileSignature object or None if not found
    """
    query = select(FileSignature).filter(FileSignature.data_id == data_id)
    result = await session.execute(query)
    return result.scalar_one_or_none()
|
||||||
|
|
||||||
|
def _db_signature_to_service(self, db_signature: FileSignature) -> ServiceFileSignature:
    """
    Convert a database FileSignature row into a service-layer FileSignature.

    Args:
        db_signature: Database signature object

    Returns:
        Service FileSignature object
    """
    from .block_hash_service import BlockInfo

    # Rehydrate the JSON-stored per-block metadata back into BlockInfo objects.
    blocks = []
    for entry in db_signature.blocks_info:
        blocks.append(
            BlockInfo(
                block_index=entry["block_index"],
                weak_checksum=entry["weak_checksum"],
                strong_hash=entry["strong_hash"],
                block_size=entry["block_size"],
                file_offset=entry["file_offset"],
            )
        )

    return ServiceFileSignature(
        file_path=db_signature.file_path,
        file_size=db_signature.file_size,
        total_blocks=db_signature.total_blocks,
        block_size=db_signature.block_size,
        strong_len=db_signature.strong_len,
        blocks=blocks,
        signature_data=db_signature.signature_data,
    )
|
||||||
|
|
||||||
|
async def cleanup_orphaned_signatures(self) -> int:
    """
    Clean up file signatures that no longer have corresponding data entries.

    Returns:
        Number of signatures removed
    """
    # Function-scope import keeps the fix self-contained. `text()` is required
    # because SQLAlchemy 2.x raises ObjectNotExecutableError when a bare SQL
    # string is passed to Session.execute().
    from sqlalchemy import text

    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        # Delete signatures whose data_id no longer references a row in `data`.
        orphaned_query = text(
            """
            DELETE FROM file_signatures
            WHERE data_id NOT IN (SELECT id FROM data)
            """
        )

        result = await session.execute(orphaned_query)
        removed_count = result.rowcount

        await session.commit()

        return removed_count
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
import dlt
|
import dlt
|
||||||
import json
|
import json
|
||||||
import inspect
|
import inspect
|
||||||
|
from datetime import datetime
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from typing import Union, BinaryIO, Any, List, Optional
|
from typing import Union, BinaryIO, Any, List, Optional
|
||||||
import cognee.modules.ingestion as ingestion
|
import cognee.modules.ingestion as ingestion
|
||||||
|
|
@ -13,6 +14,7 @@ from cognee.modules.users.permissions.methods import give_permission_on_dataset
|
||||||
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
|
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
|
||||||
from .get_dlt_destination import get_dlt_destination
|
from .get_dlt_destination import get_dlt_destination
|
||||||
from .save_data_item_to_storage import save_data_item_to_storage
|
from .save_data_item_to_storage import save_data_item_to_storage
|
||||||
|
from cognee.modules.ingestion.incremental import IncrementalLoader
|
||||||
|
|
||||||
|
|
||||||
from cognee.api.v1.add.config import get_s3_config
|
from cognee.api.v1.add.config import get_s3_config
|
||||||
|
|
@ -108,6 +110,17 @@ async def ingest_data(
|
||||||
|
|
||||||
file_metadata = classified_data.get_metadata()
|
file_metadata = classified_data.get_metadata()
|
||||||
|
|
||||||
|
# Initialize incremental loader for this file
|
||||||
|
incremental_loader = IncrementalLoader()
|
||||||
|
|
||||||
|
# Check if file needs incremental processing
|
||||||
|
should_process, change_info = await incremental_loader.should_process_file(
|
||||||
|
file, data_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Save updated file signature regardless of whether processing is needed
|
||||||
|
await incremental_loader.save_file_signature(file, data_id)
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from cognee.modules.data.models import Data
|
from cognee.modules.data.models import Data
|
||||||
|
|
@ -140,6 +153,13 @@ async def ingest_data(
|
||||||
if node_set:
|
if node_set:
|
||||||
ext_metadata["node_set"] = node_set
|
ext_metadata["node_set"] = node_set
|
||||||
|
|
||||||
|
# Add incremental processing metadata
|
||||||
|
ext_metadata["incremental_processing"] = {
|
||||||
|
"should_process": should_process,
|
||||||
|
"change_info": change_info,
|
||||||
|
"processing_timestamp": json.loads(json.dumps(datetime.now().isoformat())),
|
||||||
|
}
|
||||||
|
|
||||||
if data_point is not None:
|
if data_point is not None:
|
||||||
data_point.name = file_metadata["name"]
|
data_point.name = file_metadata["name"]
|
||||||
data_point.raw_data_location = file_metadata["file_path"]
|
data_point.raw_data_location = file_metadata["file_path"]
|
||||||
|
|
|
||||||
|
|
@ -51,12 +51,12 @@ def test_AudioDocument(mock_engine):
|
||||||
GROUND_TRUTH,
|
GROUND_TRUTH,
|
||||||
document.read(chunker_cls=TextChunker, max_chunk_size=64),
|
document.read(chunker_cls=TextChunker, max_chunk_size=64),
|
||||||
):
|
):
|
||||||
assert ground_truth["word_count"] == paragraph_data.chunk_size, (
|
assert (
|
||||||
f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
ground_truth["word_count"] == paragraph_data.chunk_size
|
||||||
)
|
), f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
||||||
assert ground_truth["len_text"] == len(paragraph_data.text), (
|
assert ground_truth["len_text"] == len(
|
||||||
f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
paragraph_data.text
|
||||||
)
|
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
||||||
assert ground_truth["cut_type"] == paragraph_data.cut_type, (
|
assert (
|
||||||
f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
ground_truth["cut_type"] == paragraph_data.cut_type
|
||||||
)
|
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
||||||
|
|
|
||||||
|
|
@ -34,12 +34,12 @@ def test_ImageDocument(mock_engine):
|
||||||
GROUND_TRUTH,
|
GROUND_TRUTH,
|
||||||
document.read(chunker_cls=TextChunker, max_chunk_size=64),
|
document.read(chunker_cls=TextChunker, max_chunk_size=64),
|
||||||
):
|
):
|
||||||
assert ground_truth["word_count"] == paragraph_data.chunk_size, (
|
assert (
|
||||||
f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
ground_truth["word_count"] == paragraph_data.chunk_size
|
||||||
)
|
), f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
||||||
assert ground_truth["len_text"] == len(paragraph_data.text), (
|
assert ground_truth["len_text"] == len(
|
||||||
f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
paragraph_data.text
|
||||||
)
|
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
||||||
assert ground_truth["cut_type"] == paragraph_data.cut_type, (
|
assert (
|
||||||
f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
ground_truth["cut_type"] == paragraph_data.cut_type
|
||||||
)
|
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
||||||
|
|
|
||||||
|
|
@ -36,12 +36,12 @@ def test_PdfDocument(mock_engine):
|
||||||
for ground_truth, paragraph_data in zip(
|
for ground_truth, paragraph_data in zip(
|
||||||
GROUND_TRUTH, document.read(chunker_cls=TextChunker, max_chunk_size=1024)
|
GROUND_TRUTH, document.read(chunker_cls=TextChunker, max_chunk_size=1024)
|
||||||
):
|
):
|
||||||
assert ground_truth["word_count"] == paragraph_data.chunk_size, (
|
assert (
|
||||||
f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
ground_truth["word_count"] == paragraph_data.chunk_size
|
||||||
)
|
), f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
||||||
assert ground_truth["len_text"] == len(paragraph_data.text), (
|
assert ground_truth["len_text"] == len(
|
||||||
f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
paragraph_data.text
|
||||||
)
|
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
||||||
assert ground_truth["cut_type"] == paragraph_data.cut_type, (
|
assert (
|
||||||
f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
ground_truth["cut_type"] == paragraph_data.cut_type
|
||||||
)
|
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
||||||
|
|
|
||||||
|
|
@ -49,12 +49,12 @@ def test_TextDocument(mock_engine, input_file, chunk_size):
|
||||||
GROUND_TRUTH[input_file],
|
GROUND_TRUTH[input_file],
|
||||||
document.read(chunker_cls=TextChunker, max_chunk_size=chunk_size),
|
document.read(chunker_cls=TextChunker, max_chunk_size=chunk_size),
|
||||||
):
|
):
|
||||||
assert ground_truth["word_count"] == paragraph_data.chunk_size, (
|
assert (
|
||||||
f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
ground_truth["word_count"] == paragraph_data.chunk_size
|
||||||
)
|
), f'{ground_truth["word_count"] = } != {paragraph_data.chunk_size = }'
|
||||||
assert ground_truth["len_text"] == len(paragraph_data.text), (
|
assert ground_truth["len_text"] == len(
|
||||||
f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
paragraph_data.text
|
||||||
)
|
), f'{ground_truth["len_text"] = } != {len(paragraph_data.text) = }'
|
||||||
assert ground_truth["cut_type"] == paragraph_data.cut_type, (
|
assert (
|
||||||
f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
ground_truth["cut_type"] == paragraph_data.cut_type
|
||||||
)
|
), f'{ground_truth["cut_type"] = } != {paragraph_data.cut_type = }'
|
||||||
|
|
|
||||||
|
|
@ -79,32 +79,32 @@ def test_UnstructuredDocument(mock_engine):
|
||||||
for paragraph_data in pptx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
for paragraph_data in pptx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
||||||
assert 19 == paragraph_data.chunk_size, f" 19 != {paragraph_data.chunk_size = }"
|
assert 19 == paragraph_data.chunk_size, f" 19 != {paragraph_data.chunk_size = }"
|
||||||
assert 104 == len(paragraph_data.text), f" 104 != {len(paragraph_data.text) = }"
|
assert 104 == len(paragraph_data.text), f" 104 != {len(paragraph_data.text) = }"
|
||||||
assert "sentence_cut" == paragraph_data.cut_type, (
|
assert (
|
||||||
f" sentence_cut != {paragraph_data.cut_type = }"
|
"sentence_cut" == paragraph_data.cut_type
|
||||||
)
|
), f" sentence_cut != {paragraph_data.cut_type = }"
|
||||||
|
|
||||||
# Test DOCX
|
# Test DOCX
|
||||||
for paragraph_data in docx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
for paragraph_data in docx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
||||||
assert 16 == paragraph_data.chunk_size, f" 16 != {paragraph_data.chunk_size = }"
|
assert 16 == paragraph_data.chunk_size, f" 16 != {paragraph_data.chunk_size = }"
|
||||||
assert 145 == len(paragraph_data.text), f" 145 != {len(paragraph_data.text) = }"
|
assert 145 == len(paragraph_data.text), f" 145 != {len(paragraph_data.text) = }"
|
||||||
assert "sentence_end" == paragraph_data.cut_type, (
|
assert (
|
||||||
f" sentence_end != {paragraph_data.cut_type = }"
|
"sentence_end" == paragraph_data.cut_type
|
||||||
)
|
), f" sentence_end != {paragraph_data.cut_type = }"
|
||||||
|
|
||||||
# TEST CSV
|
# TEST CSV
|
||||||
for paragraph_data in csv_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
for paragraph_data in csv_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
||||||
assert 15 == paragraph_data.chunk_size, f" 15 != {paragraph_data.chunk_size = }"
|
assert 15 == paragraph_data.chunk_size, f" 15 != {paragraph_data.chunk_size = }"
|
||||||
assert "A A A A A A A A A,A A A A A A,A A" == paragraph_data.text, (
|
assert (
|
||||||
f"Read text doesn't match expected text: {paragraph_data.text}"
|
"A A A A A A A A A,A A A A A A,A A" == paragraph_data.text
|
||||||
)
|
), f"Read text doesn't match expected text: {paragraph_data.text}"
|
||||||
assert "sentence_cut" == paragraph_data.cut_type, (
|
assert (
|
||||||
f" sentence_cut != {paragraph_data.cut_type = }"
|
"sentence_cut" == paragraph_data.cut_type
|
||||||
)
|
), f" sentence_cut != {paragraph_data.cut_type = }"
|
||||||
|
|
||||||
# Test XLSX
|
# Test XLSX
|
||||||
for paragraph_data in xlsx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
for paragraph_data in xlsx_document.read(chunker_cls=TextChunker, max_chunk_size=1024):
|
||||||
assert 36 == paragraph_data.chunk_size, f" 36 != {paragraph_data.chunk_size = }"
|
assert 36 == paragraph_data.chunk_size, f" 36 != {paragraph_data.chunk_size = }"
|
||||||
assert 171 == len(paragraph_data.text), f" 171 != {len(paragraph_data.text) = }"
|
assert 171 == len(paragraph_data.text), f" 171 != {len(paragraph_data.text) = }"
|
||||||
assert "sentence_cut" == paragraph_data.cut_type, (
|
assert (
|
||||||
f" sentence_cut != {paragraph_data.cut_type = }"
|
"sentence_cut" == paragraph_data.cut_type
|
||||||
)
|
), f" sentence_cut != {paragraph_data.cut_type = }"
|
||||||
|
|
|
||||||
|
|
@ -12,9 +12,9 @@ async def check_graph_metrics_consistency_across_adapters(include_optional=False
|
||||||
raise AssertionError(f"Metrics dictionaries have different keys: {diff_keys}")
|
raise AssertionError(f"Metrics dictionaries have different keys: {diff_keys}")
|
||||||
|
|
||||||
for key, neo4j_value in neo4j_metrics.items():
|
for key, neo4j_value in neo4j_metrics.items():
|
||||||
assert networkx_metrics[key] == neo4j_value, (
|
assert (
|
||||||
f"Difference in '{key}': got {neo4j_value} with neo4j and {networkx_metrics[key]} with networkx"
|
networkx_metrics[key] == neo4j_value
|
||||||
)
|
), f"Difference in '{key}': got {neo4j_value} with neo4j and {networkx_metrics[key]} with networkx"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,6 @@ async def assert_metrics(provider, include_optional=True):
|
||||||
raise AssertionError(f"Metrics dictionaries have different keys: {diff_keys}")
|
raise AssertionError(f"Metrics dictionaries have different keys: {diff_keys}")
|
||||||
|
|
||||||
for key, ground_truth_value in ground_truth_metrics.items():
|
for key, ground_truth_value in ground_truth_metrics.items():
|
||||||
assert metrics[key] == ground_truth_value, (
|
assert (
|
||||||
f"Expected {ground_truth_value} for '{key}' with {provider}, got {metrics[key]}"
|
metrics[key] == ground_truth_value
|
||||||
)
|
), f"Expected {ground_truth_value} for '{key}' with {provider}, got {metrics[key]}"
|
||||||
|
|
|
||||||
|
|
@ -24,28 +24,28 @@ async def test_local_file_deletion(data_text, file_location):
|
||||||
data_hash = hashlib.md5(encoded_text).hexdigest()
|
data_hash = hashlib.md5(encoded_text).hexdigest()
|
||||||
# Get data entry from database based on hash contents
|
# Get data entry from database based on hash contents
|
||||||
data = (await session.scalars(select(Data).where(Data.content_hash == data_hash))).one()
|
data = (await session.scalars(select(Data).where(Data.content_hash == data_hash))).one()
|
||||||
assert os.path.isfile(data.raw_data_location), (
|
assert os.path.isfile(
|
||||||
f"Data location doesn't exist: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exist: {data.raw_data_location}"
|
||||||
# Test deletion of data along with local files created by cognee
|
# Test deletion of data along with local files created by cognee
|
||||||
await engine.delete_data_entity(data.id)
|
await engine.delete_data_entity(data.id)
|
||||||
assert not os.path.exists(data.raw_data_location), (
|
assert not os.path.exists(
|
||||||
f"Data location still exists after deletion: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location still exists after deletion: {data.raw_data_location}"
|
||||||
|
|
||||||
async with engine.get_async_session() as session:
|
async with engine.get_async_session() as session:
|
||||||
# Get data entry from database based on file path
|
# Get data entry from database based on file path
|
||||||
data = (
|
data = (
|
||||||
await session.scalars(select(Data).where(Data.raw_data_location == file_location))
|
await session.scalars(select(Data).where(Data.raw_data_location == file_location))
|
||||||
).one()
|
).one()
|
||||||
assert os.path.isfile(data.raw_data_location), (
|
assert os.path.isfile(
|
||||||
f"Data location doesn't exist: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exist: {data.raw_data_location}"
|
||||||
# Test local files not created by cognee won't get deleted
|
# Test local files not created by cognee won't get deleted
|
||||||
await engine.delete_data_entity(data.id)
|
await engine.delete_data_entity(data.id)
|
||||||
assert os.path.exists(data.raw_data_location), (
|
assert os.path.exists(
|
||||||
f"Data location doesn't exists: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exists: {data.raw_data_location}"
|
||||||
|
|
||||||
|
|
||||||
async def test_getting_of_documents(dataset_name_1):
|
async def test_getting_of_documents(dataset_name_1):
|
||||||
|
|
@ -54,16 +54,16 @@ async def test_getting_of_documents(dataset_name_1):
|
||||||
|
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
|
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
|
||||||
assert len(document_ids) == 1, (
|
assert (
|
||||||
f"Number of expected documents doesn't match {len(document_ids)} != 1"
|
len(document_ids) == 1
|
||||||
)
|
), f"Number of expected documents doesn't match {len(document_ids)} != 1"
|
||||||
|
|
||||||
# Test getting of documents for search when no dataset is provided
|
# Test getting of documents for search when no dataset is provided
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
document_ids = await get_document_ids_for_user(user.id)
|
document_ids = await get_document_ids_for_user(user.id)
|
||||||
assert len(document_ids) == 2, (
|
assert (
|
||||||
f"Number of expected documents doesn't match {len(document_ids)} != 2"
|
len(document_ids) == 2
|
||||||
)
|
), f"Number of expected documents doesn't match {len(document_ids)} != 2"
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
|
||||||
|
|
@ -30,9 +30,9 @@ async def test_deduplication():
|
||||||
|
|
||||||
result = await relational_engine.get_all_data_from_table("data")
|
result = await relational_engine.get_all_data_from_table("data")
|
||||||
assert len(result) == 1, "More than one data entity was found."
|
assert len(result) == 1, "More than one data entity was found."
|
||||||
assert result[0]["name"] == "Natural_language_processing_copy", (
|
assert (
|
||||||
"Result name does not match expected value."
|
result[0]["name"] == "Natural_language_processing_copy"
|
||||||
)
|
), "Result name does not match expected value."
|
||||||
|
|
||||||
result = await relational_engine.get_all_data_from_table("datasets")
|
result = await relational_engine.get_all_data_from_table("datasets")
|
||||||
assert len(result) == 2, "Unexpected number of datasets found."
|
assert len(result) == 2, "Unexpected number of datasets found."
|
||||||
|
|
@ -61,9 +61,9 @@ async def test_deduplication():
|
||||||
|
|
||||||
result = await relational_engine.get_all_data_from_table("data")
|
result = await relational_engine.get_all_data_from_table("data")
|
||||||
assert len(result) == 1, "More than one data entity was found."
|
assert len(result) == 1, "More than one data entity was found."
|
||||||
assert hashlib.md5(text.encode("utf-8")).hexdigest() in result[0]["name"], (
|
assert (
|
||||||
"Content hash is not a part of file name."
|
hashlib.md5(text.encode("utf-8")).hexdigest() in result[0]["name"]
|
||||||
)
|
), "Content hash is not a part of file name."
|
||||||
|
|
||||||
await cognee.prune.prune_data()
|
await cognee.prune.prune_data()
|
||||||
await cognee.prune.prune_system(metadata=True)
|
await cognee.prune.prune_system(metadata=True)
|
||||||
|
|
|
||||||
|
|
@ -92,9 +92,9 @@ async def main():
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
assert not os.path.exists(get_relational_engine().db_path), (
|
assert not os.path.exists(
|
||||||
"SQLite relational database is not empty"
|
get_relational_engine().db_path
|
||||||
)
|
), "SQLite relational database is not empty"
|
||||||
|
|
||||||
from cognee.infrastructure.databases.graph import get_graph_config
|
from cognee.infrastructure.databases.graph import get_graph_config
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -103,13 +103,13 @@ async def main():
|
||||||
node_name=["nonexistent"],
|
node_name=["nonexistent"],
|
||||||
).get_context("What is in the context?")
|
).get_context("What is in the context?")
|
||||||
|
|
||||||
assert isinstance(context_nonempty, str) and context_nonempty != "", (
|
assert (
|
||||||
f"Nodeset_search_test:Expected non-empty string for context_nonempty, got: {context_nonempty!r}"
|
isinstance(context_nonempty, str) and context_nonempty != ""
|
||||||
)
|
), f"Nodeset_search_test:Expected non-empty string for context_nonempty, got: {context_nonempty!r}"
|
||||||
|
|
||||||
assert context_empty == "", (
|
assert (
|
||||||
f"Nodeset_search_test:Expected empty string for context_empty, got: {context_empty!r}"
|
context_empty == ""
|
||||||
)
|
), f"Nodeset_search_test:Expected empty string for context_empty, got: {context_empty!r}"
|
||||||
|
|
||||||
await cognee.prune.prune_data()
|
await cognee.prune.prune_data()
|
||||||
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"
|
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"
|
||||||
|
|
|
||||||
|
|
@ -107,13 +107,13 @@ async def main():
|
||||||
node_name=["nonexistent"],
|
node_name=["nonexistent"],
|
||||||
).get_context("What is in the context?")
|
).get_context("What is in the context?")
|
||||||
|
|
||||||
assert isinstance(context_nonempty, str) and context_nonempty != "", (
|
assert (
|
||||||
f"Nodeset_search_test:Expected non-empty string for context_nonempty, got: {context_nonempty!r}"
|
isinstance(context_nonempty, str) and context_nonempty != ""
|
||||||
)
|
), f"Nodeset_search_test:Expected non-empty string for context_nonempty, got: {context_nonempty!r}"
|
||||||
|
|
||||||
assert context_empty == "", (
|
assert (
|
||||||
f"Nodeset_search_test:Expected empty string for context_empty, got: {context_empty!r}"
|
context_empty == ""
|
||||||
)
|
), f"Nodeset_search_test:Expected empty string for context_empty, got: {context_empty!r}"
|
||||||
|
|
||||||
await cognee.prune.prune_data()
|
await cognee.prune.prune_data()
|
||||||
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"
|
assert not os.path.isdir(data_directory_path), "Local data files are not deleted"
|
||||||
|
|
|
||||||
|
|
@ -23,28 +23,28 @@ async def test_local_file_deletion(data_text, file_location):
|
||||||
data_hash = hashlib.md5(encoded_text).hexdigest()
|
data_hash = hashlib.md5(encoded_text).hexdigest()
|
||||||
# Get data entry from database based on hash contents
|
# Get data entry from database based on hash contents
|
||||||
data = (await session.scalars(select(Data).where(Data.content_hash == data_hash))).one()
|
data = (await session.scalars(select(Data).where(Data.content_hash == data_hash))).one()
|
||||||
assert os.path.isfile(data.raw_data_location), (
|
assert os.path.isfile(
|
||||||
f"Data location doesn't exist: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exist: {data.raw_data_location}"
|
||||||
# Test deletion of data along with local files created by cognee
|
# Test deletion of data along with local files created by cognee
|
||||||
await engine.delete_data_entity(data.id)
|
await engine.delete_data_entity(data.id)
|
||||||
assert not os.path.exists(data.raw_data_location), (
|
assert not os.path.exists(
|
||||||
f"Data location still exists after deletion: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location still exists after deletion: {data.raw_data_location}"
|
||||||
|
|
||||||
async with engine.get_async_session() as session:
|
async with engine.get_async_session() as session:
|
||||||
# Get data entry from database based on file path
|
# Get data entry from database based on file path
|
||||||
data = (
|
data = (
|
||||||
await session.scalars(select(Data).where(Data.raw_data_location == file_location))
|
await session.scalars(select(Data).where(Data.raw_data_location == file_location))
|
||||||
).one()
|
).one()
|
||||||
assert os.path.isfile(data.raw_data_location), (
|
assert os.path.isfile(
|
||||||
f"Data location doesn't exist: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exist: {data.raw_data_location}"
|
||||||
# Test local files not created by cognee won't get deleted
|
# Test local files not created by cognee won't get deleted
|
||||||
await engine.delete_data_entity(data.id)
|
await engine.delete_data_entity(data.id)
|
||||||
assert os.path.exists(data.raw_data_location), (
|
assert os.path.exists(
|
||||||
f"Data location doesn't exists: {data.raw_data_location}"
|
data.raw_data_location
|
||||||
)
|
), f"Data location doesn't exists: {data.raw_data_location}"
|
||||||
|
|
||||||
|
|
||||||
async def test_getting_of_documents(dataset_name_1):
|
async def test_getting_of_documents(dataset_name_1):
|
||||||
|
|
@ -53,16 +53,16 @@ async def test_getting_of_documents(dataset_name_1):
|
||||||
|
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
|
document_ids = await get_document_ids_for_user(user.id, [dataset_name_1])
|
||||||
assert len(document_ids) == 1, (
|
assert (
|
||||||
f"Number of expected documents doesn't match {len(document_ids)} != 1"
|
len(document_ids) == 1
|
||||||
)
|
), f"Number of expected documents doesn't match {len(document_ids)} != 1"
|
||||||
|
|
||||||
# Test getting of documents for search when no dataset is provided
|
# Test getting of documents for search when no dataset is provided
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
document_ids = await get_document_ids_for_user(user.id)
|
document_ids = await get_document_ids_for_user(user.id)
|
||||||
assert len(document_ids) == 2, (
|
assert (
|
||||||
f"Number of expected documents doesn't match {len(document_ids)} != 2"
|
len(document_ids) == 2
|
||||||
)
|
), f"Number of expected documents doesn't match {len(document_ids)} != 2"
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
|
||||||
|
|
@ -112,9 +112,9 @@ async def relational_db_migration():
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unsupported graph database provider: {graph_db_provider}")
|
raise ValueError(f"Unsupported graph database provider: {graph_db_provider}")
|
||||||
|
|
||||||
assert len(distinct_node_names) == 12, (
|
assert (
|
||||||
f"Expected 12 distinct node references, found {len(distinct_node_names)}"
|
len(distinct_node_names) == 12
|
||||||
)
|
), f"Expected 12 distinct node references, found {len(distinct_node_names)}"
|
||||||
assert len(found_edges) == 15, f"Expected 15 {relationship_label} edges, got {len(found_edges)}"
|
assert len(found_edges) == 15, f"Expected 15 {relationship_label} edges, got {len(found_edges)}"
|
||||||
|
|
||||||
expected_edges = {
|
expected_edges = {
|
||||||
|
|
|
||||||
|
|
@ -29,54 +29,54 @@ async def main():
|
||||||
logging.info(edge_type_counts)
|
logging.info(edge_type_counts)
|
||||||
|
|
||||||
# Assert there is exactly one PdfDocument.
|
# Assert there is exactly one PdfDocument.
|
||||||
assert type_counts.get("PdfDocument", 0) == 1, (
|
assert (
|
||||||
f"Expected exactly one PdfDocument, but found {type_counts.get('PdfDocument', 0)}"
|
type_counts.get("PdfDocument", 0) == 1
|
||||||
)
|
), f"Expected exactly one PdfDocument, but found {type_counts.get('PdfDocument', 0)}"
|
||||||
|
|
||||||
# Assert there is exactly one TextDocument.
|
# Assert there is exactly one TextDocument.
|
||||||
assert type_counts.get("TextDocument", 0) == 1, (
|
assert (
|
||||||
f"Expected exactly one TextDocument, but found {type_counts.get('TextDocument', 0)}"
|
type_counts.get("TextDocument", 0) == 1
|
||||||
)
|
), f"Expected exactly one TextDocument, but found {type_counts.get('TextDocument', 0)}"
|
||||||
|
|
||||||
# Assert there are at least two DocumentChunk nodes.
|
# Assert there are at least two DocumentChunk nodes.
|
||||||
assert type_counts.get("DocumentChunk", 0) >= 2, (
|
assert (
|
||||||
f"Expected at least two DocumentChunk nodes, but found {type_counts.get('DocumentChunk', 0)}"
|
type_counts.get("DocumentChunk", 0) >= 2
|
||||||
)
|
), f"Expected at least two DocumentChunk nodes, but found {type_counts.get('DocumentChunk', 0)}"
|
||||||
|
|
||||||
# Assert there is at least two TextSummary.
|
# Assert there is at least two TextSummary.
|
||||||
assert type_counts.get("TextSummary", 0) >= 2, (
|
assert (
|
||||||
f"Expected at least two TextSummary, but found {type_counts.get('TextSummary', 0)}"
|
type_counts.get("TextSummary", 0) >= 2
|
||||||
)
|
), f"Expected at least two TextSummary, but found {type_counts.get('TextSummary', 0)}"
|
||||||
|
|
||||||
# Assert there is at least one Entity.
|
# Assert there is at least one Entity.
|
||||||
assert type_counts.get("Entity", 0) > 0, (
|
assert (
|
||||||
f"Expected more than zero Entity nodes, but found {type_counts.get('Entity', 0)}"
|
type_counts.get("Entity", 0) > 0
|
||||||
)
|
), f"Expected more than zero Entity nodes, but found {type_counts.get('Entity', 0)}"
|
||||||
|
|
||||||
# Assert there is at least one EntityType.
|
# Assert there is at least one EntityType.
|
||||||
assert type_counts.get("EntityType", 0) > 0, (
|
assert (
|
||||||
f"Expected more than zero EntityType nodes, but found {type_counts.get('EntityType', 0)}"
|
type_counts.get("EntityType", 0) > 0
|
||||||
)
|
), f"Expected more than zero EntityType nodes, but found {type_counts.get('EntityType', 0)}"
|
||||||
|
|
||||||
# Assert that there are at least two 'is_part_of' edges.
|
# Assert that there are at least two 'is_part_of' edges.
|
||||||
assert edge_type_counts.get("is_part_of", 0) >= 2, (
|
assert (
|
||||||
f"Expected at least two 'is_part_of' edges, but found {edge_type_counts.get('is_part_of', 0)}"
|
edge_type_counts.get("is_part_of", 0) >= 2
|
||||||
)
|
), f"Expected at least two 'is_part_of' edges, but found {edge_type_counts.get('is_part_of', 0)}"
|
||||||
|
|
||||||
# Assert that there are at least two 'made_from' edges.
|
# Assert that there are at least two 'made_from' edges.
|
||||||
assert edge_type_counts.get("made_from", 0) >= 2, (
|
assert (
|
||||||
f"Expected at least two 'made_from' edges, but found {edge_type_counts.get('made_from', 0)}"
|
edge_type_counts.get("made_from", 0) >= 2
|
||||||
)
|
), f"Expected at least two 'made_from' edges, but found {edge_type_counts.get('made_from', 0)}"
|
||||||
|
|
||||||
# Assert that there is at least one 'is_a' edge.
|
# Assert that there is at least one 'is_a' edge.
|
||||||
assert edge_type_counts.get("is_a", 0) >= 1, (
|
assert (
|
||||||
f"Expected at least one 'is_a' edge, but found {edge_type_counts.get('is_a', 0)}"
|
edge_type_counts.get("is_a", 0) >= 1
|
||||||
)
|
), f"Expected at least one 'is_a' edge, but found {edge_type_counts.get('is_a', 0)}"
|
||||||
|
|
||||||
# Assert that there is at least one 'contains' edge.
|
# Assert that there is at least one 'contains' edge.
|
||||||
assert edge_type_counts.get("contains", 0) >= 1, (
|
assert (
|
||||||
f"Expected at least one 'contains' edge, but found {edge_type_counts.get('contains', 0)}"
|
edge_type_counts.get("contains", 0) >= 1
|
||||||
)
|
), f"Expected at least one 'contains' edge, but found {edge_type_counts.get('contains', 0)}"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -66,9 +66,9 @@ async def main():
|
||||||
assert isinstance(context, str), f"{name}: Context should be a string"
|
assert isinstance(context, str), f"{name}: Context should be a string"
|
||||||
assert context.strip(), f"{name}: Context should not be empty"
|
assert context.strip(), f"{name}: Context should not be empty"
|
||||||
lower = context.lower()
|
lower = context.lower()
|
||||||
assert "germany" in lower or "netherlands" in lower, (
|
assert (
|
||||||
f"{name}: Context did not contain 'germany' or 'netherlands'; got: {context!r}"
|
"germany" in lower or "netherlands" in lower
|
||||||
)
|
), f"{name}: Context did not contain 'germany' or 'netherlands'; got: {context!r}"
|
||||||
|
|
||||||
triplets_gk = await GraphCompletionRetriever().get_triplets(
|
triplets_gk = await GraphCompletionRetriever().get_triplets(
|
||||||
query="Next to which country is Germany located?"
|
query="Next to which country is Germany located?"
|
||||||
|
|
@ -96,18 +96,18 @@ async def main():
|
||||||
distance = edge.attributes.get("vector_distance")
|
distance = edge.attributes.get("vector_distance")
|
||||||
node1_distance = edge.node1.attributes.get("vector_distance")
|
node1_distance = edge.node1.attributes.get("vector_distance")
|
||||||
node2_distance = edge.node2.attributes.get("vector_distance")
|
node2_distance = edge.node2.attributes.get("vector_distance")
|
||||||
assert isinstance(distance, float), (
|
assert isinstance(
|
||||||
f"{name}: vector_distance should be float, got {type(distance)}"
|
distance, float
|
||||||
)
|
), f"{name}: vector_distance should be float, got {type(distance)}"
|
||||||
assert 0 <= distance <= 1, (
|
assert (
|
||||||
f"{name}: edge vector_distance {distance} out of [0,1], this shouldn't happen"
|
0 <= distance <= 1
|
||||||
)
|
), f"{name}: edge vector_distance {distance} out of [0,1], this shouldn't happen"
|
||||||
assert 0 <= node1_distance <= 1, (
|
assert (
|
||||||
f"{name}: node_1 vector_distance {distance} out of [0,1], this shouldn't happen"
|
0 <= node1_distance <= 1
|
||||||
)
|
), f"{name}: node_1 vector_distance {distance} out of [0,1], this shouldn't happen"
|
||||||
assert 0 <= node2_distance <= 1, (
|
assert (
|
||||||
f"{name}: node_2 vector_distance {distance} out of [0,1], this shouldn't happen"
|
0 <= node2_distance <= 1
|
||||||
)
|
), f"{name}: node_2 vector_distance {distance} out of [0,1], this shouldn't happen"
|
||||||
|
|
||||||
completion_gk = await cognee.search(
|
completion_gk = await cognee.search(
|
||||||
query_type=SearchType.GRAPH_COMPLETION,
|
query_type=SearchType.GRAPH_COMPLETION,
|
||||||
|
|
@ -137,9 +137,9 @@ async def main():
|
||||||
text = completion[0]
|
text = completion[0]
|
||||||
assert isinstance(text, str), f"{name}: element should be a string"
|
assert isinstance(text, str), f"{name}: element should be a string"
|
||||||
assert text.strip(), f"{name}: string should not be empty"
|
assert text.strip(), f"{name}: string should not be empty"
|
||||||
assert "netherlands" in text.lower(), (
|
assert (
|
||||||
f"{name}: expected 'netherlands' in result, got: {text!r}"
|
"netherlands" in text.lower()
|
||||||
)
|
), f"{name}: expected 'netherlands' in result, got: {text!r}"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -24,12 +24,12 @@ async def test_answer_generation():
|
||||||
mock_retriever.get_context.assert_any_await(qa_pairs[0]["question"])
|
mock_retriever.get_context.assert_any_await(qa_pairs[0]["question"])
|
||||||
|
|
||||||
assert len(answers) == len(qa_pairs)
|
assert len(answers) == len(qa_pairs)
|
||||||
assert answers[0]["question"] == qa_pairs[0]["question"], (
|
assert (
|
||||||
"AnswerGeneratorExecutor is passing the question incorrectly"
|
answers[0]["question"] == qa_pairs[0]["question"]
|
||||||
)
|
), "AnswerGeneratorExecutor is passing the question incorrectly"
|
||||||
assert answers[0]["golden_answer"] == qa_pairs[0]["answer"], (
|
assert (
|
||||||
"AnswerGeneratorExecutor is passing the golden answer incorrectly"
|
answers[0]["golden_answer"] == qa_pairs[0]["answer"]
|
||||||
)
|
), "AnswerGeneratorExecutor is passing the golden answer incorrectly"
|
||||||
assert answers[0]["answer"] == "Mocked answer", (
|
assert (
|
||||||
"AnswerGeneratorExecutor is passing the generated answer incorrectly"
|
answers[0]["answer"] == "Mocked answer"
|
||||||
)
|
), "AnswerGeneratorExecutor is passing the generated answer incorrectly"
|
||||||
|
|
|
||||||
|
|
@ -44,9 +44,9 @@ def test_adapter_can_instantiate_and_load(AdapterClass):
|
||||||
|
|
||||||
corpus_list, qa_pairs = result
|
corpus_list, qa_pairs = result
|
||||||
assert isinstance(corpus_list, list), f"{AdapterClass.__name__} corpus_list is not a list."
|
assert isinstance(corpus_list, list), f"{AdapterClass.__name__} corpus_list is not a list."
|
||||||
assert isinstance(qa_pairs, list), (
|
assert isinstance(
|
||||||
f"{AdapterClass.__name__} question_answer_pairs is not a list."
|
qa_pairs, list
|
||||||
)
|
), f"{AdapterClass.__name__} question_answer_pairs is not a list."
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("AdapterClass", ADAPTER_CLASSES)
|
@pytest.mark.parametrize("AdapterClass", ADAPTER_CLASSES)
|
||||||
|
|
@ -71,9 +71,9 @@ def test_adapter_returns_some_content(AdapterClass):
|
||||||
# We don't know how large the dataset is, but we expect at least 1 item
|
# We don't know how large the dataset is, but we expect at least 1 item
|
||||||
assert len(corpus_list) > 0, f"{AdapterClass.__name__} returned an empty corpus_list."
|
assert len(corpus_list) > 0, f"{AdapterClass.__name__} returned an empty corpus_list."
|
||||||
assert len(qa_pairs) > 0, f"{AdapterClass.__name__} returned an empty question_answer_pairs."
|
assert len(qa_pairs) > 0, f"{AdapterClass.__name__} returned an empty question_answer_pairs."
|
||||||
assert len(qa_pairs) <= limit, (
|
assert (
|
||||||
f"{AdapterClass.__name__} returned more QA items than requested limit={limit}."
|
len(qa_pairs) <= limit
|
||||||
)
|
), f"{AdapterClass.__name__} returned more QA items than requested limit={limit}."
|
||||||
|
|
||||||
for item in qa_pairs:
|
for item in qa_pairs:
|
||||||
assert "question" in item, f"{AdapterClass.__name__} missing 'question' key in QA pair."
|
assert "question" in item, f"{AdapterClass.__name__} missing 'question' key in QA pair."
|
||||||
|
|
|
||||||
|
|
@ -12,9 +12,9 @@ def test_corpus_builder_load_corpus(benchmark):
|
||||||
corpus_builder = CorpusBuilderExecutor(benchmark, "Default")
|
corpus_builder = CorpusBuilderExecutor(benchmark, "Default")
|
||||||
raw_corpus, questions = corpus_builder.load_corpus(limit=limit)
|
raw_corpus, questions = corpus_builder.load_corpus(limit=limit)
|
||||||
assert len(raw_corpus) > 0, f"Corpus builder loads empty corpus for {benchmark}"
|
assert len(raw_corpus) > 0, f"Corpus builder loads empty corpus for {benchmark}"
|
||||||
assert len(questions) <= 2, (
|
assert (
|
||||||
f"Corpus builder loads {len(questions)} for {benchmark} when limit is {limit}"
|
len(questions) <= 2
|
||||||
)
|
), f"Corpus builder loads {len(questions)} for {benchmark} when limit is {limit}"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|
@ -24,6 +24,6 @@ async def test_corpus_builder_build_corpus(mock_run_cognee, benchmark):
|
||||||
limit = 2
|
limit = 2
|
||||||
corpus_builder = CorpusBuilderExecutor(benchmark, "Default")
|
corpus_builder = CorpusBuilderExecutor(benchmark, "Default")
|
||||||
questions = await corpus_builder.build_corpus(limit=limit)
|
questions = await corpus_builder.build_corpus(limit=limit)
|
||||||
assert len(questions) <= 2, (
|
assert (
|
||||||
f"Corpus builder loads {len(questions)} for {benchmark} when limit is {limit}"
|
len(questions) <= 2
|
||||||
)
|
), f"Corpus builder loads {len(questions)} for {benchmark} when limit is {limit}"
|
||||||
|
|
|
||||||
|
|
@ -52,14 +52,14 @@ def test_metrics(metrics, actual, expected, expected_exact_score, expected_f1_ra
|
||||||
test_case = MockTestCase(actual, expected)
|
test_case = MockTestCase(actual, expected)
|
||||||
|
|
||||||
exact_match_score = metrics["exact_match"].measure(test_case)
|
exact_match_score = metrics["exact_match"].measure(test_case)
|
||||||
assert exact_match_score == expected_exact_score, (
|
assert (
|
||||||
f"Exact match failed for '{actual}' vs '{expected}'"
|
exact_match_score == expected_exact_score
|
||||||
)
|
), f"Exact match failed for '{actual}' vs '{expected}'"
|
||||||
|
|
||||||
f1_score = metrics["f1"].measure(test_case)
|
f1_score = metrics["f1"].measure(test_case)
|
||||||
assert expected_f1_range[0] <= f1_score <= expected_f1_range[1], (
|
assert (
|
||||||
f"F1 score failed for '{actual}' vs '{expected}'"
|
expected_f1_range[0] <= f1_score <= expected_f1_range[1]
|
||||||
)
|
), f"F1 score failed for '{actual}' vs '{expected}'"
|
||||||
|
|
||||||
|
|
||||||
class TestBootstrapCI(unittest.TestCase):
|
class TestBootstrapCI(unittest.TestCase):
|
||||||
|
|
|
||||||
|
|
@ -157,15 +157,15 @@ def test_rate_limit_60_per_minute():
|
||||||
if len(failures) > 0:
|
if len(failures) > 0:
|
||||||
first_failure_idx = int(failures[0].split()[1])
|
first_failure_idx = int(failures[0].split()[1])
|
||||||
print(f"First failure occurred at request index: {first_failure_idx}")
|
print(f"First failure occurred at request index: {first_failure_idx}")
|
||||||
assert 58 <= first_failure_idx <= 62, (
|
assert (
|
||||||
f"Expected first failure around request #60, got #{first_failure_idx}"
|
58 <= first_failure_idx <= 62
|
||||||
)
|
), f"Expected first failure around request #60, got #{first_failure_idx}"
|
||||||
|
|
||||||
# Calculate requests per minute
|
# Calculate requests per minute
|
||||||
rate_per_minute = len(successes)
|
rate_per_minute = len(successes)
|
||||||
print(f"Rate: {rate_per_minute} requests per minute")
|
print(f"Rate: {rate_per_minute} requests per minute")
|
||||||
|
|
||||||
# Verify the rate is close to our target of 60 requests per minute
|
# Verify the rate is close to our target of 60 requests per minute
|
||||||
assert 58 <= rate_per_minute <= 62, (
|
assert (
|
||||||
f"Expected rate of ~60 requests per minute, got {rate_per_minute}"
|
58 <= rate_per_minute <= 62
|
||||||
)
|
), f"Expected rate of ~60 requests per minute, got {rate_per_minute}"
|
||||||
|
|
|
||||||
|
|
@ -110,9 +110,9 @@ def test_sync_retry():
|
||||||
print(f"Number of attempts: {test_function_sync.counter}")
|
print(f"Number of attempts: {test_function_sync.counter}")
|
||||||
|
|
||||||
# The function should succeed on the 3rd attempt (after 2 failures)
|
# The function should succeed on the 3rd attempt (after 2 failures)
|
||||||
assert test_function_sync.counter == 3, (
|
assert (
|
||||||
f"Expected 3 attempts, got {test_function_sync.counter}"
|
test_function_sync.counter == 3
|
||||||
)
|
), f"Expected 3 attempts, got {test_function_sync.counter}"
|
||||||
assert elapsed >= 0.3, f"Expected at least 0.3 seconds of backoff, got {elapsed:.2f}"
|
assert elapsed >= 0.3, f"Expected at least 0.3 seconds of backoff, got {elapsed:.2f}"
|
||||||
|
|
||||||
print("✅ PASS: Synchronous retry mechanism is working correctly")
|
print("✅ PASS: Synchronous retry mechanism is working correctly")
|
||||||
|
|
@ -143,9 +143,9 @@ async def test_async_retry():
|
||||||
print(f"Number of attempts: {test_function_async.counter}")
|
print(f"Number of attempts: {test_function_async.counter}")
|
||||||
|
|
||||||
# The function should succeed on the 3rd attempt (after 2 failures)
|
# The function should succeed on the 3rd attempt (after 2 failures)
|
||||||
assert test_function_async.counter == 3, (
|
assert (
|
||||||
f"Expected 3 attempts, got {test_function_async.counter}"
|
test_function_async.counter == 3
|
||||||
)
|
), f"Expected 3 attempts, got {test_function_async.counter}"
|
||||||
assert elapsed >= 0.3, f"Expected at least 0.3 seconds of backoff, got {elapsed:.2f}"
|
assert elapsed >= 0.3, f"Expected at least 0.3 seconds of backoff, got {elapsed:.2f}"
|
||||||
|
|
||||||
print("✅ PASS: Asynchronous retry mechanism is working correctly")
|
print("✅ PASS: Asynchronous retry mechanism is working correctly")
|
||||||
|
|
|
||||||
|
|
@ -57,9 +57,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Canva?")
|
answer = await retriever.get_completion("Who works at Canva?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_graph_completion_extension_context_complex(self):
|
async def test_graph_completion_extension_context_complex(self):
|
||||||
|
|
@ -136,9 +136,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Figma?")
|
answer = await retriever.get_completion("Who works at Figma?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_graph_completion_extension_context_on_empty_graph(self):
|
async def test_get_graph_completion_extension_context_on_empty_graph(self):
|
||||||
|
|
@ -167,9 +167,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Figma?")
|
answer = await retriever.get_completion("Who works at Figma?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -55,9 +55,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Canva?")
|
answer = await retriever.get_completion("Who works at Canva?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_graph_completion_cot_context_complex(self):
|
async def test_graph_completion_cot_context_complex(self):
|
||||||
|
|
@ -134,9 +134,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Figma?")
|
answer = await retriever.get_completion("Who works at Figma?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_graph_completion_cot_context_on_empty_graph(self):
|
async def test_get_graph_completion_cot_context_on_empty_graph(self):
|
||||||
|
|
@ -165,9 +165,9 @@ class TestGraphCompletionRetriever:
|
||||||
answer = await retriever.get_completion("Who works at Figma?")
|
answer = await retriever.get_completion("Who works at Figma?")
|
||||||
|
|
||||||
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
assert isinstance(answer, list), f"Expected list, got {type(answer).__name__}"
|
||||||
assert all(isinstance(item, str) and item.strip() for item in answer), (
|
assert all(
|
||||||
"Answer must contain only non-empty strings"
|
isinstance(item, str) and item.strip() for item in answer
|
||||||
)
|
), "Answer must contain only non-empty strings"
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
|
|
@ -24,9 +24,9 @@ max_chunk_size_vals = [512, 1024, 4096]
|
||||||
def test_chunk_by_paragraph_isomorphism(input_text, max_chunk_size, batch_paragraphs):
|
def test_chunk_by_paragraph_isomorphism(input_text, max_chunk_size, batch_paragraphs):
|
||||||
chunks = chunk_by_paragraph(input_text, max_chunk_size, batch_paragraphs)
|
chunks = chunk_by_paragraph(input_text, max_chunk_size, batch_paragraphs)
|
||||||
reconstructed_text = "".join([chunk["text"] for chunk in chunks])
|
reconstructed_text = "".join([chunk["text"] for chunk in chunks])
|
||||||
assert reconstructed_text == input_text, (
|
assert (
|
||||||
f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
reconstructed_text == input_text
|
||||||
)
|
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|
@ -54,9 +54,9 @@ def test_paragraph_chunk_length(input_text, max_chunk_size, batch_paragraphs):
|
||||||
)
|
)
|
||||||
|
|
||||||
larger_chunks = chunk_lengths[chunk_lengths > max_chunk_size]
|
larger_chunks = chunk_lengths[chunk_lengths > max_chunk_size]
|
||||||
assert np.all(chunk_lengths <= max_chunk_size), (
|
assert np.all(
|
||||||
f"{max_chunk_size = }: {larger_chunks} are too large"
|
chunk_lengths <= max_chunk_size
|
||||||
)
|
), f"{max_chunk_size = }: {larger_chunks} are too large"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|
@ -76,6 +76,6 @@ def test_chunk_by_paragraph_chunk_numbering(input_text, max_chunk_size, batch_pa
|
||||||
batch_paragraphs=batch_paragraphs,
|
batch_paragraphs=batch_paragraphs,
|
||||||
)
|
)
|
||||||
chunk_indices = np.array([chunk["chunk_index"] for chunk in chunks])
|
chunk_indices = np.array([chunk["chunk_index"] for chunk in chunks])
|
||||||
assert np.all(chunk_indices == np.arange(len(chunk_indices))), (
|
assert np.all(
|
||||||
f"{chunk_indices = } are not monotonically increasing"
|
chunk_indices == np.arange(len(chunk_indices))
|
||||||
)
|
), f"{chunk_indices = } are not monotonically increasing"
|
||||||
|
|
|
||||||
|
|
@ -71,9 +71,9 @@ def run_chunking_test(test_text, expected_chunks, mock_engine):
|
||||||
|
|
||||||
for expected_chunks_item, chunk in zip(expected_chunks, chunks):
|
for expected_chunks_item, chunk in zip(expected_chunks, chunks):
|
||||||
for key in ["text", "chunk_size", "cut_type"]:
|
for key in ["text", "chunk_size", "cut_type"]:
|
||||||
assert chunk[key] == expected_chunks_item[key], (
|
assert (
|
||||||
f"{key = }: {chunk[key] = } != {expected_chunks_item[key] = }"
|
chunk[key] == expected_chunks_item[key]
|
||||||
)
|
), f"{key = }: {chunk[key] = } != {expected_chunks_item[key] = }"
|
||||||
|
|
||||||
|
|
||||||
def test_chunking_whole_text():
|
def test_chunking_whole_text():
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,9 @@ maximum_length_vals = [None, 16, 64]
|
||||||
def test_chunk_by_sentence_isomorphism(input_text, maximum_length):
|
def test_chunk_by_sentence_isomorphism(input_text, maximum_length):
|
||||||
chunks = chunk_by_sentence(input_text, maximum_length)
|
chunks = chunk_by_sentence(input_text, maximum_length)
|
||||||
reconstructed_text = "".join([chunk[1] for chunk in chunks])
|
reconstructed_text = "".join([chunk[1] for chunk in chunks])
|
||||||
assert reconstructed_text == input_text, (
|
assert (
|
||||||
f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
reconstructed_text == input_text
|
||||||
)
|
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|
@ -40,9 +40,9 @@ def test_paragraph_chunk_length(input_text, maximum_length):
|
||||||
)
|
)
|
||||||
|
|
||||||
larger_chunks = chunk_lengths[chunk_lengths > maximum_length]
|
larger_chunks = chunk_lengths[chunk_lengths > maximum_length]
|
||||||
assert np.all(chunk_lengths <= maximum_length), (
|
assert np.all(
|
||||||
f"{maximum_length = }: {larger_chunks} are too large"
|
chunk_lengths <= maximum_length
|
||||||
)
|
), f"{maximum_length = }: {larger_chunks} are too large"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,9 @@ from cognee.tests.unit.processing.chunks.test_input import INPUT_TEXTS, INPUT_TE
|
||||||
def test_chunk_by_word_isomorphism(input_text):
|
def test_chunk_by_word_isomorphism(input_text):
|
||||||
chunks = chunk_by_word(input_text)
|
chunks = chunk_by_word(input_text)
|
||||||
reconstructed_text = "".join([chunk[0] for chunk in chunks])
|
reconstructed_text = "".join([chunk[0] for chunk in chunks])
|
||||||
assert reconstructed_text == input_text, (
|
assert (
|
||||||
f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
reconstructed_text == input_text
|
||||||
)
|
), f"texts are not identical: {len(input_text) = }, {len(reconstructed_text) = }"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|
|
||||||
192
examples/python/incremental_loading_example.py
Normal file
192
examples/python/incremental_loading_example.py
Normal file
|
|
@ -0,0 +1,192 @@
|
||||||
|
"""
|
||||||
|
Example: Incremental File Loading with Cognee
|
||||||
|
|
||||||
|
This example demonstrates how to use Cognee's incremental file loading feature
|
||||||
|
to efficiently process only changed parts of files when they are re-added.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
import cognee
|
||||||
|
from cognee.modules.ingestion.incremental import IncrementalLoader, BlockHashService
|
||||||
|
|
||||||
|
|
||||||
|
async def demonstrate_incremental_loading():
|
||||||
|
"""
|
||||||
|
Demonstrate incremental file loading by creating a file, modifying it,
|
||||||
|
and showing how only changed blocks are detected.
|
||||||
|
"""
|
||||||
|
|
||||||
|
print("🚀 Cognee Incremental File Loading Demo")
|
||||||
|
print("=" * 50)
|
||||||
|
|
||||||
|
# Initialize the incremental loader
|
||||||
|
IncrementalLoader(block_size=512) # 512 byte blocks for demo
|
||||||
|
block_service = BlockHashService(block_size=512)
|
||||||
|
|
||||||
|
# Create initial file content
|
||||||
|
initial_content = b"""
|
||||||
|
This is the initial content of our test file.
|
||||||
|
It contains multiple lines of text that will be
|
||||||
|
split into blocks for incremental processing.
|
||||||
|
|
||||||
|
Block 1: Lorem ipsum dolor sit amet, consectetur adipiscing elit.
|
||||||
|
Block 2: Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
|
||||||
|
Block 3: Ut enim ad minim veniam, quis nostrud exercitation ullamco.
|
||||||
|
Block 4: Duis aute irure dolor in reprehenderit in voluptate velit esse.
|
||||||
|
Block 5: Excepteur sint occaecat cupidatat non proident, sunt in culpa.
|
||||||
|
|
||||||
|
This is the end of the initial content.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create modified content (change Block 2 and add Block 6)
|
||||||
|
modified_content = b"""
|
||||||
|
This is the initial content of our test file.
|
||||||
|
It contains multiple lines of text that will be
|
||||||
|
split into blocks for incremental processing.
|
||||||
|
|
||||||
|
Block 1: Lorem ipsum dolor sit amet, consectetur adipiscing elit.
|
||||||
|
Block 2: MODIFIED - This block has been changed significantly!
|
||||||
|
Block 3: Ut enim ad minim veniam, quis nostrud exercitation ullamco.
|
||||||
|
Block 4: Duis aute irure dolor in reprehenderit in voluptate velit esse.
|
||||||
|
Block 5: Excepteur sint occaecat cupidatat non proident, sunt in culpa.
|
||||||
|
Block 6: NEW BLOCK - This is additional content that was added.
|
||||||
|
|
||||||
|
This is the end of the modified content.
|
||||||
|
"""
|
||||||
|
|
||||||
|
print("1. Creating signatures for initial and modified versions...")
|
||||||
|
|
||||||
|
# Generate signatures
|
||||||
|
initial_file = BytesIO(initial_content)
|
||||||
|
modified_file = BytesIO(modified_content)
|
||||||
|
|
||||||
|
initial_signature = block_service.generate_signature(initial_file, "test_file.txt")
|
||||||
|
modified_signature = block_service.generate_signature(modified_file, "test_file.txt")
|
||||||
|
|
||||||
|
print(
|
||||||
|
f" Initial file: {initial_signature.file_size} bytes, {initial_signature.total_blocks} blocks"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" Modified file: {modified_signature.file_size} bytes, {modified_signature.total_blocks} blocks"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Compare signatures to find changes
|
||||||
|
print("\n2. Comparing signatures to detect changes...")
|
||||||
|
|
||||||
|
changed_blocks = block_service.compare_signatures(initial_signature, modified_signature)
|
||||||
|
change_stats = block_service.calculate_block_changes(initial_signature, modified_signature)
|
||||||
|
|
||||||
|
print(f" Changed blocks: {changed_blocks}")
|
||||||
|
print(f" Compression ratio: {change_stats['compression_ratio']:.2%}")
|
||||||
|
print(
|
||||||
|
f" Total blocks changed: {change_stats['changed_blocks']} out of {change_stats['total_old_blocks']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Generate delta
|
||||||
|
print("\n3. Generating delta for changed content...")
|
||||||
|
|
||||||
|
initial_file.seek(0)
|
||||||
|
modified_file.seek(0)
|
||||||
|
|
||||||
|
delta = block_service.generate_delta(initial_file, modified_file, initial_signature)
|
||||||
|
|
||||||
|
print(f" Delta size: {len(delta.delta_data)} bytes")
|
||||||
|
print(f" Changed blocks in delta: {delta.changed_blocks}")
|
||||||
|
|
||||||
|
# Demonstrate reconstruction
|
||||||
|
print("\n4. Reconstructing file from delta...")
|
||||||
|
|
||||||
|
initial_file.seek(0)
|
||||||
|
reconstructed = block_service.apply_delta(initial_file, delta)
|
||||||
|
reconstructed_content = reconstructed.read()
|
||||||
|
|
||||||
|
print(f" Reconstruction successful: {reconstructed_content == modified_content}")
|
||||||
|
print(f" Reconstructed size: {len(reconstructed_content)} bytes")
|
||||||
|
|
||||||
|
# Show block details
|
||||||
|
print("\n5. Block-by-block analysis:")
|
||||||
|
print(" Block | Status | Strong Hash (first 8 chars)")
|
||||||
|
print(" ------|----------|---------------------------")
|
||||||
|
|
||||||
|
old_blocks = {b.block_index: b for b in initial_signature.blocks}
|
||||||
|
new_blocks = {b.block_index: b for b in modified_signature.blocks}
|
||||||
|
|
||||||
|
all_indices = sorted(set(old_blocks.keys()) | set(new_blocks.keys()))
|
||||||
|
|
||||||
|
for idx in all_indices:
|
||||||
|
old_block = old_blocks.get(idx)
|
||||||
|
new_block = new_blocks.get(idx)
|
||||||
|
|
||||||
|
if old_block is None:
|
||||||
|
status = "ADDED"
|
||||||
|
hash_display = new_block.strong_hash[:8] if new_block else ""
|
||||||
|
elif new_block is None:
|
||||||
|
status = "REMOVED"
|
||||||
|
hash_display = old_block.strong_hash[:8]
|
||||||
|
elif old_block.strong_hash == new_block.strong_hash:
|
||||||
|
status = "UNCHANGED"
|
||||||
|
hash_display = old_block.strong_hash[:8]
|
||||||
|
else:
|
||||||
|
status = "MODIFIED"
|
||||||
|
hash_display = f"{old_block.strong_hash[:8]}→{new_block.strong_hash[:8]}"
|
||||||
|
|
||||||
|
print(f" {idx:5d} | {status:8s} | {hash_display}")
|
||||||
|
|
||||||
|
print("\n✅ Incremental loading demo completed!")
|
||||||
|
print("\nThis demonstrates how Cognee can efficiently process only the changed")
|
||||||
|
print("parts of files, significantly reducing processing time for large files")
|
||||||
|
print("with small modifications.")
|
||||||
|
|
||||||
|
|
||||||
|
async def demonstrate_with_cognee():
    """Demonstrate integration with Cognee's add functionality.

    Writes a small temp file, ingests it via ``cognee.add``, rewrites the
    file, ingests it again (which should exercise incremental processing),
    and removes the temp file afterwards.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("🔧 Integration with Cognee Add Functionality")
    print(banner)

    # delete=False so the file survives the context manager and can be
    # handed to cognee by path; we clean it up ourselves below.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as tmp:
        tmp.write("Initial content for Cognee processing.")
        temp_file_path = tmp.name

    try:
        print(f"1. Adding initial file: {temp_file_path}")

        # First ingestion of the file.
        await cognee.add(temp_file_path)
        print(" ✅ File added successfully")

        # Rewrite the file in place with different content.
        with open(temp_file_path, "w") as tmp:
            tmp.write("Modified content for Cognee processing with additional text.")

        print("2. Adding modified version of the same file...")

        # Re-adding the same path should trigger incremental processing.
        await cognee.add(temp_file_path)
        print(" ✅ Modified file processed with incremental loading")
    finally:
        # Always remove the temp file, even if ingestion raised.
        if os.path.exists(temp_file_path):
            os.unlink(temp_file_path)
||||||
|
if __name__ == "__main__":
    # Imported here so the module can be imported without pulling in asyncio
    # at import time; only needed when run as a script.
    import asyncio

    print("Starting Cognee Incremental Loading Demo...")

    # Run the demonstration
    asyncio.run(demonstrate_incremental_loading())

    # Uncomment the line below to test with actual Cognee integration
    # (requires a configured Cognee backend — see demonstrate_with_cognee).
    # asyncio.run(demonstrate_with_cognee())
72
poetry.lock
generated
72
poetry.lock
generated
|
|
@ -8659,6 +8659,76 @@ Pillow = ">=3.3.2"
|
||||||
typing-extensions = ">=4.9.0"
|
typing-extensions = ">=4.9.0"
|
||||||
XlsxWriter = ">=0.5.7"
|
XlsxWriter = ">=0.5.7"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "python-rsync"
|
||||||
|
version = "0.1.2"
|
||||||
|
description = "python binding for librsync"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.6"
|
||||||
|
groups = ["main"]
|
||||||
|
files = [
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:33b08611d2155c94e93fd0e79eb7446ea941e9359f66858c173683445d6bbf54"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:4f3b13a529de705b8c2e7f4ce060ade38bd72ee5134ed9ccb5a47041bd39e5e1"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6078a30e1f853d5238b1c149a7ec35eb1fc67a546440b72b9eaabd42e5413f3c"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:110f1c025c44786b225b692bd17619347125dd44712997798942bfc36835b601"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9de3fae89818098f02c69509852ead8a41eab2a157f276665f82124513de7c88"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:5076210ca66c34461b0254c1fbe98d9a137ec91ad9c800f9ed710ee86da93d2c"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:3929df72beb02cff26fa0cf239e0c896dea2a4a2de9101ddecce8c2e578460b2"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-win32.whl", hash = "sha256:9ff16b0250a9d11806236d8d06df08b4bb4a55650b42e1928b43b6b83cc38025"},
|
||||||
|
{file = "python_rsync-0.1.2-cp310-cp310-win_amd64.whl", hash = "sha256:1aa1a229a7a912c9c21ca12ea7b8ef6c93475fdfa6ad9d932ca27bd70faa988b"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:edc553dd34cce4e0572c5dae8210bbc904e6a5e691d5b4d4cc7e21538bcf8cd1"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d53ad0d172f64bb1ae004e464d873d89c7b412e476e61978b05afc310a9d1e53"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ed2026e4ad261dd19c1237c5156f7b5d702ee5a1678a311da62257c81aa5210e"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1c4c3b8280323f9a4633b334260abce42b6e6be7bd3e2fe25320e8525f2a1da8"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21eddc755236d53624ae641707b122d3afda39f139df1e1f02fdd2144fec863f"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e6a5d4207047b047caa35d6ffae8b59cf7d8b545617e0f699b6747789ee55355"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:8a1df1d98848fd722a2f063a05296b48d57d3d5335f10c71542c7c97ce1c1235"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-win32.whl", hash = "sha256:c9fdae57970426290fefec3e7c71abc8ab082e8d3d1cdeae6a28c8c902ebea03"},
|
||||||
|
{file = "python_rsync-0.1.2-cp311-cp311-win_amd64.whl", hash = "sha256:364ad90a43550d0af7d00297a12b1851453c7c52de2e867cd4eab6d401634c36"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:1c5dd30f6aca29906be2d11bea8a49754dff9badf43c9da3a487dc9f1512ef7b"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:a66e573ef9c3116633c04ad6890e0238d1b101d62b558737c5e3c5fb954d521f"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:89c98d71126e89ba9106fe03144c6e4c31506f2ccf1d518a4ee6781d26118553"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6be8098287209b0b225cf680530ff12ebd1a254585b76f32ecb2f5dba5489b9a"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f02b49e04d4deb9a24f2518c14fe37a1f7deaab9c8619b95374062d1353cb576"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:c699193580d347dbae38dcda189d6f92fec3a385861b79157748e891ab81b441"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:0e692aa7656a623dff3fdd6a2df64eda61ff20db13fdfaf735fbc1897ab4a966"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-win32.whl", hash = "sha256:5480823fff254f5b31279af3facbf7162caf131778528c0c4c03b2180a71a204"},
|
||||||
|
{file = "python_rsync-0.1.2-cp312-cp312-win_amd64.whl", hash = "sha256:d0fa9c3efc482daed331c930854b1e5282391a322b16e28a7b097e675a098a45"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e3ea57b70e1bfd8fdfd47a4398aee55c29df4b944536e5feb192dccfd76cddaf"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:23fc6f81ee71e45b9374118d9c71766af9d10e6a7eb7c8b7a7c408caf943e8f0"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:1143c6a156e45e5d2a6885be305b42ad612c3653c3dc9bcffdcba156126c56a0"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:21f16e6adfd78a703516f89fb90dcc2651118289d0de7dbb4ba0873c702b1441"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:44ef72ca47f2d3b76bdac3e0ab895c50c143979da31fbf069be8b83ee273ff33"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:ae294c5b71cbe9a5a2f78f35df7c3acca2ca1f42060da0361d2e37ea0f7bbf50"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:ec5a18f93eed7bd3507adff1a77ee7b5d41f1a2f0a452e0fd8d624be7c480d9b"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-win32.whl", hash = "sha256:e83ce678853abb6119735f65da635600009fc4b4a09f518c5a390e3c22088f5e"},
|
||||||
|
{file = "python_rsync-0.1.2-cp38-cp38-win_amd64.whl", hash = "sha256:0df3d3914794369130bbf4b611682a1bb8cdff08de85cae4f53b57c062df9d81"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:e35bdaab27a1125694f8eed3596a54b374fdf6a705376dc01057c0eea71121f8"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a5588d8db64a3e0a1aa4b017ccd29d510e2dcee06cfc6a4b65bc17f6a10491f5"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c11e2079e3f4abe25765cd4d01032f2ee6163217e7075751e254519c3064459e"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:018475ce3a2a926159b6182ce27f8a14c27e40061d0758103bccbe2542610210"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:503f48b7ac9f83433676d9cce1a1c7f38e5c1f3866a6a2d12efc8ed4e3879ee6"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:797be624f24a3c8a799719f92d2709cd5b4fbdaa55f6ab39ed9fb7c45cd03450"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:791acef7eb9a73182532e67e722203e6b2ffcd38aa29fee007af048d32331033"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-win32.whl", hash = "sha256:f49d5b7776b1187699fbcdbcaa4c829af69204b4d1ad52c979357ac4e705371a"},
|
||||||
|
{file = "python_rsync-0.1.2-cp39-cp39-win_amd64.whl", hash = "sha256:13ed69f16527263de839bf71556a513af526208f669b36b4eb13c80e25842a37"},
|
||||||
|
{file = "python_rsync-0.1.2-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:caa9bd3cec7678d1b297a13bf25deb577e6f588dd9e1f8e6e93d9e966d448983"},
|
||||||
|
{file = "python_rsync-0.1.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deb1f7df5c5b6a14a86d818c646018d4402356370d40532f831ecc2e347e5eb3"},
|
||||||
|
{file = "python_rsync-0.1.2-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:393dd81f3a6f686b6a36aa99dc8924c731276a9055a9762e482b59dc19140385"},
|
||||||
|
{file = "python_rsync-0.1.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:cc3b528e2101b6ed0852efa1fb77e7e35302ef0e163af6d77771bebc69833739"},
|
||||||
|
{file = "python_rsync-0.1.2-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:43df902e3cdafe452951afdab47888d69fbd42f3d522547803c595c6009acd40"},
|
||||||
|
{file = "python_rsync-0.1.2-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d5a56422a162f931c39fe3b7e0c42866101afab710ca2cecb1324f91ebe137ca"},
|
||||||
|
{file = "python_rsync-0.1.2-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c3a07694c620e7845086eb6ef41a324e02d2b9b47c67b43fe3da9f40c69ed1e"},
|
||||||
|
{file = "python_rsync-0.1.2-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:277e1c3259cf781fef83e586e992d8f7dd321c73ef4cc91a436a9ca3cdb3bc83"},
|
||||||
|
{file = "python_rsync-0.1.2-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4d93f814e1c106fb52957c2f162c01334d785ec599d441d61f16f8e08362c4bf"},
|
||||||
|
{file = "python_rsync-0.1.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8a8a4821c9f9e69defc54acc45f4f054b33610c23a89d03fd9bf8cd786bad619"},
|
||||||
|
{file = "python_rsync-0.1.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7448dd9b301e217e3d2eb2cdd772725294051827230b5b77bbda74934a03cedb"},
|
||||||
|
{file = "python_rsync-0.1.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f9f954297585f8bee97681a64b40eca4d98bbc325f49da893141f08d2bb49b7d"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
cffi = ">=1.0.0"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pytz"
|
name = "pytz"
|
||||||
version = "2025.2"
|
version = "2025.2"
|
||||||
|
|
@ -12100,4 +12170,4 @@ weaviate = ["weaviate-client"]
|
||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.1"
|
lock-version = "2.1"
|
||||||
python-versions = ">=3.10,<=3.13"
|
python-versions = ">=3.10,<=3.13"
|
||||||
content-hash = "c33c4bc9654072b74c2f768c6d4bf9da7b1a814b50046539fceb78a5541d30ee"
|
content-hash = "cd3fac9f262eaf01a4ae8cbe81ae0bc9b4729c61cd2f88942acaef5a93f30818"
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,8 @@ dependencies = [
|
||||||
"structlog>=25.2.0,<26",
|
"structlog>=25.2.0,<26",
|
||||||
"onnxruntime<=1.21.1",
|
"onnxruntime<=1.21.1",
|
||||||
"pylance==0.22.0",
|
"pylance==0.22.0",
|
||||||
"kuzu==0.9.0"
|
"kuzu==0.9.0",
|
||||||
|
"python-rsync>=0.1.2"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|
|
||||||
1
requirements.txt
Normal file
1
requirements.txt
Normal file
|
|
@ -0,0 +1 @@
|
||||||
|
python-rsync>=0.1.2
|
||||||
134
test_incremental_simple.py
Normal file
134
test_incremental_simple.py
Normal file
|
|
@ -0,0 +1,134 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple test for incremental loading functionality
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
|
sys.path.insert(0, ".")
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
|
from cognee.modules.ingestion.incremental import BlockHashService
|
||||||
|
|
||||||
|
|
||||||
|
def test_incremental_loading():
    """
    Simple end-to-end test of the incremental loading functionality.

    Builds two in-memory versions of a small file, generates block
    signatures for both, computes the change set and a delta, then
    verifies that original + delta reconstructs the modified content.

    Returns:
        bool: True when the reconstructed content matches the modified
        content, False otherwise.

    BUGFIX: previously this function returned True unconditionally, so the
    caller's failure branch (``sys.exit(1)``) was unreachable even when
    reconstruction failed. It now returns the actual reconstruction check.
    """
    print("🚀 Cognee Incremental File Loading Test")
    print("=" * 50)

    # Initialize the block service (small blocks so the demo produces
    # several blocks even for tiny inputs)
    block_service = BlockHashService(block_size=64)

    # Create initial file content
    initial_content = b"""This is the initial content.
Line 1: Lorem ipsum dolor sit amet
Line 2: Consectetur adipiscing elit
Line 3: Sed do eiusmod tempor
Line 4: Incididunt ut labore et dolore
Line 5: End of initial content"""

    # Create modified content (change Line 2 and add Line 6)
    modified_content = b"""This is the initial content.
Line 1: Lorem ipsum dolor sit amet
Line 2: MODIFIED - This line has changed!
Line 3: Sed do eiusmod tempor
Line 4: Incididunt ut labore et dolore
Line 5: End of initial content
Line 6: NEW - This is additional content"""

    print("1. Creating signatures for initial and modified versions...")

    # Generate signatures
    initial_file = BytesIO(initial_content)
    modified_file = BytesIO(modified_content)

    initial_signature = block_service.generate_signature(initial_file, "test_file.txt")
    modified_signature = block_service.generate_signature(modified_file, "test_file.txt")

    print(
        f" Initial file: {initial_signature.file_size} bytes, {initial_signature.total_blocks} blocks"
    )
    print(
        f" Modified file: {modified_signature.file_size} bytes, {modified_signature.total_blocks} blocks"
    )

    # Compare signatures to find changes
    print("\n2. Comparing signatures to detect changes...")

    changed_blocks = block_service.compare_signatures(initial_signature, modified_signature)
    change_stats = block_service.calculate_block_changes(initial_signature, modified_signature)

    print(f" Changed blocks: {changed_blocks}")
    print(f" Compression ratio: {change_stats['compression_ratio']:.2%}")
    print(
        f" Total blocks changed: {change_stats['changed_blocks']} out of {change_stats['total_old_blocks']}"
    )

    # Generate delta
    print("\n3. Generating delta for changed content...")

    # Rewind both streams: signature generation consumed them.
    initial_file.seek(0)
    modified_file.seek(0)

    delta = block_service.generate_delta(initial_file, modified_file, initial_signature)

    print(f" Delta size: {len(delta.delta_data)} bytes")
    print(f" Changed blocks in delta: {delta.changed_blocks}")

    # Demonstrate reconstruction
    print("\n4. Reconstructing file from delta...")

    initial_file.seek(0)
    reconstructed = block_service.apply_delta(initial_file, delta)
    reconstructed_content = reconstructed.read()

    # Capture the comparison so the function can report failure to the
    # caller instead of only printing it.
    reconstruction_ok = reconstructed_content == modified_content

    print(f" Reconstruction successful: {reconstruction_ok}")
    print(f" Reconstructed size: {len(reconstructed_content)} bytes")

    # Show block details
    print("\n5. Block-by-block analysis:")
    print(" Block | Status | Strong Hash (first 8 chars)")
    print(" ------|----------|---------------------------")

    old_blocks = {b.block_index: b for b in initial_signature.blocks}
    new_blocks = {b.block_index: b for b in modified_signature.blocks}

    all_indices = sorted(set(old_blocks.keys()) | set(new_blocks.keys()))

    for idx in all_indices:
        old_block = old_blocks.get(idx)
        new_block = new_blocks.get(idx)

        if old_block is None:
            status = "ADDED"
            hash_display = new_block.strong_hash[:8] if new_block else ""
        elif new_block is None:
            status = "REMOVED"
            hash_display = old_block.strong_hash[:8]
        elif old_block.strong_hash == new_block.strong_hash:
            status = "UNCHANGED"
            hash_display = old_block.strong_hash[:8]
        else:
            status = "MODIFIED"
            hash_display = f"{old_block.strong_hash[:8]}→{new_block.strong_hash[:8]}"

        print(f" {idx:5d} | {status:8s} | {hash_display}")

    print("\n✅ Incremental loading test completed!")
    print("\nThis demonstrates how Cognee can efficiently process only the changed")
    print("parts of files, significantly reducing processing time for large files")
    print("with small modifications.")

    # Was: return True (unconditional) — now reflects the actual outcome.
    return reconstruction_ok
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Run the self-test and exit non-zero when it reports failure.
    if test_incremental_loading():
        print("\n🎉 Test passed successfully!")
    else:
        print("\n❌ Test failed!")
        sys.exit(1)
156
tests/test_incremental_loading.py
Normal file
156
tests/test_incremental_loading.py
Normal file
|
|
@ -0,0 +1,156 @@
|
||||||
|
"""
|
||||||
|
Unit tests for incremental file loading functionality
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from io import BytesIO
|
||||||
|
from cognee.modules.ingestion.incremental import BlockHashService, IncrementalLoader
|
||||||
|
|
||||||
|
|
||||||
|
class TestBlockHashService:
    """Unit tests for the core block hashing service."""

    def test_signature_generation(self):
        """A signature records path, size, block size, blocks, and raw data."""
        svc = BlockHashService(block_size=10)

        payload = b"Hello, this is a test file for block hashing!"
        sig = svc.generate_signature(BytesIO(payload), "test.txt")

        assert sig.file_path == "test.txt"
        assert sig.file_size == len(payload)
        assert sig.block_size == 10
        assert len(sig.blocks) > 0
        assert sig.signature_data is not None

    def test_change_detection(self):
        """An edit in the middle changes some blocks, but not all of them."""
        svc = BlockHashService(block_size=10)

        sig_before = svc.generate_signature(
            BytesIO(b"Hello, world! This is the original content.")
        )
        sig_after = svc.generate_signature(
            BytesIO(b"Hello, world! This is the MODIFIED content.")
        )

        diff = svc.compare_signatures(sig_before, sig_after)

        assert len(diff) > 0  # the edit must be detected
        assert len(diff) < len(sig_before.blocks)  # but not every block changed

    def test_no_changes(self):
        """Identical content yields an empty change set."""
        svc = BlockHashService(block_size=10)

        payload = b"This content will not change at all!"
        sig_a = svc.generate_signature(BytesIO(payload))
        sig_b = svc.generate_signature(BytesIO(payload))

        assert len(svc.compare_signatures(sig_a, sig_b)) == 0

    def test_delta_generation(self):
        """A delta can be generated and applied to reproduce the new content."""
        svc = BlockHashService(block_size=8)

        source = BytesIO(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ")
        target_bytes = b"ABCDEFGHXXXXXXXXXXXXXXWXYZ"  # Change middle part
        target = BytesIO(target_bytes)

        # Generate delta
        patch = svc.generate_delta(source, target)

        assert len(patch.changed_blocks) > 0
        assert patch.delta_data is not None

        # Apply delta against the rewound source and verify round-trip.
        source.seek(0)
        rebuilt = svc.apply_delta(source, patch)
        assert rebuilt.read() == target_bytes

    def test_block_statistics(self):
        """Change statistics count blocks and report the unchanged ratio."""
        svc = BlockHashService(block_size=5)

        old_sig = svc.generate_signature(BytesIO(b"ABCDEFGHIJ"))  # 2 blocks
        new_sig = svc.generate_signature(BytesIO(b"ABCDEFXXXX"))  # 2nd block changed

        stats = svc.calculate_block_changes(old_sig, new_sig)

        assert stats["total_old_blocks"] == 2
        assert stats["total_new_blocks"] == 2
        assert stats["changed_blocks"] == 1  # Only second block changed
        assert stats["compression_ratio"] == 0.5  # 50% unchanged
|
class TestIncrementalLoader:
    """Test the incremental loader integration.

    NOTE(review): both tests below are placeholders — they construct objects
    and discard them as smoke checks, then ``pass``. They need a mocked
    database layer before they can assert real behavior.
    """

    @pytest.mark.asyncio
    async def test_should_process_new_file(self):
        """Test processing decision for new files"""
        # Smoke check: constructor must not raise (result deliberately discarded).
        IncrementalLoader()

        content = b"This is a new file that hasn't been seen before."
        # Smoke check: stream construction only; not yet fed to the loader.
        BytesIO(content)

        # For a new file (no existing signature), should process
        # Note: This test would need a mock database setup in real implementation
        # For now, we test the logic without database interaction
        pass  # Placeholder for database-dependent test

    def test_block_data_extraction(self):
        """Test extraction of changed block data"""
        # Smoke check: constructor with explicit block size must not raise.
        IncrementalLoader(block_size=10)

        content = b"Block1____Block2____Block3____"
        BytesIO(content)

        # Create mock change info
        from cognee.modules.ingestion.incremental.block_hash_service import BlockInfo, FileSignature

        # Three contiguous 10-byte blocks covering the 30-byte content above.
        # assumes BlockInfo positional order is (block_index, weak_hash,
        # strong_hash, size, offset) — TODO confirm against the dataclass.
        blocks = [
            BlockInfo(0, 12345, "hash1", 10, 0),
            BlockInfo(1, 23456, "hash2", 10, 10),
            BlockInfo(2, 34567, "hash3", 10, 20),
        ]

        # Smoke check: FileSignature construction only; the instance is
        # deliberately discarded until the extraction logic is testable.
        FileSignature(
            file_path="test",
            file_size=30,
            total_blocks=3,
            block_size=10,
            strong_len=8,
            blocks=blocks,
            signature_data=b"signature",
        )

        # This would normally be called after should_process_file
        # Testing the block extraction logic
        pass  # Placeholder for full integration test
|
if __name__ == "__main__":
    # Allow running this test module directly without invoking pytest externally.
    pytest.main([__file__])
|
||||||
Loading…
Add table
Reference in a new issue