feat: Enable async execution of data items for incremental loading (#1092)

<!-- .github/pull_request_template.md -->

## Description
Attempt at making incremental loading run async

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Igor Ilic 2025-07-29 16:39:31 +02:00 committed by GitHub
parent f78af0cec3
commit 14ba3e8829
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 309 additions and 50 deletions

View file

@ -15,6 +15,7 @@ async def add(
vector_db_config: dict = None, vector_db_config: dict = None,
graph_db_config: dict = None, graph_db_config: dict = None,
dataset_id: Optional[UUID] = None, dataset_id: Optional[UUID] = None,
incremental_loading: bool = True,
): ):
""" """
Add data to Cognee for knowledge graph processing. Add data to Cognee for knowledge graph processing.
@ -153,6 +154,7 @@ async def add(
pipeline_name="add_pipeline", pipeline_name="add_pipeline",
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
): ):
pipeline_run_info = run_info pipeline_run_info = run_info

View file

@ -11,6 +11,7 @@ from typing import List, Optional, Union, Literal
from cognee.modules.users.models import User from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry from cognee.shared.utils import send_telemetry
from cognee.modules.pipelines.models import PipelineRunErrored
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
logger = get_logger() logger = get_logger()
@ -100,6 +101,8 @@ def get_add_router() -> APIRouter:
else: else:
add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId) add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)
if isinstance(add_run, PipelineRunErrored):
return JSONResponse(status_code=420, content=add_run.model_dump(mode="json"))
return add_run.model_dump() return add_run.model_dump()
except Exception as error: except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)}) return JSONResponse(status_code=409, content={"error": str(error)})

View file

@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
async for run_status in non_code_pipeline_run: async for run_status in non_code_pipeline_run:
yield run_status yield run_status
async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"): async for run_status in run_tasks(
tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
):
yield run_status yield run_status

View file

@ -39,6 +39,7 @@ async def cognify(
vector_db_config: dict = None, vector_db_config: dict = None,
graph_db_config: dict = None, graph_db_config: dict = None,
run_in_background: bool = False, run_in_background: bool = False,
incremental_loading: bool = True,
): ):
""" """
Transform ingested data into a structured knowledge graph. Transform ingested data into a structured knowledge graph.
@ -194,6 +195,7 @@ async def cognify(
datasets=datasets, datasets=datasets,
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
) )
else: else:
return await run_cognify_blocking( return await run_cognify_blocking(
@ -202,6 +204,7 @@ async def cognify(
datasets=datasets, datasets=datasets,
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
) )
@ -211,6 +214,7 @@ async def run_cognify_blocking(
datasets, datasets,
graph_db_config: dict = None, graph_db_config: dict = None,
vector_db_config: dict = False, vector_db_config: dict = False,
incremental_loading: bool = True,
): ):
total_run_info = {} total_run_info = {}
@ -221,6 +225,7 @@ async def run_cognify_blocking(
pipeline_name="cognify_pipeline", pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
incremental_loading=incremental_loading,
): ):
if run_info.dataset_id: if run_info.dataset_id:
total_run_info[run_info.dataset_id] = run_info total_run_info[run_info.dataset_id] = run_info
@ -236,6 +241,7 @@ async def run_cognify_as_background_process(
datasets, datasets,
graph_db_config: dict = None, graph_db_config: dict = None,
vector_db_config: dict = False, vector_db_config: dict = False,
incremental_loading: bool = True,
): ):
# Convert dataset to list if it's a string # Convert dataset to list if it's a string
if isinstance(datasets, str): if isinstance(datasets, str):
@ -246,6 +252,7 @@ async def run_cognify_as_background_process(
async def handle_rest_of_the_run(pipeline_list): async def handle_rest_of_the_run(pipeline_list):
# Execute all provided pipelines one by one to avoid database write conflicts # Execute all provided pipelines one by one to avoid database write conflicts
# TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created
for pipeline in pipeline_list: for pipeline in pipeline_list:
while True: while True:
try: try:
@ -270,6 +277,7 @@ async def run_cognify_as_background_process(
pipeline_name="cognify_pipeline", pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config, graph_db_config=graph_db_config,
vector_db_config=vector_db_config, vector_db_config=vector_db_config,
incremental_loading=incremental_loading,
) )
# Save dataset Pipeline run started info # Save dataset Pipeline run started info

