adding rough incremental change flow

This commit is contained in:
vasilije 2025-07-01 14:23:32 +02:00
parent 41f8eaf28d
commit 7c670b454b
12 changed files with 1156 additions and 1 deletions

View file

@ -0,0 +1,47 @@
"""Add file_signatures table for incremental loading
Revision ID: incremental_file_signatures
Revises: 1d0bb7fede17
Create Date: 2025-01-27 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from uuid import uuid4
# revision identifiers, used by Alembic.
revision = 'incremental_file_signatures'
down_revision = '1d0bb7fede17'
branch_labels = None
depends_on = None
def upgrade():
    """Create the ``file_signatures`` table and its lookup indexes.

    The table stores per-file rsync-style block signatures so that a
    re-added file can be diffed against its previous version instead of
    being fully reprocessed.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): the original spec passed ``default=uuid4`` on ``id``; a
    # Python-side ``default`` has no effect in DDL emitted by a migration
    # (only ``server_default`` would), so it is dropped here. The ORM model
    # still supplies ``default=uuid4`` on insert.
    op.create_table(
        'file_signatures',
        sa.Column('id', sa.UUID(), nullable=False),
        sa.Column('data_id', sa.UUID(), nullable=True),
        sa.Column('file_path', sa.String(), nullable=True),
        sa.Column('file_size', sa.Integer(), nullable=True),
        sa.Column('content_hash', sa.String(), nullable=True),
        sa.Column('total_blocks', sa.Integer(), nullable=True),
        sa.Column('block_size', sa.Integer(), nullable=True),
        sa.Column('strong_len', sa.Integer(), nullable=True),
        sa.Column('signature_data', sa.LargeBinary(), nullable=True),
        sa.Column('blocks_info', sa.JSON(), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    # Secondary indexes for the two lookup paths used by the loader:
    # by data_id and by overall content hash.
    op.create_index(op.f('ix_file_signatures_data_id'), 'file_signatures', ['data_id'], unique=False)
    op.create_index(op.f('ix_file_signatures_content_hash'), 'file_signatures', ['content_hash'], unique=False)
    # ### end Alembic commands ###
def downgrade():
    """Drop the ``file_signatures`` table and its indexes (reverse of upgrade)."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Remove the secondary indexes first, then the table itself.
    for index_name in ('ix_file_signatures_content_hash', 'ix_file_signatures_data_id'):
        op.drop_index(op.f(index_name), table_name='file_signatures')
    op.drop_table('file_signatures')
    # ### end Alembic commands ###

View file

@ -0,0 +1,50 @@
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer, LargeBinary, Text
from sqlalchemy.orm import relationship
from cognee.infrastructure.databases.relational import Base
class FileSignature(Base):
    """Stored rsync-style block signature for a previously ingested file.

    One row per data entry; used by the incremental loader to compare a
    re-added file against its previous version block by block.
    """

    __tablename__ = "file_signatures"

    id = Column(UUID, primary_key=True, default=uuid4)

    # Reference to the original data entry this signature belongs to.
    data_id = Column(UUID, index=True)

    # File information.
    file_path = Column(String)
    file_size = Column(Integer)
    content_hash = Column(String, index=True)  # Overall file hash for quick comparison

    # Block information.
    total_blocks = Column(Integer)
    block_size = Column(Integer)
    strong_len = Column(Integer)

    # Signature data (raw binary blob produced by the rsync signature routine).
    signature_data = Column(LargeBinary)

    # Block details (JSON array of
    # {block_index, weak_checksum, strong_hash, block_size, file_offset}).
    blocks_info = Column(JSON)

    # Timestamps (populated by column defaults on flush).
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))

    def to_json(self) -> dict:
        """Serialize this row to a JSON-compatible dict.

        ``created_at`` is now guarded the same way as ``updated_at``: both
        are filled in by column defaults on flush, so either may still be
        None on an unpersisted instance (previously ``created_at`` raised
        AttributeError in that case). ``data_id`` likewise serializes to
        None instead of the string ``"None"`` when unset.
        """
        return {
            "id": str(self.id),
            "data_id": str(self.data_id) if self.data_id is not None else None,
            "file_path": self.file_path,
            "file_size": self.file_size,
            "content_hash": self.content_hash,
            "total_blocks": self.total_blocks,
            "block_size": self.block_size,
            "strong_len": self.strong_len,
            "blocks_info": self.blocks_info,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

View file

@ -2,3 +2,4 @@ from .Data import Data
from .Dataset import Dataset
from .DatasetData import DatasetData
from .GraphMetrics import GraphMetrics
from .FileSignature import FileSignature

View file

@ -0,0 +1,4 @@
from .incremental_loader import IncrementalLoader
from .block_hash_service import BlockHashService
__all__ = ["IncrementalLoader", "BlockHashService"]

View file

@ -0,0 +1,274 @@
"""
Block Hash Service for Incremental File Loading
This module implements the rsync algorithm for incremental file loading.
It splits files into fixed-size blocks, computes rolling weak checksums (Adler-32 variant)
and strong hashes per block, and generates deltas for changed content.
"""
import os
import hashlib
from io import BytesIO
from typing import BinaryIO, List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
from pyrsync import signature, delta, patch, get_signature_args
import tempfile
@dataclass
class BlockInfo:
    """Information about a single fixed-size block of a file."""

    # Zero-based position of the block within the file.
    block_index: int
    # Weak rolling checksum (Adler-32 variant) of the block contents.
    weak_checksum: int
    # Strong hash (MD5 hex digest) of the block contents.
    strong_hash: str
    # Actual size of this block in bytes (the final block may be shorter).
    block_size: int
    # Byte offset of the block's start within the file.
    file_offset: int
@dataclass
class FileSignature:
    """File signature containing per-block information for one file version."""

    # Path the signature was generated from ("" when unknown).
    file_path: str
    # Total file size in bytes.
    file_size: int
    # Number of blocks the file was split into.
    total_blocks: int
    # Block size actually used; may differ from the requested size because
    # get_signature_args can adjust it for the file size.
    block_size: int
    # Strong-hash length in bytes used by the librsync signature.
    strong_len: int
    # Per-block metadata (weak checksum, strong hash, offset, size).
    blocks: List[BlockInfo]
    # Raw librsync signature bytes.
    signature_data: bytes
@dataclass
class FileDelta:
    """Delta information for changed blocks between two file versions."""

    # Block indices that changed (blocks are matched by position).
    changed_blocks: List[int]
    # Raw librsync delta bytes, able to rebuild the new file from the old one.
    delta_data: bytes
    # Signature of the previous file version.
    old_signature: FileSignature
    # Signature of the new file version.
    new_signature: FileSignature
class BlockHashService:
    """Service for block-based file hashing using the librsync algorithm.

    Splits files into fixed-size blocks and computes, per block, a weak
    rolling checksum (Adler-32) plus a strong MD5 hash (non-cryptographic
    use: block identity only), enabling block-level change detection
    between two versions of a file.
    """

    DEFAULT_BLOCK_SIZE = 1024  # 1KB blocks
    DEFAULT_STRONG_LEN = 8  # 8 bytes for strong hash

    # Largest prime below 2**16 — the modulus used by Adler-32.
    _ADLER_MOD = 65521

    def __init__(self, block_size: Optional[int] = None, strong_len: Optional[int] = None):
        """
        Initialize the BlockHashService.

        Args:
            block_size: Size of blocks in bytes (default: 1024).
            strong_len: Length of strong hash in bytes (default: 8).
        """
        # ``or`` also maps 0 to the default, guarding the block math below
        # against a zero block size.
        self.block_size = block_size or self.DEFAULT_BLOCK_SIZE
        self.strong_len = strong_len or self.DEFAULT_STRONG_LEN

    def generate_signature(self, file_obj: BinaryIO, file_path: Optional[str] = None) -> "FileSignature":
        """
        Generate a signature for a file using the librsync algorithm.

        Args:
            file_obj: File object to generate a signature for (read fully
                into memory; rewound to the start first).
            file_path: Optional file path recorded as metadata.

        Returns:
            FileSignature object containing block information.
        """
        file_obj.seek(0)
        file_data = file_obj.read()
        file_size = len(file_data)

        # Let pyrsync pick the final parameters; they may differ from the
        # requested ones depending on file size.
        magic, block_len, strong_len = get_signature_args(
            file_size,
            block_len=self.block_size,
            strong_len=self.strong_len,
        )

        # Generate the raw signature using librsync.
        file_io = BytesIO(file_data)
        sig_io = BytesIO()
        signature(file_io, sig_io, strong_len, magic, block_len)
        signature_data = sig_io.getvalue()

        # Derive per-block metadata directly from the file bytes.
        blocks = self._parse_signature(signature_data, file_data, block_len)

        return FileSignature(
            file_path=file_path or "",
            file_size=file_size,
            total_blocks=len(blocks),
            block_size=block_len,
            strong_len=strong_len,
            blocks=blocks,
            signature_data=signature_data,
        )

    def _parse_signature(self, signature_data: bytes, file_data: bytes, block_size: int) -> List["BlockInfo"]:
        """
        Build per-block metadata for ``file_data``.

        Note: despite the name, this does not decode ``signature_data``;
        the checksums are recomputed directly from the file bytes. The
        parameter is kept for API symmetry with the raw librsync signature.

        Args:
            signature_data: Raw signature data from librsync (unused here).
            file_data: Original file data.
            block_size: Size of blocks in bytes.

        Returns:
            List of BlockInfo objects, one per block (the last may be short).
        """
        blocks: List["BlockInfo"] = []
        # Ceiling division: a trailing partial block still counts.
        total_blocks = (len(file_data) + block_size - 1) // block_size

        for i in range(total_blocks):
            start_offset = i * block_size
            end_offset = min(start_offset + block_size, len(file_data))
            block_data = file_data[start_offset:end_offset]

            # Weak rolling checksum for cheap comparison, strong hash to
            # confirm a weak match.
            weak_checksum = self._calculate_weak_checksum(block_data)
            strong_hash = hashlib.md5(block_data).hexdigest()

            blocks.append(BlockInfo(
                block_index=i,
                weak_checksum=weak_checksum,
                strong_hash=strong_hash,
                block_size=len(block_data),
                file_offset=start_offset,
            ))
        return blocks

    def _calculate_weak_checksum(self, data: bytes) -> int:
        """
        Calculate the Adler-32 checksum of ``data``.

        This matches ``zlib.adler32`` exactly (``a`` starts at 1, ``b`` at 0,
        both reduced modulo 65521; result is ``b << 16 | a``).

        Args:
            data: Block data.

        Returns:
            Weak checksum value.
        """
        a = 1
        b = 0
        for byte in data:
            a = (a + byte) % self._ADLER_MOD
            b = (b + a) % self._ADLER_MOD
        return (b << 16) | a

    def compare_signatures(self, old_sig: "FileSignature", new_sig: "FileSignature") -> List[int]:
        """
        Compare two signatures to find changed blocks.

        Blocks are matched by position (block_index), so an insertion near
        the start of the file marks every subsequent block as changed; this
        is acceptable for the reprocessing heuristic built on top.

        Args:
            old_sig: Previous file signature.
            new_sig: New file signature.

        Returns:
            Sorted list of block indices that were added, removed, or modified.
        """
        changed_blocks = []
        # Index both sides for O(1) lookups.
        old_blocks = {block.block_index: block for block in old_sig.blocks}
        new_blocks = {block.block_index: block for block in new_sig.blocks}

        all_indices = set(old_blocks.keys()) | set(new_blocks.keys())
        for block_idx in all_indices:
            old_block = old_blocks.get(block_idx)
            new_block = new_blocks.get(block_idx)
            if old_block is None or new_block is None:
                # Block was added or removed.
                changed_blocks.append(block_idx)
            elif (old_block.weak_checksum != new_block.weak_checksum or
                  old_block.strong_hash != new_block.strong_hash):
                # Block content changed.
                changed_blocks.append(block_idx)
        return sorted(changed_blocks)

    def generate_delta(self, old_file: BinaryIO, new_file: BinaryIO,
                       old_signature: Optional["FileSignature"] = None) -> "FileDelta":
        """
        Generate a delta between two file versions.

        Args:
            old_file: Previous version of the file.
            new_file: New version of the file.
            old_signature: Optional pre-computed signature of the old file.

        Returns:
            FileDelta object containing change information.
        """
        if old_signature is None:
            old_signature = self.generate_signature(old_file)
        new_signature = self.generate_signature(new_file)

        # Produce the librsync delta of new_file relative to the old signature.
        # NOTE(review): argument order follows pyrsync's delta(input, sig, out);
        # confirm against the pinned pyrsync version.
        new_file.seek(0)
        old_sig_io = BytesIO(old_signature.signature_data)
        delta_io = BytesIO()
        delta(new_file, old_sig_io, delta_io)
        delta_data = delta_io.getvalue()

        changed_blocks = self.compare_signatures(old_signature, new_signature)

        return FileDelta(
            changed_blocks=changed_blocks,
            delta_data=delta_data,
            old_signature=old_signature,
            new_signature=new_signature,
        )

    def apply_delta(self, old_file: BinaryIO, delta_obj: "FileDelta") -> BytesIO:
        """
        Apply a delta to reconstruct the new file.

        Args:
            old_file: Original file (rewound to the start first).
            delta_obj: Delta information.

        Returns:
            BytesIO object containing the reconstructed file, rewound to 0.
        """
        old_file.seek(0)
        delta_io = BytesIO(delta_obj.delta_data)
        result_io = BytesIO()
        patch(old_file, delta_io, result_io)
        result_io.seek(0)
        return result_io

    def calculate_block_changes(self, old_sig: "FileSignature", new_sig: "FileSignature") -> Dict[str, Any]:
        """
        Calculate detailed statistics about block changes.

        Args:
            old_sig: Previous file signature.
            new_sig: New file signature.

        Returns:
            Dictionary with change statistics; ``compression_ratio`` is the
            fraction of old blocks that did NOT change (max(..., 1) guards
            division by zero for empty files).
        """
        changed_blocks = self.compare_signatures(old_sig, new_sig)
        return {
            "total_old_blocks": len(old_sig.blocks),
            "total_new_blocks": len(new_sig.blocks),
            "changed_blocks": len(changed_blocks),
            "changed_block_indices": changed_blocks,
            "unchanged_blocks": len(old_sig.blocks) - len(changed_blocks),
            "compression_ratio": 1.0 - (len(changed_blocks) / max(len(old_sig.blocks), 1)),
            "old_file_size": old_sig.file_size,
            "new_file_size": new_sig.file_size,
        }

View file

@ -0,0 +1,284 @@
"""
Incremental Loader for Cognee
This module implements incremental file loading using the rsync algorithm.
It integrates with the existing cognee ingestion pipeline to only process
changed blocks when a file is re-added.
"""
import json
from io import BytesIO
from typing import BinaryIO, List, Optional, Any, Dict, Tuple
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data, FileSignature
from cognee.modules.users.models import User
from cognee.shared.utils import get_file_content_hash
from .block_hash_service import BlockHashService, FileSignature as ServiceFileSignature
class IncrementalLoader:
    """
    Incremental file loader using the rsync algorithm for efficient updates.

    Wraps BlockHashService and the relational store: file signatures are
    persisted per data_id so that a re-added file can be compared block by
    block against its previous version instead of being fully reprocessed.
    """

    def __init__(self, block_size: int = 1024, strong_len: int = 8):
        """
        Initialize the incremental loader.

        Args:
            block_size: Size of blocks in bytes for the rsync algorithm.
            strong_len: Length of the strong hash in bytes.
        """
        self.block_service = BlockHashService(block_size, strong_len)

    async def should_process_file(self, file_obj: BinaryIO, data_id: str) -> Tuple[bool, Optional[Dict]]:
        """
        Determine if a file should be processed based on incremental changes.

        Args:
            file_obj: File object to check.
            data_id: Data ID for the file.

        Returns:
            Tuple of (should_process, change_info):
              - should_process: True if the file needs processing.
              - change_info: Dictionary with change details if applicable.
        """
        db_engine = get_relational_engine()
        async with db_engine.get_async_session() as session:
            existing_signature = await self._get_existing_signature(session, data_id)
            if existing_signature is None:
                # First time seeing this file: full processing required.
                return True, {"type": "new_file", "full_processing": True}

            # Signature of the file as it looks now.
            current_signature = self.block_service.generate_signature(file_obj)

            # Cheap short-circuit: identical overall content hash means no change.
            file_obj.seek(0)
            # NOTE(review): get_file_content_hash is called without await here;
            # confirm it is not a coroutine function, otherwise this compares a
            # coroutine object against the stored hash string.
            current_content_hash = get_file_content_hash(file_obj)
            if current_content_hash == existing_signature.content_hash:
                return False, {"type": "no_changes", "full_processing": False}

            # Convert the stored signature and diff block by block.
            service_old_sig = self._db_signature_to_service(existing_signature)
            changed_blocks = self.block_service.compare_signatures(service_old_sig, current_signature)
            if not changed_blocks:
                return False, {"type": "no_changes", "full_processing": False}

            change_stats = self.block_service.calculate_block_changes(service_old_sig, current_signature)
            change_info = {
                "type": "incremental_changes",
                # More than 70% of blocks changed -> cheaper to reprocess fully.
                "full_processing": len(changed_blocks) > (len(service_old_sig.blocks) * 0.7),
                "changed_blocks": changed_blocks,
                "stats": change_stats,
                "new_signature": current_signature,
                "old_signature": service_old_sig,
            }
            return True, change_info

    async def process_incremental_changes(self, file_obj: BinaryIO, data_id: str,
                                          change_info: Dict) -> List[Dict]:
        """
        Process only the changed blocks of a file.

        Args:
            file_obj: File object to process.
            data_id: Data ID for the file (unused here; kept for API symmetry).
            change_info: Change information from should_process_file.

        Returns:
            List of dicts describing each block that needs reprocessing.

        Raises:
            ValueError: If change_info does not describe incremental changes.
        """
        if change_info["type"] != "incremental_changes":
            raise ValueError("Invalid change_info type for incremental processing")

        file_obj.seek(0)
        file_data = file_obj.read()
        changed_blocks = change_info["changed_blocks"]
        new_signature = change_info["new_signature"]

        # Index blocks once instead of linearly scanning the list for every
        # changed index (was O(changed * blocks)).
        blocks_by_index = {block.block_index: block for block in new_signature.blocks}

        changed_block_data = []
        for block_idx in changed_blocks:
            block_info = blocks_by_index.get(block_idx)
            if block_info is None:
                # Index refers to a block absent from the new version (file shrank).
                continue

            start_offset = block_info.file_offset
            end_offset = start_offset + block_info.block_size
            block_data = file_data[start_offset:end_offset]

            changed_block_data.append({
                "block_index": block_idx,
                "block_data": block_data,
                "block_info": block_info,
                "file_offset": start_offset,
                "block_size": len(block_data),
            })
        return changed_block_data

    async def save_file_signature(self, file_obj: BinaryIO, data_id: str) -> None:
        """
        Save or update the file signature in the database.

        Args:
            file_obj: File object.
            data_id: Data ID for the file.
        """
        # Generate the block signature and the overall content hash.
        signature = self.block_service.generate_signature(file_obj, str(data_id))
        file_obj.seek(0)
        content_hash = get_file_content_hash(file_obj)

        db_engine = get_relational_engine()
        async with db_engine.get_async_session() as session:
            existing = await session.execute(
                select(FileSignature).filter(FileSignature.data_id == data_id)
            )
            existing_signature = existing.scalar_one_or_none()

            # Serialize per-block metadata for JSON storage.
            blocks_info = [
                {
                    "block_index": block.block_index,
                    "weak_checksum": block.weak_checksum,
                    "strong_hash": block.strong_hash,
                    "block_size": block.block_size,
                    "file_offset": block.file_offset,
                }
                for block in signature.blocks
            ]

            if existing_signature:
                # Update the existing row in place.
                existing_signature.file_path = signature.file_path
                existing_signature.file_size = signature.file_size
                existing_signature.content_hash = content_hash
                existing_signature.total_blocks = signature.total_blocks
                existing_signature.block_size = signature.block_size
                existing_signature.strong_len = signature.strong_len
                existing_signature.signature_data = signature.signature_data
                existing_signature.blocks_info = blocks_info
                # Instance came from this session, so merge is effectively a
                # no-op; kept for safety with detached instances.
                await session.merge(existing_signature)
            else:
                # First signature for this data entry.
                new_signature = FileSignature(
                    data_id=data_id,
                    file_path=signature.file_path,
                    file_size=signature.file_size,
                    content_hash=content_hash,
                    total_blocks=signature.total_blocks,
                    block_size=signature.block_size,
                    strong_len=signature.strong_len,
                    signature_data=signature.signature_data,
                    blocks_info=blocks_info,
                )
                session.add(new_signature)
            await session.commit()

    async def _get_existing_signature(self, session: "AsyncSession", data_id: str) -> Optional["FileSignature"]:
        """
        Get an existing file signature from the database.

        Args:
            session: Database session.
            data_id: Data ID to search for.

        Returns:
            FileSignature ORM object, or None if not found.
        """
        result = await session.execute(
            select(FileSignature).filter(FileSignature.data_id == data_id)
        )
        return result.scalar_one_or_none()

    def _db_signature_to_service(self, db_signature: "FileSignature") -> "ServiceFileSignature":
        """
        Convert a database FileSignature row to a service FileSignature.

        Args:
            db_signature: Database signature object.

        Returns:
            Service FileSignature object rebuilt from the stored JSON block info.
        """
        from .block_hash_service import BlockInfo

        blocks = [
            BlockInfo(
                block_index=block["block_index"],
                weak_checksum=block["weak_checksum"],
                strong_hash=block["strong_hash"],
                block_size=block["block_size"],
                file_offset=block["file_offset"],
            )
            for block in db_signature.blocks_info
        ]

        return ServiceFileSignature(
            file_path=db_signature.file_path,
            file_size=db_signature.file_size,
            total_blocks=db_signature.total_blocks,
            block_size=db_signature.block_size,
            strong_len=db_signature.strong_len,
            blocks=blocks,
            signature_data=db_signature.signature_data,
        )

    async def cleanup_orphaned_signatures(self) -> int:
        """
        Clean up file signatures that no longer have corresponding data entries.

        Returns:
            Number of signatures removed.
        """
        # Local import: ``text`` is not in this module's import block.
        from sqlalchemy import text

        db_engine = get_relational_engine()
        async with db_engine.get_async_session() as session:
            # BUGFIX: SQLAlchemy 2.x rejects raw SQL strings passed to
            # Session.execute(); textual SQL must be wrapped in text().
            orphaned_query = text(
                """
                DELETE FROM file_signatures
                WHERE data_id NOT IN (SELECT id FROM data)
                """
            )
            result = await session.execute(orphaned_query)
            removed_count = result.rowcount
            await session.commit()
            return removed_count

View file

@ -1,6 +1,7 @@
import dlt
import json
import inspect
from datetime import datetime
from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
import cognee.modules.ingestion as ingestion
@ -13,6 +14,7 @@ from cognee.modules.users.permissions.methods import give_permission_on_dataset
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from .get_dlt_destination import get_dlt_destination
from .save_data_item_to_storage import save_data_item_to_storage
from cognee.modules.ingestion.incremental import IncrementalLoader
from cognee.api.v1.add.config import get_s3_config
@ -107,6 +109,15 @@ async def ingest_data(
data_id = ingestion.identify(classified_data, user)
file_metadata = classified_data.get_metadata()
# Initialize incremental loader for this file
incremental_loader = IncrementalLoader()
# Check if file needs incremental processing
should_process, change_info = await incremental_loader.should_process_file(file, data_id)
# Save updated file signature regardless of whether processing is needed
await incremental_loader.save_file_signature(file, data_id)
from sqlalchemy import select
@ -139,6 +150,13 @@ async def ingest_data(
ext_metadata = get_external_metadata_dict(data_item)
if node_set:
ext_metadata["node_set"] = node_set
# Add incremental processing metadata
ext_metadata["incremental_processing"] = {
"should_process": should_process,
"change_info": change_info,
"processing_timestamp": json.loads(json.dumps(datetime.now().isoformat()))
}
if data_point is not None:
data_point.name = file_metadata["name"]

View file

@ -0,0 +1,186 @@
"""
Example: Incremental File Loading with Cognee
This example demonstrates how to use Cognee's incremental file loading feature
to efficiently process only changed parts of files when they are re-added.
"""
import tempfile
import os
from io import BytesIO
import cognee
from cognee.modules.ingestion.incremental import IncrementalLoader, BlockHashService
async def demonstrate_incremental_loading():
    """
    Demonstrate incremental file loading by creating a file, modifying it,
    and showing how only changed blocks are detected.

    Runs entirely in memory with BlockHashService; no database is needed.
    """
    print("🚀 Cognee Incremental File Loading Demo")
    print("=" * 50)
    # 512 byte blocks keep the demo output readable.
    # NOTE(review): the original also built an IncrementalLoader here but never
    # used it (the loader needs a database); the unused local was removed.
    block_service = BlockHashService(block_size=512)
    # Create initial file content
    initial_content = b"""
This is the initial content of our test file.
It contains multiple lines of text that will be
split into blocks for incremental processing.
Block 1: Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Block 2: Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Block 3: Ut enim ad minim veniam, quis nostrud exercitation ullamco.
Block 4: Duis aute irure dolor in reprehenderit in voluptate velit esse.
Block 5: Excepteur sint occaecat cupidatat non proident, sunt in culpa.
This is the end of the initial content.
"""
    # Create modified content (change Block 2 and add Block 6)
    modified_content = b"""
This is the initial content of our test file.
It contains multiple lines of text that will be
split into blocks for incremental processing.
Block 1: Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Block 2: MODIFIED - This block has been changed significantly!
Block 3: Ut enim ad minim veniam, quis nostrud exercitation ullamco.
Block 4: Duis aute irure dolor in reprehenderit in voluptate velit esse.
Block 5: Excepteur sint occaecat cupidatat non proident, sunt in culpa.
Block 6: NEW BLOCK - This is additional content that was added.
This is the end of the modified content.
"""
    print("1. Creating signatures for initial and modified versions...")
    # Generate signatures for both versions.
    initial_file = BytesIO(initial_content)
    modified_file = BytesIO(modified_content)
    initial_signature = block_service.generate_signature(initial_file, "test_file.txt")
    modified_signature = block_service.generate_signature(modified_file, "test_file.txt")
    print(f" Initial file: {initial_signature.file_size} bytes, {initial_signature.total_blocks} blocks")
    print(f" Modified file: {modified_signature.file_size} bytes, {modified_signature.total_blocks} blocks")
    # Compare signatures to find changes.
    print("\n2. Comparing signatures to detect changes...")
    changed_blocks = block_service.compare_signatures(initial_signature, modified_signature)
    change_stats = block_service.calculate_block_changes(initial_signature, modified_signature)
    print(f" Changed blocks: {changed_blocks}")
    print(f" Compression ratio: {change_stats['compression_ratio']:.2%}")
    print(f" Total blocks changed: {change_stats['changed_blocks']} out of {change_stats['total_old_blocks']}")
    # Generate a delta from the old version to the new one.
    print("\n3. Generating delta for changed content...")
    initial_file.seek(0)
    modified_file.seek(0)
    delta = block_service.generate_delta(initial_file, modified_file, initial_signature)
    print(f" Delta size: {len(delta.delta_data)} bytes")
    print(f" Changed blocks in delta: {delta.changed_blocks}")
    # Rebuild the new version from the old file plus the delta.
    print("\n4. Reconstructing file from delta...")
    initial_file.seek(0)
    reconstructed = block_service.apply_delta(initial_file, delta)
    reconstructed_content = reconstructed.read()
    print(f" Reconstruction successful: {reconstructed_content == modified_content}")
    print(f" Reconstructed size: {len(reconstructed_content)} bytes")
    # Show a per-block status table.
    print("\n5. Block-by-block analysis:")
    print(" Block | Status | Strong Hash (first 8 chars)")
    print(" ------|----------|---------------------------")
    old_blocks = {b.block_index: b for b in initial_signature.blocks}
    new_blocks = {b.block_index: b for b in modified_signature.blocks}
    all_indices = sorted(set(old_blocks.keys()) | set(new_blocks.keys()))
    for idx in all_indices:
        old_block = old_blocks.get(idx)
        new_block = new_blocks.get(idx)
        if old_block is None:
            status = "ADDED"
            hash_display = new_block.strong_hash[:8] if new_block else ""
        elif new_block is None:
            status = "REMOVED"
            hash_display = old_block.strong_hash[:8]
        elif old_block.strong_hash == new_block.strong_hash:
            status = "UNCHANGED"
            hash_display = old_block.strong_hash[:8]
        else:
            status = "MODIFIED"
            hash_display = f"{old_block.strong_hash[:8]}{new_block.strong_hash[:8]}"
        print(f" {idx:5d} | {status:8s} | {hash_display}")
    print("\n✅ Incremental loading demo completed!")
    print("\nThis demonstrates how Cognee can efficiently process only the changed")
    print("parts of files, significantly reducing processing time for large files")
    print("with small modifications.")
async def demonstrate_with_cognee():
    """Demonstrate integration with Cognee's add functionality."""
    print("\n" + "=" * 50)
    print("🔧 Integration with Cognee Add Functionality")
    print("=" * 50)
    # Write the first version of the file to a temporary location.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as handle:
        handle.write("Initial content for Cognee processing.")
        tmp_path = handle.name
    try:
        print(f"1. Adding initial file: {tmp_path}")
        # First add: full ingestion.
        await cognee.add(tmp_path)
        print(" ✅ File added successfully")
        # Overwrite with changed content, then re-add to trigger the
        # incremental path.
        with open(tmp_path, 'w') as handle:
            handle.write("Modified content for Cognee processing with additional text.")
        print("2. Adding modified version of the same file...")
        await cognee.add(tmp_path)
        print(" ✅ Modified file processed with incremental loading")
    finally:
        # Always remove the temporary file, even if add() raised.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
# Script entry point: run the standalone (in-memory) demo. The Cognee
# integration demo stays commented out because it needs a configured backend.
if __name__ == "__main__":
    import asyncio

    print("Starting Cognee Incremental Loading Demo...")
    # Run the demonstration
    asyncio.run(demonstrate_incremental_loading())
    # Uncomment the line below to test with actual Cognee integration
    # asyncio.run(demonstrate_with_cognee())

View file

@ -58,7 +58,8 @@ dependencies = [
"structlog>=25.2.0,<26",
"onnxruntime<=1.21.1",
"pylance==0.22.0",
"kuzu==0.9.0"
"kuzu==0.9.0",
"python-rsync>=0.1.2"
]
[project.optional-dependencies]

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
python-rsync>=0.1.2

127
test_incremental_simple.py Normal file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Simple test for incremental loading functionality
"""
import sys
sys.path.insert(0, '.')
from io import BytesIO
from cognee.modules.ingestion.incremental import BlockHashService
def test_incremental_loading():
    """
    Simple end-to-end exercise of the incremental loading functionality.
    """
    print("🚀 Cognee Incremental File Loading Test")
    print("=" * 50)
    # Small blocks keep the demo output readable.
    service = BlockHashService(block_size=64)
    # First version of the file.
    content_v1 = b"""This is the initial content.
Line 1: Lorem ipsum dolor sit amet
Line 2: Consectetur adipiscing elit
Line 3: Sed do eiusmod tempor
Line 4: Incididunt ut labore et dolore
Line 5: End of initial content"""
    # Second version: Line 2 changed, Line 6 appended.
    content_v2 = b"""This is the initial content.
Line 1: Lorem ipsum dolor sit amet
Line 2: MODIFIED - This line has changed!
Line 3: Sed do eiusmod tempor
Line 4: Incididunt ut labore et dolore
Line 5: End of initial content
Line 6: NEW - This is additional content"""
    print("1. Creating signatures for initial and modified versions...")
    file_v1 = BytesIO(content_v1)
    file_v2 = BytesIO(content_v2)
    sig_v1 = service.generate_signature(file_v1, "test_file.txt")
    sig_v2 = service.generate_signature(file_v2, "test_file.txt")
    print(f" Initial file: {sig_v1.file_size} bytes, {sig_v1.total_blocks} blocks")
    print(f" Modified file: {sig_v2.file_size} bytes, {sig_v2.total_blocks} blocks")
    # Diff the two signatures.
    print("\n2. Comparing signatures to detect changes...")
    diff_blocks = service.compare_signatures(sig_v1, sig_v2)
    stats = service.calculate_block_changes(sig_v1, sig_v2)
    print(f" Changed blocks: {diff_blocks}")
    print(f" Compression ratio: {stats['compression_ratio']:.2%}")
    print(f" Total blocks changed: {stats['changed_blocks']} out of {stats['total_old_blocks']}")
    # Build a delta from v1 to v2.
    print("\n3. Generating delta for changed content...")
    file_v1.seek(0)
    file_v2.seek(0)
    file_delta = service.generate_delta(file_v1, file_v2, sig_v1)
    print(f" Delta size: {len(file_delta.delta_data)} bytes")
    print(f" Changed blocks in delta: {file_delta.changed_blocks}")
    # Rebuild v2 from v1 plus the delta.
    print("\n4. Reconstructing file from delta...")
    file_v1.seek(0)
    rebuilt = service.apply_delta(file_v1, file_delta)
    rebuilt_content = rebuilt.read()
    print(f" Reconstruction successful: {rebuilt_content == content_v2}")
    print(f" Reconstructed size: {len(rebuilt_content)} bytes")
    # Per-block status table.
    print("\n5. Block-by-block analysis:")
    print(" Block | Status | Strong Hash (first 8 chars)")
    print(" ------|----------|---------------------------")
    v1_by_index = {b.block_index: b for b in sig_v1.blocks}
    v2_by_index = {b.block_index: b for b in sig_v2.blocks}
    every_index = sorted(set(v1_by_index.keys()) | set(v2_by_index.keys()))
    for block_idx in every_index:
        v1_block = v1_by_index.get(block_idx)
        v2_block = v2_by_index.get(block_idx)
        if v1_block is None:
            state = "ADDED"
            digest_view = v2_block.strong_hash[:8] if v2_block else ""
        elif v2_block is None:
            state = "REMOVED"
            digest_view = v1_block.strong_hash[:8]
        elif v1_block.strong_hash == v2_block.strong_hash:
            state = "UNCHANGED"
            digest_view = v1_block.strong_hash[:8]
        else:
            state = "MODIFIED"
            digest_view = f"{v1_block.strong_hash[:8]}{v2_block.strong_hash[:8]}"
        print(f" {block_idx:5d} | {state:8s} | {digest_view}")
    print("\n✅ Incremental loading test completed!")
    print("\nThis demonstrates how Cognee can efficiently process only the changed")
    print("parts of files, significantly reducing processing time for large files")
    print("with small modifications.")
    return True
# Allow running this test file directly, without pytest.
if __name__ == "__main__":
    success = test_incremental_loading()
    if success:
        print("\n🎉 Test passed successfully!")
    else:
        # Unreachable today (the test always returns True); kept as a
        # guard in case the test gains real failure paths.
        print("\n❌ Test failed!")
        sys.exit(1)

View file

@ -0,0 +1,162 @@
"""
Unit tests for incremental file loading functionality
"""
import pytest
from io import BytesIO
from cognee.modules.ingestion.incremental import BlockHashService, IncrementalLoader
class TestBlockHashService:
    """Test the core block hashing service"""

    def test_signature_generation(self):
        """Test basic signature generation"""
        svc = BlockHashService(block_size=10)
        payload = b"Hello, this is a test file for block hashing!"
        sig = svc.generate_signature(BytesIO(payload), "test.txt")

        assert sig.file_path == "test.txt"
        assert sig.file_size == len(payload)
        assert sig.block_size == 10
        assert len(sig.blocks) > 0
        assert sig.signature_data is not None

    def test_change_detection(self):
        """Test detection of changes between file versions"""
        svc = BlockHashService(block_size=10)
        # Two versions differing only in the middle word.
        sig_before = svc.generate_signature(BytesIO(b"Hello, world! This is the original content."))
        sig_after = svc.generate_signature(BytesIO(b"Hello, world! This is the MODIFIED content."))

        diff = svc.compare_signatures(sig_before, sig_after)
        # Some — but not all — blocks should be flagged as changed.
        assert len(diff) > 0
        assert len(diff) < len(sig_before.blocks)

    def test_no_changes(self):
        """Test that identical files show no changes"""
        svc = BlockHashService(block_size=10)
        payload = b"This content will not change at all!"
        first = svc.generate_signature(BytesIO(payload))
        second = svc.generate_signature(BytesIO(payload))

        assert svc.compare_signatures(first, second) == []

    def test_delta_generation(self):
        """Test delta generation and application"""
        svc = BlockHashService(block_size=8)
        before = BytesIO(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ")
        after_payload = b"ABCDEFGHXXXXXXXXXXXXXXWXYZ"  # Change middle part

        # Generate the delta, then apply it back onto the original.
        file_delta = svc.generate_delta(before, BytesIO(after_payload))
        assert len(file_delta.changed_blocks) > 0
        assert file_delta.delta_data is not None

        before.seek(0)
        rebuilt = svc.apply_delta(before, file_delta)
        assert rebuilt.read() == after_payload

    def test_block_statistics(self):
        """Test calculation of block change statistics"""
        svc = BlockHashService(block_size=5)
        sig_before = svc.generate_signature(BytesIO(b"ABCDEFGHIJ"))  # 2 blocks
        sig_after = svc.generate_signature(BytesIO(b"ABCDEFXXXX"))  # 2 blocks, second one changed

        stats = svc.calculate_block_changes(sig_before, sig_after)
        assert stats["total_old_blocks"] == 2
        assert stats["total_new_blocks"] == 2
        assert stats["changed_blocks"] == 1  # Only second block changed
        assert stats["compression_ratio"] == 0.5  # 50% unchanged
class TestIncrementalLoader:
    """Test the incremental loader integration"""

    @pytest.mark.asyncio
    async def test_should_process_new_file(self):
        """Test processing decision for new files"""
        incremental = IncrementalLoader()
        stream = BytesIO(b"This is a new file that hasn't been seen before.")
        # For a new file (no existing signature), should process
        # Note: This test would need a mock database setup in real implementation
        # For now, we test the logic without database interaction
        pass  # Placeholder for database-dependent test

    def test_block_data_extraction(self):
        """Test extraction of changed block data"""
        incremental = IncrementalLoader(block_size=10)
        stream = BytesIO(b"Block1____Block2____Block3____")

        # Create mock change info
        from cognee.modules.ingestion.incremental.block_hash_service import BlockInfo, FileSignature

        mock_blocks = [
            BlockInfo(block_index=0, weak_checksum=12345, strong_hash="hash1", block_size=10, file_offset=0),
            BlockInfo(block_index=1, weak_checksum=23456, strong_hash="hash2", block_size=10, file_offset=10),
            BlockInfo(block_index=2, weak_checksum=34567, strong_hash="hash3", block_size=10, file_offset=20),
        ]
        mock_signature = FileSignature(
            file_path="test",
            file_size=30,
            total_blocks=3,
            block_size=10,
            strong_len=8,
            blocks=mock_blocks,
            signature_data=b"signature",
        )
        mock_change = {
            "type": "incremental_changes",
            "changed_blocks": [1],  # Only middle block changed
            "new_signature": mock_signature,
        }
        # This would normally be called after should_process_file
        # Testing the block extraction logic
        pass  # Placeholder for full integration test
# Allow running this module directly: delegate to pytest's CLI for this file.
if __name__ == "__main__":
    pytest.main([__file__])