From 14ba3e8829634bc856ed17c9c9e50fb290d2cf5d Mon Sep 17 00:00:00 2001
From: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Date: Tue, 29 Jul 2025 16:39:31 +0200
Subject: [PATCH] feat: Enable async execution of data items for incremental loading (#1092)

## Description
Run incremental loading asynchronously: each data item in a dataset is processed in its own asyncio task, items whose `pipeline_status` already marks them as processed for the pipeline and dataset are skipped, and per-item failures are collected and surfaced as `PipelineRunErrored` (with details in `data_ingestion_info`) instead of aborting the remaining items. A new `incremental_loading` flag (default `True`) on `add` and `cognify` opts out of this behavior (see the usage sketch at the end of this patch).

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

---
 cognee/api/v1/add/add.py                      |   2 +
 cognee/api/v1/add/routers/get_add_router.py   |   3 +
 cognee/api/v1/cognify/code_graph_pipeline.py  |   4 +-
 cognee/api/v1/cognify/cognify.py              |   8 +
 .../v1/cognify/routers/get_cognify_router.py  |   9 +-
 cognee/modules/data/models/Data.py            |   7 +-
 .../processing/document_types/PdfDocument.py  |  15 +-
 .../modules/pipelines/exceptions/__init__.py  |   1 +
 .../pipelines/exceptions/exceptions.py        |  12 +
 .../pipelines/models/DataItemStatus.py        |   5 +
 .../pipelines/models/PipelineRunInfo.py       |   6 +
 cognee/modules/pipelines/models/__init__.py   |   1 +
 .../modules/pipelines/operations/pipeline.py  |   7 +-
 .../modules/pipelines/operations/run_tasks.py | 246 ++++++++++++++++--
 .../extract_chunks_from_documents.py          |  20 +-
 cognee/tasks/ingestion/ingest_data.py         |   3 +-
 .../ingestion/resolve_data_directories.py     |   3 +
 .../get_repo_file_dependencies.py             |   3 +
 cognee/tests/test_deduplication.py            |   4 +-
 19 files changed, 309 insertions(+), 50 deletions(-)
 create mode 100644 cognee/modules/pipelines/exceptions/__init__.py
 create mode 100644 cognee/modules/pipelines/exceptions/exceptions.py
 create mode 100644 cognee/modules/pipelines/models/DataItemStatus.py

diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index 4f51729a3..3e4aaae49 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -15,6 +15,7 @@ async def add(
     vector_db_config: dict = None,
     graph_db_config: dict = None,
     dataset_id: Optional[UUID] = None,
+    incremental_loading: bool = True,
 ):
     """
     Add data to Cognee for knowledge graph processing.
@@ -153,6 +154,7 @@ async def add( pipeline_name="add_pipeline", vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ): pipeline_run_info = run_info diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index 4519af728..66b165a38 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -11,6 +11,7 @@ from typing import List, Optional, Union, Literal from cognee.modules.users.models import User from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry +from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger logger = get_logger() @@ -100,6 +101,8 @@ def get_add_router() -> APIRouter: else: add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId) + if isinstance(add_run, PipelineRunErrored): + return JSONResponse(status_code=420, content=add_run.model_dump(mode="json")) return add_run.model_dump() except Exception as error: return JSONResponse(status_code=409, content={"error": str(error)}) diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py index 00a0d3dc9..0da286c4b 100644 --- a/cognee/api/v1/cognify/code_graph_pipeline.py +++ b/cognee/api/v1/cognify/code_graph_pipeline.py @@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False): async for run_status in non_code_pipeline_run: yield run_status - async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"): + async for run_status in run_tasks( + tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False + ): yield run_status diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index 7c7821460..c6508f3a7 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -39,6 +39,7 @@ async def cognify( vector_db_config: dict = None, graph_db_config: dict = None, run_in_background: bool = False, + incremental_loading: bool = True, ): """ Transform ingested data into a structured knowledge graph. 
@@ -194,6 +195,7 @@ async def cognify( datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ) else: return await run_cognify_blocking( @@ -202,6 +204,7 @@ async def cognify( datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, + incremental_loading=incremental_loading, ) @@ -211,6 +214,7 @@ async def run_cognify_blocking( datasets, graph_db_config: dict = None, vector_db_config: dict = False, + incremental_loading: bool = True, ): total_run_info = {} @@ -221,6 +225,7 @@ async def run_cognify_blocking( pipeline_name="cognify_pipeline", graph_db_config=graph_db_config, vector_db_config=vector_db_config, + incremental_loading=incremental_loading, ): if run_info.dataset_id: total_run_info[run_info.dataset_id] = run_info @@ -236,6 +241,7 @@ async def run_cognify_as_background_process( datasets, graph_db_config: dict = None, vector_db_config: dict = False, + incremental_loading: bool = True, ): # Convert dataset to list if it's a string if isinstance(datasets, str): @@ -246,6 +252,7 @@ async def run_cognify_as_background_process( async def handle_rest_of_the_run(pipeline_list): # Execute all provided pipelines one by one to avoid database write conflicts + # TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created for pipeline in pipeline_list: while True: try: @@ -270,6 +277,7 @@ async def run_cognify_as_background_process( pipeline_name="cognify_pipeline", graph_db_config=graph_db_config, vector_db_config=vector_db_config, + incremental_loading=incremental_loading, ) # Save dataset Pipeline run started info diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py index ecfceec52..b63238966 100644 --- a/cognee/api/v1/cognify/routers/get_cognify_router.py +++ b/cognee/api/v1/cognify/routers/get_cognify_router.py @@ -16,7 +16,11 @@ from cognee.modules.graph.methods import get_formatted_graph_data from cognee.modules.users.get_user_manager import get_user_manager_context from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.users.authentication.default.default_jwt_strategy import DefaultJWTStrategy -from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunInfo +from cognee.modules.pipelines.models.PipelineRunInfo import ( + PipelineRunCompleted, + PipelineRunInfo, + PipelineRunErrored, +) from cognee.modules.pipelines.queues.pipeline_run_info_queues import ( get_from_queue, initialize_queue, @@ -105,6 +109,9 @@ def get_cognify_router() -> APIRouter: datasets, user, run_in_background=payload.run_in_background ) + # If any cognify run errored return JSONResponse with proper error status code + if any(isinstance(v, PipelineRunErrored) for v in cognify_run.values()): + return JSONResponse(status_code=420, content=cognify_run) return cognify_run except Exception as error: return JSONResponse(status_code=409, content={"error": str(error)}) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index c22cc338e..dc918c2ed 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone from uuid import uuid4 from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer +from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import relationship from 
cognee.infrastructure.databases.relational import Base @@ -21,7 +22,11 @@ class Data(Base): tenant_id = Column(UUID, index=True, nullable=True) content_hash = Column(String) external_metadata = Column(JSON) - node_set = Column(JSON, nullable=True) # Store NodeSet as JSON list of strings + # Store NodeSet as JSON list of strings + node_set = Column(JSON, nullable=True) + # MutableDict allows SQLAlchemy to notice key-value pair changes, without it changing a value for a key + # wouldn't be noticed when commiting a database session + pipeline_status = Column(MutableDict.as_mutable(JSON)) token_count = Column(Integer) data_size = Column(Integer, nullable=True) # File size in bytes created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) diff --git a/cognee/modules/data/processing/document_types/PdfDocument.py b/cognee/modules/data/processing/document_types/PdfDocument.py index e92868c2e..dc90899eb 100644 --- a/cognee/modules/data/processing/document_types/PdfDocument.py +++ b/cognee/modules/data/processing/document_types/PdfDocument.py @@ -5,7 +5,6 @@ from cognee.modules.chunking.Chunker import Chunker from cognee.infrastructure.files.utils.open_data_file import open_data_file from .Document import Document -from .exceptions.exceptions import PyPdfInternalError logger = get_logger("PDFDocument") @@ -17,18 +16,12 @@ class PdfDocument(Document): async with open_data_file(self.raw_data_location, mode="rb") as stream: logger.info(f"Reading PDF: {self.raw_data_location}") - try: - file = PdfReader(stream, strict=False) - except Exception: - raise PyPdfInternalError() + file = PdfReader(stream, strict=False) async def get_text(): - try: - for page in file.pages: - page_text = page.extract_text() - yield page_text - except Exception: - raise PyPdfInternalError() + for page in file.pages: + page_text = page.extract_text() + yield page_text chunker = chunker_cls(self, get_text=get_text, max_chunk_size=max_chunk_size) diff --git a/cognee/modules/pipelines/exceptions/__init__.py b/cognee/modules/pipelines/exceptions/__init__.py new file mode 100644 index 000000000..f4e296be3 --- /dev/null +++ b/cognee/modules/pipelines/exceptions/__init__.py @@ -0,0 +1 @@ +from .exceptions import PipelineRunFailedError diff --git a/cognee/modules/pipelines/exceptions/exceptions.py b/cognee/modules/pipelines/exceptions/exceptions.py new file mode 100644 index 000000000..0a4863075 --- /dev/null +++ b/cognee/modules/pipelines/exceptions/exceptions.py @@ -0,0 +1,12 @@ +from cognee.exceptions import CogneeApiError +from fastapi import status + + +class PipelineRunFailedError(CogneeApiError): + def __init__( + self, + message: str = "Pipeline run failed.", + name: str = "PipelineRunFailedError", + status_code: int = status.HTTP_422_UNPROCESSABLE_ENTITY, + ): + super().__init__(message, name, status_code) diff --git a/cognee/modules/pipelines/models/DataItemStatus.py b/cognee/modules/pipelines/models/DataItemStatus.py new file mode 100644 index 000000000..c9be26255 --- /dev/null +++ b/cognee/modules/pipelines/models/DataItemStatus.py @@ -0,0 +1,5 @@ +import enum + + +class DataItemStatus(str, enum.Enum): + DATA_ITEM_PROCESSING_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED" diff --git a/cognee/modules/pipelines/models/PipelineRunInfo.py b/cognee/modules/pipelines/models/PipelineRunInfo.py index d910f4fc8..5f5a91c34 100644 --- a/cognee/modules/pipelines/models/PipelineRunInfo.py +++ b/cognee/modules/pipelines/models/PipelineRunInfo.py @@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel): dataset_id: UUID 
dataset_name: str payload: Optional[Any] = None + data_ingestion_info: Optional[list] = None model_config = { "arbitrary_types_allowed": True, @@ -30,6 +31,11 @@ class PipelineRunCompleted(PipelineRunInfo): pass +class PipelineRunAlreadyCompleted(PipelineRunInfo): + status: str = "PipelineRunAlreadyCompleted" + pass + + class PipelineRunErrored(PipelineRunInfo): status: str = "PipelineRunErrored" pass diff --git a/cognee/modules/pipelines/models/__init__.py b/cognee/modules/pipelines/models/__init__.py index f109d7196..ed81f1398 100644 --- a/cognee/modules/pipelines/models/__init__.py +++ b/cognee/modules/pipelines/models/__init__.py @@ -6,3 +6,4 @@ from .PipelineRunInfo import ( PipelineRunCompleted, PipelineRunErrored, ) +from .DataItemStatus import DataItemStatus diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index e58c15254..b08f8e3bb 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -52,6 +52,7 @@ async def cognee_pipeline( pipeline_name: str = "custom_pipeline", vector_db_config: dict = None, graph_db_config: dict = None, + incremental_loading: bool = True, ): # Note: These context variables allow different value assignment for databases in Cognee # per async task, thread, process and etc. @@ -106,6 +107,7 @@ async def cognee_pipeline( data=data, pipeline_name=pipeline_name, context={"dataset": dataset}, + incremental_loading=incremental_loading, ): yield run_info @@ -117,6 +119,7 @@ async def run_pipeline( data=None, pipeline_name: str = "custom_pipeline", context: dict = None, + incremental_loading=True, ): check_dataset_name(dataset.name) @@ -184,7 +187,9 @@ async def run_pipeline( if not isinstance(task, Task): raise ValueError(f"Task {task} is not an instance of Task") - pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context) + pipeline_run = run_tasks( + tasks, dataset_id, data, user, pipeline_name, context, incremental_loading + ) async for pipeline_run_info in pipeline_run: yield pipeline_run_info diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 926d433fe..1f503f7d2 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -1,21 +1,31 @@ import os + +import asyncio from uuid import UUID from typing import Any from functools import wraps +from sqlalchemy import select +import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed from cognee.modules.users.models import User +from cognee.modules.data.models import Data +from cognee.infrastructure.files.utils.open_data_file import open_data_file from cognee.shared.logging_utils import get_logger from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines.utils import generate_pipeline_id +from cognee.modules.pipelines.exceptions import PipelineRunFailedError +from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories from cognee.modules.pipelines.models.PipelineRunInfo import ( PipelineRunCompleted, PipelineRunErrored, PipelineRunStarted, PipelineRunYield, + PipelineRunAlreadyCompleted, ) +from cognee.modules.pipelines.models.DataItemStatus import DataItemStatus from 
cognee.modules.pipelines.operations import ( log_pipeline_run_start, @@ -56,34 +66,116 @@ async def run_tasks( user: User = None, pipeline_name: str = "unknown_pipeline", context: dict = None, + incremental_loading: bool = True, ): - if not user: - user = await get_default_user() + async def _run_tasks_data_item_incremental( + data_item, + dataset, + tasks, + pipeline_name, + pipeline_id, + pipeline_run_id, + context, + user, + ): + db_engine = get_relational_engine() + # If incremental_loading of data is set to True don't process documents already processed by pipeline + # If data is being added to Cognee for the first time calculate the id of the data + if not isinstance(data_item, Data): + file_path = await save_data_item_to_storage(data_item) + # Ingest data and add metadata + async with open_data_file(file_path) as file: + classified_data = ingestion.classify(file) + # data_id is the hash of file contents + owner id to avoid duplicate data + data_id = ingestion.identify(classified_data, user) + else: + # If data was already processed by Cognee get data id + data_id = data_item.id - # Get Dataset object - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - from cognee.modules.data.models import Dataset + # Check pipeline status, if Data already processed for pipeline before skip current processing + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + if data_point: + if ( + data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id)) + == DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED + ): + yield { + "run_info": PipelineRunAlreadyCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } + return - dataset = await session.get(Dataset, dataset_id) + try: + # Process data based on data_item and list of tasks + async for result in run_tasks_with_telemetry( + tasks=tasks, + data=[data_item], + user=user, + pipeline_name=pipeline_id, + context=context, + ): + yield PipelineRunYield( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=result, + ) - pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name) + # Update pipeline status for Data element + async with db_engine.get_async_session() as session: + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() + data_point.pipeline_status[pipeline_name] = { + str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED + } + await session.merge(data_point) + await session.commit() - pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data) + yield { + "run_info": PipelineRunCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } - pipeline_run_id = pipeline_run.pipeline_run_id + except Exception as error: + # Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline + logger.error( + f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}." 
+ ) + yield { + "run_info": PipelineRunErrored( + pipeline_run_id=pipeline_run_id, + payload=repr(error), + dataset_id=dataset.id, + dataset_name=dataset.name, + ), + "data_id": data_id, + } - yield PipelineRunStarted( - pipeline_run_id=pipeline_run_id, - dataset_id=dataset.id, - dataset_name=dataset.name, - payload=data, - ) - - try: + async def _run_tasks_data_item_regular( + data_item, + dataset, + tasks, + pipeline_id, + pipeline_run_id, + context, + user, + ): + # Process data based on data_item and list of tasks async for result in run_tasks_with_telemetry( tasks=tasks, - data=data, + data=[data_item], user=user, pipeline_name=pipeline_id, context=context, @@ -95,6 +187,112 @@ async def run_tasks( payload=result, ) + yield { + "run_info": PipelineRunCompleted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + ) + } + + async def _run_tasks_data_item( + data_item, + dataset, + tasks, + pipeline_name, + pipeline_id, + pipeline_run_id, + context, + user, + incremental_loading, + ): + # Go through async generator and return data item processing result. Result can be PipelineRunAlreadyCompleted when data item is skipped, + # PipelineRunCompleted when processing was successful and PipelineRunErrored if there were issues + result = None + if incremental_loading: + async for result in _run_tasks_data_item_incremental( + data_item=data_item, + dataset=dataset, + tasks=tasks, + pipeline_name=pipeline_name, + pipeline_id=pipeline_id, + pipeline_run_id=pipeline_run_id, + context=context, + user=user, + ): + pass + else: + async for result in _run_tasks_data_item_regular( + data_item=data_item, + dataset=dataset, + tasks=tasks, + pipeline_id=pipeline_id, + pipeline_run_id=pipeline_run_id, + context=context, + user=user, + ): + pass + + return result + + if not user: + user = await get_default_user() + + # Get Dataset object + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + from cognee.modules.data.models import Dataset + + dataset = await session.get(Dataset, dataset_id) + + pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name) + pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data) + pipeline_run_id = pipeline_run.pipeline_run_id + + yield PipelineRunStarted( + pipeline_run_id=pipeline_run_id, + dataset_id=dataset.id, + dataset_name=dataset.name, + payload=data, + ) + + try: + if not isinstance(data, list): + data = [data] + + if incremental_loading: + data = await resolve_data_directories(data) + + # Create async tasks per data item that will run the pipeline for the data item + data_item_tasks = [ + asyncio.create_task( + _run_tasks_data_item( + data_item, + dataset, + tasks, + pipeline_name, + pipeline_id, + pipeline_run_id, + context, + user, + incremental_loading, + ) + ) + for data_item in data + ] + results = await asyncio.gather(*data_item_tasks) + # Remove skipped data items from results + results = [result for result in results if result] + + # If any data item could not be processed propagate error + errored_results = [ + result for result in results if isinstance(result["run_info"], PipelineRunErrored) + ] + if errored_results: + raise PipelineRunFailedError( + message="Pipeline run failed. Data item could not be processed." 
+ ) + await log_pipeline_run_complete( pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data ) @@ -103,6 +301,7 @@ async def run_tasks( pipeline_run_id=pipeline_run_id, dataset_id=dataset.id, dataset_name=dataset.name, + data_ingestion_info=results, ) graph_engine = await get_graph_engine() @@ -120,9 +319,14 @@ async def run_tasks( yield PipelineRunErrored( pipeline_run_id=pipeline_run_id, - payload=error, + payload=repr(error), dataset_id=dataset.id, dataset_name=dataset.name, + data_ingestion_info=locals().get( + "results" + ), # Returns results if they exist or returns None ) - raise error + # In case of error during incremental loading of data just let the user know the pipeline Errored, don't raise error + if not isinstance(error, PipelineRunFailedError): + raise error diff --git a/cognee/tasks/documents/extract_chunks_from_documents.py b/cognee/tasks/documents/extract_chunks_from_documents.py index 1d1870d98..216185495 100644 --- a/cognee/tasks/documents/extract_chunks_from_documents.py +++ b/cognee/tasks/documents/extract_chunks_from_documents.py @@ -8,7 +8,6 @@ from cognee.modules.data.models import Data from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.chunking.TextChunker import TextChunker from cognee.modules.chunking.Chunker import Chunker -from cognee.modules.data.processing.document_types.exceptions.exceptions import PyPdfInternalError async def update_document_token_count(document_id: UUID, token_count: int) -> None: @@ -40,15 +39,14 @@ async def extract_chunks_from_documents( """ for document in documents: document_token_count = 0 - try: - async for document_chunk in document.read( - max_chunk_size=max_chunk_size, chunker_cls=chunker - ): - document_token_count += document_chunk.chunk_size - document_chunk.belongs_to_set = document.belongs_to_set - yield document_chunk - await update_document_token_count(document.id, document_token_count) - except PyPdfInternalError: - pass + async for document_chunk in document.read( + max_chunk_size=max_chunk_size, chunker_cls=chunker + ): + document_token_count += document_chunk.chunk_size + document_chunk.belongs_to_set = document.belongs_to_set + yield document_chunk + + await update_document_token_count(document.id, document_token_count) + # todo rita diff --git a/cognee/tasks/ingestion/ingest_data.py b/cognee/tasks/ingestion/ingest_data.py index 846c183d4..429e04e5d 100644 --- a/cognee/tasks/ingestion/ingest_data.py +++ b/cognee/tasks/ingestion/ingest_data.py @@ -5,12 +5,12 @@ from uuid import UUID from typing import Union, BinaryIO, Any, List, Optional import cognee.modules.ingestion as ingestion -from cognee.infrastructure.files.utils.open_data_file import open_data_file from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.models import Data from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets +from cognee.infrastructure.files.utils.open_data_file import open_data_file from cognee.modules.data.methods import ( get_authorized_existing_datasets, get_dataset_data, @@ -134,6 +134,7 @@ async def ingest_data( node_set=json.dumps(node_set) if node_set else None, data_size=file_metadata["file_size"], tenant_id=user.tenant_id if user.tenant_id else None, + pipeline_status={}, token_count=-1, ) diff --git a/cognee/tasks/ingestion/resolve_data_directories.py 
b/cognee/tasks/ingestion/resolve_data_directories.py index dfabcea0b..0f2f2a85f 100644 --- a/cognee/tasks/ingestion/resolve_data_directories.py +++ b/cognee/tasks/ingestion/resolve_data_directories.py @@ -40,6 +40,9 @@ async def resolve_data_directories( if include_subdirectories: base_path = item if item.endswith("/") else item + "/" s3_keys = fs.glob(base_path + "**") + # If path is not directory attempt to add item directly + if not s3_keys: + s3_keys = fs.ls(item) else: s3_keys = fs.ls(item) # Filter out keys that represent directories using fs.isdir diff --git a/cognee/tasks/repo_processor/get_repo_file_dependencies.py b/cognee/tasks/repo_processor/get_repo_file_dependencies.py index 232850936..b0cdb4402 100644 --- a/cognee/tasks/repo_processor/get_repo_file_dependencies.py +++ b/cognee/tasks/repo_processor/get_repo_file_dependencies.py @@ -103,6 +103,9 @@ async def get_repo_file_dependencies( extraction of dependencies (default is False). (default False) """ + if isinstance(repo_path, list) and len(repo_path) == 1: + repo_path = repo_path[0] + if not os.path.exists(repo_path): raise FileNotFoundError(f"Repository path {repo_path} does not exist.") diff --git a/cognee/tests/test_deduplication.py b/cognee/tests/test_deduplication.py index c449719c7..bef813317 100644 --- a/cognee/tests/test_deduplication.py +++ b/cognee/tests/test_deduplication.py @@ -26,8 +26,8 @@ async def test_deduplication(): explanation_file_path2 = os.path.join( pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt" ) - await cognee.add([explanation_file_path], dataset_name) - await cognee.add([explanation_file_path2], dataset_name2) + await cognee.add([explanation_file_path], dataset_name, incremental_loading=False) + await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False) result = await relational_engine.get_all_data_from_table("data") assert len(result) == 1, "More than one data entity was found."
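
A minimal usage sketch of the new `incremental_loading` flag, for reviewers. It assumes the public `cognee.add` / `cognee.cognify` entry points whose signatures this patch updates; the file paths and dataset name are placeholders, and the inspected fields follow the `PipelineRunInfo` changes above.

```python
import asyncio

import cognee
from cognee.modules.pipelines.models import PipelineRunErrored


async def main():
    # Placeholder inputs; any local files and dataset name behave the same way.
    dataset_name = "my_dataset"
    files = ["docs/report.pdf", "docs/notes.txt"]

    # Incremental loading is the default: items whose pipeline_status already marks
    # them as processed for this pipeline/dataset are skipped, and one failing item
    # no longer aborts the rest of the run.
    await cognee.add(files, dataset_name)

    # Opt out to force full reprocessing, as the updated deduplication test does.
    await cognee.add(files, dataset_name, incremental_loading=False)

    # When run in the foreground, cognify returns a dict of dataset id -> final
    # PipelineRunInfo; errored runs surface as PipelineRunErrored instead of raising.
    run_info_per_dataset = await cognee.cognify(datasets=[dataset_name])
    for dataset_id, run_info in run_info_per_dataset.items():
        if isinstance(run_info, PipelineRunErrored):
            # payload carries repr(error); data_ingestion_info lists per-item results.
            print(f"{dataset_id} errored: {run_info.payload}")
            print(run_info.data_ingestion_info)
        else:
            print(f"{dataset_id}: {type(run_info).__name__}")


if __name__ == "__main__":
    asyncio.run(main())
```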