View file

@ -16,7 +16,11 @@ from cognee.modules.graph.methods import get_formatted_graph_data
from cognee.modules.users.get_user_manager import get_user_manager_context from cognee.modules.users.get_user_manager import get_user_manager_context
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.authentication.default.default_jwt_strategy import DefaultJWTStrategy from cognee.modules.users.authentication.default.default_jwt_strategy import DefaultJWTStrategy
from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunInfo from cognee.modules.pipelines.models.PipelineRunInfo import (
PipelineRunCompleted,
PipelineRunInfo,
PipelineRunErrored,
)
from cognee.modules.pipelines.queues.pipeline_run_info_queues import ( from cognee.modules.pipelines.queues.pipeline_run_info_queues import (
get_from_queue, get_from_queue,
initialize_queue, initialize_queue,
@ -105,6 +109,9 @@ def get_cognify_router() -> APIRouter:
datasets, user, run_in_background=payload.run_in_background datasets, user, run_in_background=payload.run_in_background
) )
# If any cognify run errored return JSONResponse with proper error status code
if any(isinstance(v, PipelineRunErrored) for v in cognify_run.values()):
return JSONResponse(status_code=420, content=cognify_run)
return cognify_run return cognify_run
except Exception as error: except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)}) return JSONResponse(status_code=409, content={"error": str(error)})

View file

@ -1,6 +1,7 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from uuid import uuid4 from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from cognee.infrastructure.databases.relational import Base from cognee.infrastructure.databases.relational import Base
@ -21,7 +22,11 @@ class Data(Base):
tenant_id = Column(UUID, index=True, nullable=True) tenant_id = Column(UUID, index=True, nullable=True)
content_hash = Column(String) content_hash = Column(String)
external_metadata = Column(JSON) external_metadata = Column(JSON)
node_set = Column(JSON, nullable=True) # Store NodeSet as JSON list of strings # Store NodeSet as JSON list of strings
node_set = Column(JSON, nullable=True)
# MutableDict allows SQLAlchemy to notice key-value pair changes, without it changing a value for a key
# wouldn't be noticed when commiting a database session
pipeline_status = Column(MutableDict.as_mutable(JSON))
token_count = Column(Integer) token_count = Column(Integer)
data_size = Column(Integer, nullable=True) # File size in bytes data_size = Column(Integer, nullable=True) # File size in bytes
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

View file

@ -5,7 +5,6 @@ from cognee.modules.chunking.Chunker import Chunker
from cognee.infrastructure.files.utils.open_data_file import open_data_file from cognee.infrastructure.files.utils.open_data_file import open_data_file
from .Document import Document from .Document import Document
from .exceptions.exceptions import PyPdfInternalError
logger = get_logger("PDFDocument") logger = get_logger("PDFDocument")
@ -17,18 +16,12 @@ class PdfDocument(Document):
async with open_data_file(self.raw_data_location, mode="rb") as stream: async with open_data_file(self.raw_data_location, mode="rb") as stream:
logger.info(f"Reading PDF: {self.raw_data_location}") logger.info(f"Reading PDF: {self.raw_data_location}")
try: file = PdfReader(stream, strict=False)
file = PdfReader(stream, strict=False)
except Exception:
raise PyPdfInternalError()
async def get_text(): async def get_text():
try: for page in file.pages:
for page in file.pages: page_text = page.extract_text()
page_text = page.extract_text() yield page_text
yield page_text
except Exception:
raise PyPdfInternalError()
chunker = chunker_cls(self, get_text=get_text, max_chunk_size=max_chunk_size) chunker = chunker_cls(self, get_text=get_text, max_chunk_size=max_chunk_size)

View file

@ -0,0 +1 @@
from .exceptions import PipelineRunFailedError

View file

@ -0,0 +1,12 @@
from cognee.exceptions import CogneeApiError
from fastapi import status
class PipelineRunFailedError(CogneeApiError):
def __init__(
self,
message: str = "Pipeline run failed.",
name: str = "PipelineRunFailedError",
status_code: int = status.HTTP_422_UNPROCESSABLE_ENTITY,
):
super().__init__(message, name, status_code)

View file

