Compare commits

9 commits

Author     SHA1        Message                                                             Date
Igor Ilic  d78599aa7e  Merge branch 'dev' into incremental-loading                         2025-07-14 17:34:50 +02:00
Igor Ilic  081fae8273  Merge branch 'dev' into incremental-loading                         2025-07-14 13:44:46 +02:00
Igor Ilic  30be0df314  fix: Resolve for loop break issue                                   2025-07-10 23:49:00 +02:00
Igor Ilic  dc38ff3838  fix: Resolve S3 adding specific files issue                         2025-07-10 23:03:48 +02:00
Igor Ilic  67b61ff964  fix: Resolve code graph pipeline issue                              2025-07-10 22:20:30 +02:00
Igor Ilic  80896fdcc5  test: Fix deduplication test                                        2025-07-10 22:10:08 +02:00
Igor Ilic  593cfcab5b  feat: Add incremental load                                          2025-07-10 21:58:11 +02:00
Igor Ilic  d98253c28c  fix: Resolve issue with Data object when running cognify pipeline   2025-07-10 21:14:26 +02:00
Igor Ilic  af2b0735f6  feat: incremental load initial commit                               2025-07-10 20:46:36 +02:00
14 changed files with 158 additions and 38 deletions

View file

@@ -15,6 +15,7 @@ async def add(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     dataset_id: UUID = None,
+    incremental_loading: bool = True,
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -153,6 +154,7 @@ async def add(
         pipeline_name="add_pipeline",
         vector_db_config=vector_db_config,
         graph_db_config=graph_db_config,
+        incremental_loading=incremental_loading,
     ):
         pipeline_run_info = run_info
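
For orientation, a minimal usage sketch of the new flag (not part of the diff; the file path and dataset name are hypothetical, and per this diff incremental loading defaults to True for both add and cognify):

import asyncio

import cognee


async def main():
    # With the new default, data items already processed by a pipeline are skipped.
    await cognee.add(["file:///tmp/example.txt"], "my_dataset")
    await cognee.cognify(datasets=["my_dataset"])

    # Opt out per call to force full re-processing.
    await cognee.add(["file:///tmp/example.txt"], "my_dataset", incremental_loading=False)
    await cognee.cognify(datasets=["my_dataset"], incremental_loading=False)


asyncio.run(main())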

View file

@@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
         async for run_status in non_code_pipeline_run:
             yield run_status
 
-    async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
+    async for run_status in run_tasks(
+        tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
+    ):
         yield run_status

View file

@@ -39,6 +39,7 @@ async def cognify(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     run_in_background: bool = False,
+    incremental_loading: bool = True,
 ):
     """
     Transform ingested data into a structured knowledge graph.
@@ -194,6 +195,7 @@ async def cognify(
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
     else:
         return await run_cognify_blocking(
@@ -202,6 +204,7 @@
             datasets=datasets,
             vector_db_config=vector_db_config,
             graph_db_config=graph_db_config,
+            incremental_loading=incremental_loading,
         )
@@ -211,6 +214,7 @@ async def run_cognify_blocking(
     datasets,
     graph_db_config: dict = None,
     vector_db_config: dict = False,
+    incremental_loading: bool = True,
 ):
     total_run_info = {}
@@ -221,6 +225,7 @@
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     ):
         if run_info.dataset_id:
             total_run_info[run_info.dataset_id] = run_info
@@ -236,12 +241,14 @@ async def run_cognify_as_background_process(
     datasets,
     graph_db_config: dict = None,
     vector_db_config: dict = False,
+    incremental_loading: bool = True,
 ):
     # Store pipeline status for all pipelines
     pipeline_run_started_info = []
 
     async def handle_rest_of_the_run(pipeline_list):
         # Execute all provided pipelines one by one to avoid database write conflicts
+        # TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created
         for pipeline in pipeline_list:
             while True:
                 try:
@@ -266,6 +273,7 @@ async def run_cognify_as_background_process(
         pipeline_name="cognify_pipeline",
         graph_db_config=graph_db_config,
         vector_db_config=vector_db_config,
+        incremental_loading=incremental_loading,
     )
     # Save dataset Pipeline run started info

View file

@@ -1,6 +1,7 @@
 from datetime import datetime, timezone
 from uuid import uuid4
 from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
+from sqlalchemy.ext.mutable import MutableDict
 from sqlalchemy.orm import relationship
 
 from cognee.infrastructure.databases.relational import Base
@@ -20,7 +21,11 @@ class Data(Base):
     owner_id = Column(UUID, index=True)
     content_hash = Column(String)
     external_metadata = Column(JSON)
-    node_set = Column(JSON, nullable=True)  # Store NodeSet as JSON list of strings
+    # Store NodeSet as JSON list of strings
+    node_set = Column(JSON, nullable=True)
+    # MutableDict lets SQLAlchemy notice key-value changes; without it, updating a value for a key
+    # wouldn't be picked up when committing a database session
+    pipeline_status = Column(MutableDict.as_mutable(JSON))
     token_count = Column(Integer)
     created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
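
Why MutableDict matters here: a plain JSON column does not report in-place mutations, so an update like data.pipeline_status[pipeline_name] = "Completed" would silently fail to persist. A standalone sketch (not part of the diff, using an in-memory SQLite engine) demonstrating the difference:

from sqlalchemy import JSON, Column, Integer, create_engine
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Record(Base):
    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    plain = Column(JSON, default=dict)  # in-place changes go unnoticed
    tracked = Column(MutableDict.as_mutable(JSON), default=dict)  # changes mark the row dirty


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Record(plain={}, tracked={}))
    session.commit()

    record = session.get(Record, 1)
    record.plain["add_pipeline"] = "Completed"  # mutation invisible to the session
    record.tracked["add_pipeline"] = "Completed"  # mutation flagged for the next commit
    session.commit()

with Session(engine) as session:
    record = session.get(Record, 1)
    print(record.plain)  # {} - the in-place change was silently lost
    print(record.tracked)  # {'add_pipeline': 'Completed'}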

View file

@@ -0,0 +1,2 @@
+from .get_s3_fs import get_s3_fs
+from .open_data_file import open_data_file

View file

@@ -0,0 +1,14 @@
+from cognee.api.v1.add.config import get_s3_config
+
+
+def get_s3_fs():
+    s3_config = get_s3_config()
+
+    fs = None
+    if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None:
+        import s3fs
+
+        fs = s3fs.S3FileSystem(
+            key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
+        )
+    return fs

View file

@@ -0,0 +1,6 @@
+def open_data_file(file_path: str, s3fs):
+    if file_path.startswith("s3://"):
+        return s3fs.open(file_path, mode="rb")
+    else:
+        local_path = file_path.replace("file://", "")
+        return open(local_path, mode="rb")
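
A hedged usage sketch of the two extracted helpers (the path is hypothetical; get_s3_fs() returns None unless both AWS credentials are configured, and open_data_file() only touches the s3fs handle for s3:// paths):

from cognee.modules.ingestion.methods import get_s3_fs, open_data_file

fs = get_s3_fs()  # s3fs.S3FileSystem if AWS key and secret are set, otherwise None

# Local file:// paths fall back to the built-in open(), so a None fs is fine here.
with open_data_file("file:///tmp/example.txt", s3fs=fs) as file:
    content = file.read()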

View file

@@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel):
     dataset_id: UUID
     dataset_name: str
     payload: Optional[Any] = None
+    data_ingestion_info: Optional[dict] = None
 
     model_config = {
         "arbitrary_types_allowed": True,
View file

@@ -2,6 +2,7 @@ import asyncio
 from typing import Union
 from uuid import NAMESPACE_OID, uuid5, UUID
 
+from cognee.modules.ingestion.exceptions import IngestionError
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.models import Data, Dataset
@@ -52,6 +53,7 @@ async def cognee_pipeline(
     pipeline_name: str = "custom_pipeline",
     vector_db_config: dict = None,
     graph_db_config: dict = None,
+    incremental_loading: bool = True,
 ):
     # Note: These context variables allow different value assignment for databases in Cognee
     # per async task, thread, process and etc.
@@ -106,6 +108,7 @@ async def cognee_pipeline(
             data=data,
             pipeline_name=pipeline_name,
             context={"dataset": dataset},
+            incremental_loading=incremental_loading,
         ):
             yield run_info
@@ -117,6 +120,7 @@ async def run_pipeline(
     data=None,
     pipeline_name: str = "custom_pipeline",
     context: dict = None,
+    incremental_loading=True,
 ):
     check_dataset_name(dataset.name)
@@ -184,7 +188,9 @@ async def run_pipeline(
         if not isinstance(task, Task):
             raise ValueError(f"Task {task} is not an instance of Task")
 
-    pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context)
+    pipeline_run = run_tasks(
+        tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
+    )
 
     async for pipeline_run_info in pipeline_run:
         yield pipeline_run_info

View file

@@ -1,14 +1,20 @@
 import os
+import cognee.modules.ingestion as ingestion
 from uuid import UUID
 from typing import Any
 from functools import wraps
+from sqlalchemy import select
 
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
 from cognee.modules.users.models import User
+from cognee.modules.data.models import Data
+from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.pipelines.utils import generate_pipeline_id
+from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories
 from cognee.modules.pipelines.models.PipelineRunInfo import (
     PipelineRunCompleted,
     PipelineRunErrored,
@@ -55,6 +61,7 @@ async def run_tasks(
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
+    incremental_loading: bool = True,
 ):
     if not user:
         user = get_default_user()
@@ -79,10 +86,45 @@ async def run_tasks(
         payload=data,
     )
 
+    fs = get_s3_fs()
+
+    data_items_pipeline_run_info = {}
+    ingestion_error = None
+
+    try:
+        if not isinstance(data, list):
+            data = [data]
+
+        if incremental_loading:
+            data = await resolve_data_directories(data)
+
+        # TODO: Convert to async gather task instead of for loop (just make sure it can work; there were some issues when async gathering datasets)
+        for data_item in data:
+            # If incremental_loading is set to True, don't process documents already processed by the pipeline
+            if incremental_loading:
+                # If data is being added to Cognee for the first time, calculate the id of the data
+                if not isinstance(data_item, Data):
+                    file_path = await save_data_item_to_storage(data_item, dataset.name)
+
+                    # Ingest data and add metadata
+                    with open_data_file(file_path, s3fs=fs) as file:
+                        classified_data = ingestion.classify(file, s3fs=fs)
+
+                    # data_id is the hash of file contents + owner id, to avoid duplicate data
+                    data_id = ingestion.identify(classified_data, user)
+                else:
+                    # If data was already processed by Cognee, reuse its data id
+                    data_id = data_item.id
+
+                # Check pipeline status; if this Data was already processed by the pipeline, skip it
+                async with db_engine.get_async_session() as session:
+                    data_point = (
+                        await session.execute(select(Data).filter(Data.id == data_id))
+                    ).scalar_one_or_none()
+                    if data_point:
+                        if data_point.pipeline_status.get(pipeline_name) == "Completed":
+                            continue
+
             try:
                 async for result in run_tasks_with_telemetry(
                     tasks=tasks,
-                    data=data,
+                    data=[data_item],
                     user=user,
                     pipeline_name=pipeline_id,
                     context=context,
@@ -94,6 +136,46 @@
                     payload=result,
                 )
 
+                if incremental_loading:
+                    data_items_pipeline_run_info[data_id] = {
+                        "run_info": PipelineRunCompleted(
+                            pipeline_run_id=pipeline_run_id,
+                            dataset_id=dataset.id,
+                            dataset_name=dataset.name,
+                        ),
+                        "data_id": data_id,
+                    }
+
+                    # Update pipeline status for the Data element
+                    async with db_engine.get_async_session() as session:
+                        data_point = (
+                            await session.execute(select(Data).filter(Data.id == data_id))
+                        ).scalar_one_or_none()
+                        data_point.pipeline_status[pipeline_name] = "Completed"
+                        await session.merge(data_point)
+                        await session.commit()
+            except Exception as error:
+                # Temporarily swallow the error and try to process the rest of the documents first,
+                # then re-raise it at the end of the data ingestion pipeline
+                ingestion_error = error
+                logger.error(
+                    f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}."
+                )
+
+                if incremental_loading:
+                    data_items_pipeline_run_info[data_id] = {
+                        "run_info": PipelineRunErrored(
+                            pipeline_run_id=pipeline_run_id,
+                            payload=error,
+                            dataset_id=dataset.id,
+                            dataset_name=dataset.name,
+                        ),
+                        "data_id": data_id,
+                    }
+
+        # Re-raise any error found during data ingestion
+        if ingestion_error:
+            raise ingestion_error
         await log_pipeline_run_complete(
             pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
         )
@@ -102,6 +184,7 @@ async def run_tasks(
             pipeline_run_id=pipeline_run_id,
             dataset_id=dataset.id,
             dataset_name=dataset.name,
+            data_ingestion_info=data_items_pipeline_run_info,
         )
     except Exception as error:
@@ -114,6 +197,7 @@
             payload=error,
             dataset_id=dataset.id,
             dataset_name=dataset.name,
+            data_ingestion_info=data_items_pipeline_run_info,
         )
 
         raise error
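
Note the design choice above: a failing data item no longer aborts the run; the error is recorded, the remaining items are processed, and the first error is re-raised at the end, with per-item outcomes surfaced through data_ingestion_info. A hedged sketch of how a caller might read that field (the helper name is illustrative; the dict shape follows this diff):

def summarize_ingestion(run_info) -> None:
    # run_info is a PipelineRunCompleted or PipelineRunErrored yielded by run_tasks
    for data_id, item in (run_info.data_ingestion_info or {}).items():
        status = type(item["run_info"]).__name__  # e.g. "PipelineRunCompleted"
        print(f"{data_id}: {status}")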

View file

@@ -9,6 +9,7 @@ from cognee.modules.data.models import Data
 from cognee.modules.users.models import User
 from cognee.modules.users.methods import get_default_user
 from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
+from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
 from cognee.modules.data.methods import (
     get_authorized_existing_datasets,
     get_dataset_data,
@@ -18,9 +19,6 @@ from cognee.modules.data.methods import (
 from .save_data_item_to_storage import save_data_item_to_storage
 
-from cognee.api.v1.add.config import get_s3_config
-
 
 async def ingest_data(
     data: Any,
     dataset_name: str,
@@ -31,22 +29,7 @@ async def ingest_data(
     if not user:
         user = await get_default_user()
 
-    s3_config = get_s3_config()
-    fs = None
-    if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None:
-        import s3fs
-
-        fs = s3fs.S3FileSystem(
-            key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
-        )
-
-    def open_data_file(file_path: str):
-        if file_path.startswith("s3://"):
-            return fs.open(file_path, mode="rb")
-        else:
-            local_path = file_path.replace("file://", "")
-            return open(local_path, mode="rb")
+    fs = get_s3_fs()
 
     def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
         if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
@@ -95,7 +78,7 @@ async def ingest_data(
         file_path = await save_data_item_to_storage(data_item, dataset_name)
 
         # Ingest data and add metadata
-        with open_data_file(file_path) as file:
+        with open_data_file(file_path, s3fs=fs) as file:
             classified_data = ingestion.classify(file, s3fs=fs)
 
         # data_id is the hash of file contents + owner id to avoid duplicate data
@@ -148,6 +131,7 @@ async def ingest_data(
             content_hash=file_metadata["content_hash"],
             external_metadata=ext_metadata,
             node_set=json.dumps(node_set) if node_set else None,
+            pipeline_status={},
             token_count=-1,
         )

View file

@@ -40,6 +40,9 @@ async def resolve_data_directories(
                 if include_subdirectories:
                     base_path = item if item.endswith("/") else item + "/"
                     s3_keys = fs.glob(base_path + "**")
+                    # If the path is not a directory, attempt to add the item directly
+                    if not s3_keys:
+                        s3_keys = fs.ls(item)
                 else:
                     s3_keys = fs.ls(item)
                 # Filter out keys that represent directories using fs.isdir
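
A standalone sketch of the glob-then-ls fallback this hunk introduces (illustrative only; the bucket and key are hypothetical, and fs would normally come from get_s3_fs()):

import s3fs

fs = s3fs.S3FileSystem(anon=True)  # anonymous access, for illustration
item = "my-bucket/datasets/report.txt"  # a single object, not a directory

s3_keys = fs.glob(item.rstrip("/") + "/**")  # empty when item is not a directory
if not s3_keys:
    s3_keys = fs.ls(item)  # fall back to listing the item itself

files = [key for key in s3_keys if not fs.isdir(key)]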

View file

@@ -103,6 +103,9 @@ async def get_repo_file_dependencies(
         extraction of dependencies (default is False).
     """
 
+    if isinstance(repo_path, list) and len(repo_path) == 1:
+        repo_path = repo_path[0]
+
     if not os.path.exists(repo_path):
         raise FileNotFoundError(f"Repository path {repo_path} does not exist.")

View file

@@ -25,8 +25,8 @@ async def test_deduplication():
     explanation_file_path2 = os.path.join(
         pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
     )
-    await cognee.add([explanation_file_path], dataset_name)
-    await cognee.add([explanation_file_path2], dataset_name2)
+    await cognee.add([explanation_file_path], dataset_name, incremental_loading=False)
+    await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False)
 
     result = await relational_engine.get_all_data_from_table("data")
     assert len(result) == 1, "More than one data entity was found."