@ -0,0 +1,5 @@
import enum
class DataItemStatus(str, enum.Enum):
DATA_ITEM_PROCESSING_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"

View file

@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel):
dataset_id: UUID dataset_id: UUID
dataset_name: str dataset_name: str
payload: Optional[Any] = None payload: Optional[Any] = None
data_ingestion_info: Optional[list] = None
model_config = { model_config = {
"arbitrary_types_allowed": True, "arbitrary_types_allowed": True,
@ -30,6 +31,11 @@ class PipelineRunCompleted(PipelineRunInfo):
pass pass
class PipelineRunAlreadyCompleted(PipelineRunInfo):
status: str = "PipelineRunAlreadyCompleted"
pass
class PipelineRunErrored(PipelineRunInfo): class PipelineRunErrored(PipelineRunInfo):
status: str = "PipelineRunErrored" status: str = "PipelineRunErrored"
pass pass

View file

@ -6,3 +6,4 @@ from .PipelineRunInfo import (
PipelineRunCompleted, PipelineRunCompleted,
PipelineRunErrored, PipelineRunErrored,
) )
from .DataItemStatus import DataItemStatus

View file

@ -52,6 +52,7 @@ async def cognee_pipeline(
pipeline_name: str = "custom_pipeline", pipeline_name: str = "custom_pipeline",
vector_db_config: dict = None, vector_db_config: dict = None,
graph_db_config: dict = None, graph_db_config: dict = None,
incremental_loading: bool = True,
): ):
# Note: These context variables allow different value assignment for databases in Cognee # Note: These context variables allow different value assignment for databases in Cognee
# per async task, thread, process and etc. # per async task, thread, process and etc.
@ -106,6 +107,7 @@ async def cognee_pipeline(
data=data, data=data,
pipeline_name=pipeline_name, pipeline_name=pipeline_name,
context={"dataset": dataset}, context={"dataset": dataset},
incremental_loading=incremental_loading,
): ):
yield run_info yield run_info
@ -117,6 +119,7 @@ async def run_pipeline(
data=None, data=None,
pipeline_name: str = "custom_pipeline", pipeline_name: str = "custom_pipeline",
context: dict = None, context: dict = None,
incremental_loading=True,
): ):
check_dataset_name(dataset.name) check_dataset_name(dataset.name)
@ -184,7 +187,9 @@ async def run_pipeline(
if not isinstance(task, Task): if not isinstance(task, Task):
raise ValueError(f"Task {task} is not an instance of Task") raise ValueError(f"Task {task} is not an instance of Task")
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context) pipeline_run = run_tasks(
tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
)
async for pipeline_run_info in pipeline_run: async for pipeline_run_info in pipeline_run:
yield pipeline_run_info yield pipeline_run_info

View file

@ -1,21 +1,31 @@
import os import os
import asyncio
from uuid import UUID from uuid import UUID
from typing import Any from typing import Any
from functools import wraps from functools import wraps
from sqlalchemy import select
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.graph import get_graph_engine from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
from cognee.modules.users.models import User from cognee.modules.users.models import User
from cognee.modules.data.models import Data
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
from cognee.modules.users.methods import get_default_user from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.utils import generate_pipeline_id from cognee.modules.pipelines.utils import generate_pipeline_id
from cognee.modules.pipelines.exceptions import PipelineRunFailedError
from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories
from cognee.modules.pipelines.models.PipelineRunInfo import ( from cognee.modules.pipelines.models.PipelineRunInfo import (
PipelineRunCompleted, PipelineRunCompleted,
PipelineRunErrored, PipelineRunErrored,
PipelineRunStarted, PipelineRunStarted,
PipelineRunYield, PipelineRunYield,
PipelineRunAlreadyCompleted,
) )
from cognee.modules.pipelines.models.DataItemStatus import DataItemStatus
from cognee.modules.pipelines.operations import ( from cognee.modules.pipelines.operations import (
log_pipeline_run_start, log_pipeline_run_start,
@ -56,34 +66,116 @@ async def run_tasks(
user: User = None, user: User = None,
pipeline_name: str = "unknown_pipeline", pipeline_name: str = "unknown_pipeline",
context: dict = None, context: dict = None,
incremental_loading: bool = True,
): ):
if not user: async def _run_tasks_data_item_incremental(
user = await get_default_user() data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
):
db_engine = get_relational_engine()
# If incremental_loading of data is set to True don't process documents already processed by pipeline
# If data is being added to Cognee for the first time calculate the id of the data
if not isinstance(data_item, Data):
file_path = await save_data_item_to_storage(data_item)
# Ingest data and add metadata
async with open_data_file(file_path) as file:
classified_data = ingestion.classify(file)
# data_id is the hash of file contents + owner id to avoid duplicate data
data_id = ingestion.identify(classified_data, user)
else:
# If data was already processed by Cognee get data id
data_id = data_item.id
# Get Dataset object # Check pipeline status, if Data already processed for pipeline before skip current processing
db_engine = get_relational_engine() async with db_engine.get_async_session() as session:
async with db_engine.get_async_session() as session: data_point = (
from cognee.modules.data.models import Dataset await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
if data_point:
if (
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
):
yield {
"run_info": PipelineRunAlreadyCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
return
dataset = await session.get(Dataset, dataset_id) try:
# Process data based on data_item and list of tasks
async for result in run_tasks_with_telemetry(
tasks=tasks,
data=[data_item],
user=user,
pipeline_name=pipeline_id,
context=context,
):
yield PipelineRunYield(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=result,
)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name) # Update pipeline status for Data element
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
data_point.pipeline_status[pipeline_name] = {
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
}
await session.merge(data_point)
await session.commit()
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data) yield {
"run_info": PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
pipeline_run_id = pipeline_run.pipeline_run_id except Exception as error:
# Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline
logger.error(
f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}."
)
yield {
"run_info": PipelineRunErrored(
pipeline_run_id=pipeline_run_id,
payload=repr(error),
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
yield PipelineRunStarted( async def _run_tasks_data_item_regular(
pipeline_run_id=pipeline_run_id, data_item,
dataset_id=dataset.id, dataset,
dataset_name=dataset.name, tasks,
payload=data, pipeline_id,
) pipeline_run_id,
context,
try: user,
):
# Process data based on data_item and list of tasks
async for result in run_tasks_with_telemetry( async for result in run_tasks_with_telemetry(
tasks=tasks, tasks=tasks,
data=data, data=[data_item],
user=user, user=user,
pipeline_name=pipeline_id, pipeline_name=pipeline_id,
context=context, context=context,
@ -95,6 +187,112 @@ async def run_tasks(
payload=result, payload=result,
) )
yield {
"run_info": PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
}
async def _run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
):
# Go through async generator and return data item processing result. Result can be PipelineRunAlreadyCompleted when data item is skipped,
# PipelineRunCompleted when processing was successful and PipelineRunErrored if there were issues
result = None
if incremental_loading:
async for result in _run_tasks_data_item_incremental(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_name=pipeline_name,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
else:
async for result in _run_tasks_data_item_regular(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
return result
if not user:
user = await get_default_user()
# Get Dataset object
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
from cognee.modules.data.models import Dataset
dataset = await session.get(Dataset, dataset_id)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
pipeline_run_id = pipeline_run.pipeline_run_id
yield PipelineRunStarted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data,
)
try:
if not isinstance(data, list):
data = [data]
if incremental_loading:
data = await resolve_data_directories(data)
# Create async tasks per data item that will run the pipeline for the data item
data_item_tasks = [
asyncio.create_task(
_run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
)
)
for data_item in data
]
results = await asyncio.gather(*data_item_tasks)
# Remove skipped data items from results
results = [result for result in results if result]
# If any data item could not be processed propagate error
errored_results = [
result for result in results if isinstance(result["run_info"], PipelineRunErrored)
]
if errored_results:
raise PipelineRunFailedError(
message="Pipeline run failed. Data item could not be processed."
)
await log_pipeline_run_complete( await log_pipeline_run_complete(
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
) )
@ -103,6 +301,7 @@ async def run_tasks(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id, dataset_id=dataset.id,
dataset_name=dataset.name, dataset_name=dataset.name,
data_ingestion_info=results,
) )
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
@ -120,9 +319,14 @@ async def run_tasks(
yield PipelineRunErrored( yield PipelineRunErrored(
pipeline_run_id=pipeline_run_id, pipeline_run_id=pipeline_run_id,
payload=error, payload=repr(error),
dataset_id=dataset.id, dataset_id=dataset.id,
dataset_name=dataset.name, dataset_name=dataset.name,
data_ingestion_info=locals().get(
"results"
), # Returns results if they exist or returns None
) )
raise error # In case of error during incremental loading of data just let the user know the pipeline Errored, don't raise error
if not isinstance(error, PipelineRunFailedError):
raise error

View file

@ -8,7 +8,6 @@ from cognee.modules.data.models import Data
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.chunking.TextChunker import TextChunker from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.chunking.Chunker import Chunker from cognee.modules.chunking.Chunker import Chunker
from cognee.modules.data.processing.document_types.exceptions.exceptions import PyPdfInternalError
async def update_document_token_count(document_id: UUID, token_count: int) -> None: async def update_document_token_count(document_id: UUID, token_count: int) -> None:
@ -40,15 +39,14 @@ async def extract_chunks_from_documents(
""" """
for document in documents: for document in documents:
document_token_count = 0 document_token_count = 0
try:
async for document_chunk in document.read(
max_chunk_size=max_chunk_size, chunker_cls=chunker
):
document_token_count += document_chunk.chunk_size
document_chunk.belongs_to_set = document.belongs_to_set
yield document_chunk
await update_document_token_count(document.id, document_token_count) async for document_chunk in document.read(
except PyPdfInternalError: max_chunk_size=max_chunk_size, chunker_cls=chunker
pass ):
document_token_count += document_chunk.chunk_size
document_chunk.belongs_to_set = document.belongs_to_set
yield document_chunk
await update_document_token_count(document.id, document_token_count)
# todo rita # todo rita

View file

@ -5,12 +5,12 @@ from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional from typing import Union, BinaryIO, Any, List, Optional
import cognee.modules.ingestion as ingestion import cognee.modules.ingestion as ingestion
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.infrastructure.databases.relational import get_relational_engine from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data from cognee.modules.data.models import Data
from cognee.modules.users.models import User from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.modules.data.methods import ( from cognee.modules.data.methods import (
get_authorized_existing_datasets, get_authorized_existing_datasets,
get_dataset_data, get_dataset_data,
@ -134,6 +134,7 @@ async def ingest_data(
node_set=json.dumps(node_set) if node_set else None, node_set=json.dumps(node_set) if node_set else None,
data_size=file_metadata["file_size"], data_size=file_metadata["file_size"],
tenant_id=user.tenant_id if user.tenant_id else None, tenant_id=user.tenant_id if user.tenant_id else None,
pipeline_status={},
token_count=-1, token_count=-1,
) )

View file

@ -40,6 +40,9 @@ async def resolve_data_directories(
if include_subdirectories: if include_subdirectories:
base_path = item if item.endswith("/") else item + "/" base_path = item if item.endswith("/") else item + "/"
s3_keys = fs.glob(base_path + "**") s3_keys = fs.glob(base_path + "**")
# If path is not directory attempt to add item directly
if not s3_keys:
s3_keys = fs.ls(item)
else: else:
s3_keys = fs.ls(item) s3_keys = fs.ls(item)
# Filter out keys that represent directories using fs.isdir # Filter out keys that represent directories using fs.isdir

View file

@ -103,6 +103,9 @@ async def get_repo_file_dependencies(
extraction of dependencies (default is False). (default False) extraction of dependencies (default is False). (default False)
""" """
if isinstance(repo_path, list) and len(repo_path) == 1:
repo_path = repo_path[0]
if not os.path.exists(repo_path): if not os.path.exists(repo_path):
raise FileNotFoundError(f"Repository path {repo_path} does not exist.") raise FileNotFoundError(f"Repository path {repo_path} does not exist.")

View file

@ -26,8 +26,8 @@ async def test_deduplication():
explanation_file_path2 = os.path.join( explanation_file_path2 = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt" pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
) )
await cognee.add([explanation_file_path], dataset_name) await cognee.add([explanation_file_path], dataset_name, incremental_loading=False)
await cognee.add([explanation_file_path2], dataset_name2) await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False)
result = await relational_engine.get_all_data_from_table("data") result = await relational_engine.get_all_data_from_table("data")
assert len(result) == 1, "More than one data entity was found." assert len(result) == 1, "More than one data entity was found."