Merge remote-tracking branch 'origin/dev' into feature/cog-2516-refactor-cognify-pipeline

This commit is contained in:
Boris Arzentar 2025-07-30 15:11:56 +02:00
commit 088ca317fd
No known key found for this signature in database
GPG key ID: D5CC274C784807B7
24 changed files with 379 additions and 74 deletions

View file

@ -1,6 +1,6 @@
"use client";
import { MutableRefObject, useEffect, useImperativeHandle, useRef, useState } from "react";
import { MutableRefObject, useEffect, useImperativeHandle, useRef, useState, useCallback } from "react";
import { forceCollide, forceManyBody } from "d3-force-3d";
import ForceGraph, { ForceGraphMethods, GraphData, LinkObject, NodeObject } from "react-force-graph-2d";
import { GraphControlsAPI } from "./GraphControls";
@ -22,6 +22,45 @@ export default function GraphVisualization({ ref, data, graphControls }: GraphVi
const nodeSize = 15;
// const addNodeDistanceFromSourceNode = 15;
// State for tracking container dimensions
const [dimensions, setDimensions] = useState({ width: 0, height: 0 });
const containerRef = useRef<HTMLDivElement>(null);
// Handle resize
const handleResize = useCallback(() => {
if (containerRef.current) {
const { clientWidth, clientHeight } = containerRef.current;
setDimensions({ width: clientWidth, height: clientHeight });
// Trigger graph refresh after resize
if (graphRef.current) {
// Small delay to ensure DOM has updated
setTimeout(() => {
graphRef.current?.zoomToFit(1000,50);
}, 100);
}
}
}, []);
// Set up resize observer
useEffect(() => {
// Initial size calculation
handleResize();
// ResizeObserver
const resizeObserver = new ResizeObserver(() => {
handleResize();
});
if (containerRef.current) {
resizeObserver.observe(containerRef.current);
}
return () => {
resizeObserver.disconnect();
};
}, [handleResize]);
const handleNodeClick = (node: NodeObject) => {
graphControls.current?.setSelectedNode(node);
// ref.current?.d3ReheatSimulation()
@ -174,10 +213,12 @@ export default function GraphVisualization({ ref, data, graphControls }: GraphVi
}));
return (
<div className="w-full h-full" id="graph-container">
<div ref={containerRef} className="w-full h-full" id="graph-container">
{(data && typeof window !== "undefined") ? (
<ForceGraph
ref={graphRef}
width={dimensions.width}
height={dimensions.height}
dagMode={graphShape as unknown as undefined}
dagLevelDistance={300}
onDagError={handleDagError}
@ -201,6 +242,8 @@ export default function GraphVisualization({ ref, data, graphControls }: GraphVi
) : (
<ForceGraph
ref={graphRef}
width={dimensions.width}
height={dimensions.height}
dagMode={graphShape as unknown as undefined}
dagLevelDistance={100}
graphData={{

View file

@ -22,6 +22,7 @@ async def add(
user: Optional[User] = None,
node_set: Optional[List[str]] = None,
dataset_id: Optional[UUID] = None,
incremental_loading: bool = True,
):
"""
Add data to Cognee for knowledge graph processing.
@ -178,6 +179,7 @@ async def add(
dataset=authorized_dataset,
user=user,
pipeline_name="add_pipeline",
incremental_loading=incremental_loading,
):
pipeline_run_info = run_info

View file

@ -12,6 +12,7 @@ from typing import BinaryIO, List, Literal, Optional, Union
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry
from cognee.modules.pipelines.models import PipelineRunErrored
from cognee.shared.logging_utils import get_logger
logger = get_logger()
@ -107,6 +108,9 @@ def get_add_router() -> APIRouter:
data, dataset_name=datasetName, user=user, dataset_id=datasetId
)
if isinstance(add_run, PipelineRunErrored):
return JSONResponse(status_code=420, content=add_run.model_dump(mode="json"))
return add_run.model_dump() if add_run else None
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})

View file

@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
async for run_status in non_code_pipeline_run:
yield run_status
async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
async for run_status in run_tasks(
tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
):
yield run_status

View file

@ -41,6 +41,7 @@ async def cognify(
vector_db_config: dict = None,
graph_db_config: dict = None,
run_in_background: bool = False,
incremental_loading: bool = True,
):
"""
Transform ingested data into a structured knowledge graph.
@ -204,6 +205,7 @@ async def cognify(
datasets=user_datasets,
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
)
else:
return await run_cognify_blocking(
@ -212,6 +214,7 @@ async def cognify(
datasets=user_datasets,
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
)
@ -221,6 +224,7 @@ async def run_cognify_blocking(
datasets,
graph_db_config: dict = None,
vector_db_config: dict = False,
incremental_loading: bool = True,
):
total_run_info = {}
@ -231,6 +235,7 @@ async def run_cognify_blocking(
pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config,
vector_db_config=vector_db_config,
incremental_loading=incremental_loading,
):
if run_info.dataset_id:
total_run_info[run_info.dataset_id] = run_info
@ -246,6 +251,7 @@ async def run_cognify_as_background_process(
datasets,
graph_db_config: dict = None,
vector_db_config: dict = False,
incremental_loading: bool = True,
):
# Convert dataset to list if it's a string
if isinstance(datasets, str):
@ -256,6 +262,7 @@ async def run_cognify_as_background_process(
async def handle_rest_of_the_run(pipeline_list):
# Execute all provided pipelines one by one to avoid database write conflicts
# TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created
for pipeline in pipeline_list:
while True:
try:
@ -280,6 +287,7 @@ async def run_cognify_as_background_process(
pipeline_name="cognify_pipeline",
graph_db_config=graph_db_config,
vector_db_config=vector_db_config,
incremental_loading=incremental_loading,
)
# Save dataset Pipeline run started info

View file

@ -16,7 +16,11 @@ from cognee.modules.graph.methods import get_formatted_graph_data
from cognee.modules.users.get_user_manager import get_user_manager_context
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.authentication.default.default_jwt_strategy import DefaultJWTStrategy
from cognee.modules.pipelines.models.PipelineRunInfo import PipelineRunCompleted, PipelineRunInfo
from cognee.modules.pipelines.models.PipelineRunInfo import (
PipelineRunCompleted,
PipelineRunInfo,
PipelineRunErrored,
)
from cognee.modules.pipelines.queues.pipeline_run_info_queues import (
get_from_queue,
initialize_queue,
@ -105,6 +109,9 @@ def get_cognify_router() -> APIRouter:
datasets, user, run_in_background=payload.run_in_background
)
# If any cognify run errored return JSONResponse with proper error status code
if any(isinstance(v, PipelineRunErrored) for v in cognify_run.values()):
return JSONResponse(status_code=420, content=cognify_run)
return cognify_run
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})

View file

@ -1,6 +1,7 @@
from datetime import datetime, timezone
from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import relationship
from cognee.infrastructure.databases.relational import Base
@ -21,7 +22,11 @@ class Data(Base):
tenant_id = Column(UUID, index=True, nullable=True)
content_hash = Column(String)
external_metadata = Column(JSON)
node_set = Column(JSON, nullable=True) # Store NodeSet as JSON list of strings
# Store NodeSet as JSON list of strings
node_set = Column(JSON, nullable=True)
# MutableDict allows SQLAlchemy to notice key-value pair changes, without it changing a value for a key
# wouldn't be noticed when commiting a database session
pipeline_status = Column(MutableDict.as_mutable(JSON))
token_count = Column(Integer)
data_size = Column(Integer, nullable=True) # File size in bytes
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

View file

@ -5,7 +5,6 @@ from cognee.modules.chunking.Chunker import Chunker
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from .Document import Document
from .exceptions.exceptions import PyPdfInternalError
logger = get_logger("PDFDocument")
@ -17,18 +16,12 @@ class PdfDocument(Document):
async with open_data_file(self.raw_data_location, mode="rb") as stream:
logger.info(f"Reading PDF: {self.raw_data_location}")
try:
file = PdfReader(stream, strict=False)
except Exception:
raise PyPdfInternalError()
file = PdfReader(stream, strict=False)
async def get_text():
try:
for page in file.pages:
page_text = page.extract_text()
yield page_text
except Exception:
raise PyPdfInternalError()
for page in file.pages:
page_text = page.extract_text()
yield page_text
chunker = chunker_cls(self, get_text=get_text, max_chunk_size=max_chunk_size)

View file

@ -0,0 +1,5 @@
from uuid import NAMESPACE_OID, uuid5
def generate_edge_id(edge_id: str) -> str:
return uuid5(NAMESPACE_OID, edge_id.lower().replace(" ", "_").replace("'", ""))

View file

@ -170,28 +170,19 @@ class CogneeGraph(CogneeAbstractGraph):
for edge in self.edges:
relationship_type = edge.attributes.get("relationship_type")
if relationship_type and relationship_type in embedding_map:
edge.attributes["vector_distance"] = embedding_map[relationship_type]
distance = embedding_map.get(relationship_type, None)
if distance is not None:
edge.attributes["vector_distance"] = distance
except Exception as ex:
logger.error(f"Error mapping vector distances to edges: {str(ex)}")
raise ex
async def calculate_top_triplet_importances(self, k: int) -> List:
min_heap = []
def score(edge):
n1 = edge.node1.attributes.get("vector_distance", 1)
n2 = edge.node2.attributes.get("vector_distance", 1)
e = edge.attributes.get("vector_distance", 1)
return n1 + n2 + e
for i, edge in enumerate(self.edges):
source_node = self.get_node(edge.node1.id)
target_node = self.get_node(edge.node2.id)
source_distance = source_node.attributes.get("vector_distance", 1) if source_node else 1
target_distance = target_node.attributes.get("vector_distance", 1) if target_node else 1
edge_distance = edge.attributes.get("vector_distance", 1)
total_distance = source_distance + target_distance + edge_distance
heapq.heappush(min_heap, (-total_distance, i, edge))
if len(min_heap) > k:
heapq.heappop(min_heap)
return [edge for _, _, edge in sorted(min_heap)]
return heapq.nsmallest(k, self.edges, key=score)

View file

@ -0,0 +1 @@
from .exceptions import PipelineRunFailedError

View file

@ -0,0 +1,12 @@
from cognee.exceptions import CogneeApiError
from fastapi import status
class PipelineRunFailedError(CogneeApiError):
def __init__(
self,
message: str = "Pipeline run failed.",
name: str = "PipelineRunFailedError",
status_code: int = status.HTTP_422_UNPROCESSABLE_ENTITY,
):
super().__init__(message, name, status_code)

View file

@ -0,0 +1,5 @@
import enum
class DataItemStatus(str, enum.Enum):
DATA_ITEM_PROCESSING_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"

View file

@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel):
dataset_id: UUID
dataset_name: str
payload: Optional[Any] = None
data_ingestion_info: Optional[list] = None
model_config = {
"arbitrary_types_allowed": True,
@ -30,6 +31,11 @@ class PipelineRunCompleted(PipelineRunInfo):
pass
class PipelineRunAlreadyCompleted(PipelineRunInfo):
status: str = "PipelineRunAlreadyCompleted"
pass
class PipelineRunErrored(PipelineRunInfo):
status: str = "PipelineRunErrored"
pass

View file

@ -6,3 +6,4 @@ from .PipelineRunInfo import (
PipelineRunCompleted,
PipelineRunErrored,
)
from .DataItemStatus import DataItemStatus

View file

@ -46,6 +46,7 @@ async def cognee_pipeline(
pipeline_name: str = "custom_pipeline",
vector_db_config: dict = None,
graph_db_config: dict = None,
incremental_loading: bool = True,
):
# Note: These context variables allow different value assignment for databases in Cognee
# per async task, thread, process and etc.
@ -100,6 +101,7 @@ async def cognee_pipeline(
data=data,
pipeline_name=pipeline_name,
context={"dataset": dataset},
incremental_loading=incremental_loading,
):
yield run_info
@ -111,6 +113,7 @@ async def run_pipeline(
data=None,
pipeline_name: str = "custom_pipeline",
context: dict = None,
incremental_loading=True,
):
# Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
await set_database_global_context_variables(dataset.id, dataset.owner_id)
@ -149,7 +152,9 @@ async def run_pipeline(
)
return
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context)
pipeline_run = run_tasks(
tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
)
async for pipeline_run_info in pipeline_run:
yield pipeline_run_info

View file

@ -1,3 +1,4 @@
from typing import Optional
from cognee.shared.logging_utils import get_logger
from cognee.modules.users.models import User
from cognee.modules.data.models import Dataset
@ -17,6 +18,7 @@ async def run_add_pipeline(
dataset: Dataset,
user: User,
pipeline_name: str = "add_pipeline",
incremental_loading: Optional[bool] = True,
):
await set_database_global_context_variables(dataset.id, dataset.owner_id)
@ -30,6 +32,7 @@ async def run_add_pipeline(
"user": user,
"dataset": dataset,
},
incremental_loading,
)
async for pipeline_run_info in pipeline_run:

View file

@ -1,21 +1,31 @@
import os
from uuid import UUID
from typing import Any
from functools import wraps
import asyncio
from uuid import UUID
from typing import Any, Optional
from functools import wraps
from sqlalchemy import select
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
from cognee.modules.users.models import User
from cognee.modules.data.models import Data
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.shared.logging_utils import get_logger
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.utils import generate_pipeline_id
from cognee.modules.pipelines.exceptions import PipelineRunFailedError
from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories
from cognee.modules.pipelines.models.PipelineRunInfo import (
PipelineRunCompleted,
PipelineRunErrored,
PipelineRunStarted,
PipelineRunYield,
PipelineRunAlreadyCompleted,
)
from cognee.modules.pipelines.models.DataItemStatus import DataItemStatus
from cognee.modules.pipelines.operations import (
log_pipeline_run_start,
@ -56,34 +66,116 @@ async def run_tasks(
user: User = None,
pipeline_name: str = "unknown_pipeline",
context: dict = None,
incremental_loading: Optional[bool] = True,
):
if not user:
user = await get_default_user()
async def _run_tasks_data_item_incremental(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
):
db_engine = get_relational_engine()
# If incremental_loading of data is set to True don't process documents already processed by pipeline
# If data is being added to Cognee for the first time calculate the id of the data
if not isinstance(data_item, Data):
file_path = await save_data_item_to_storage(data_item)
# Ingest data and add metadata
async with open_data_file(file_path) as file:
classified_data = ingestion.classify(file)
# data_id is the hash of file contents + owner id to avoid duplicate data
data_id = ingestion.identify(classified_data, user)
else:
# If data was already processed by Cognee get data id
data_id = data_item.id
# Get Dataset object
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
from cognee.modules.data.models import Dataset
# Check pipeline status, if Data already processed for pipeline before skip current processing
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
if data_point:
if (
data_point.pipeline_status.get(pipeline_name, {}).get(str(dataset.id))
== DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
):
yield {
"run_info": PipelineRunAlreadyCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
return
dataset = await session.get(Dataset, dataset_id)
try:
# Process data based on data_item and list of tasks
async for result in run_tasks_with_telemetry(
tasks=tasks,
data=[data_item],
user=user,
pipeline_name=pipeline_id,
context=context,
):
yield PipelineRunYield(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=result,
)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
# Update pipeline status for Data element
async with db_engine.get_async_session() as session:
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
data_point.pipeline_status[pipeline_name] = {
str(dataset.id): DataItemStatus.DATA_ITEM_PROCESSING_COMPLETED
}
await session.merge(data_point)
await session.commit()
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
yield {
"run_info": PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
pipeline_run_id = pipeline_run.pipeline_run_id
except Exception as error:
# Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline
logger.error(
f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}."
)
yield {
"run_info": PipelineRunErrored(
pipeline_run_id=pipeline_run_id,
payload=repr(error),
dataset_id=dataset.id,
dataset_name=dataset.name,
),
"data_id": data_id,
}
yield PipelineRunStarted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data,
)
try:
async def _run_tasks_data_item_regular(
data_item,
dataset,
tasks,
pipeline_id,
pipeline_run_id,
context,
user,
):
# Process data based on data_item and list of tasks
async for result in run_tasks_with_telemetry(
tasks=tasks,
data=data,
data=[data_item],
user=user,
pipeline_name=pipeline_id,
context=context,
@ -95,6 +187,112 @@ async def run_tasks(
payload=result,
)
yield {
"run_info": PipelineRunCompleted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
)
}
async def _run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
):
# Go through async generator and return data item processing result. Result can be PipelineRunAlreadyCompleted when data item is skipped,
# PipelineRunCompleted when processing was successful and PipelineRunErrored if there were issues
result = None
if incremental_loading:
async for result in _run_tasks_data_item_incremental(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_name=pipeline_name,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
else:
async for result in _run_tasks_data_item_regular(
data_item=data_item,
dataset=dataset,
tasks=tasks,
pipeline_id=pipeline_id,
pipeline_run_id=pipeline_run_id,
context=context,
user=user,
):
pass
return result
if not user:
user = await get_default_user()
# Get Dataset object
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
from cognee.modules.data.models import Dataset
dataset = await session.get(Dataset, dataset_id)
pipeline_id = generate_pipeline_id(user.id, dataset.id, pipeline_name)
pipeline_run = await log_pipeline_run_start(pipeline_id, pipeline_name, dataset_id, data)
pipeline_run_id = pipeline_run.pipeline_run_id
yield PipelineRunStarted(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
payload=data,
)
try:
if not isinstance(data, list):
data = [data]
if incremental_loading:
data = await resolve_data_directories(data)
# Create async tasks per data item that will run the pipeline for the data item
data_item_tasks = [
asyncio.create_task(
_run_tasks_data_item(
data_item,
dataset,
tasks,
pipeline_name,
pipeline_id,
pipeline_run_id,
context,
user,
incremental_loading,
)
)
for data_item in data
]
results = await asyncio.gather(*data_item_tasks)
# Remove skipped data items from results
results = [result for result in results if result]
# If any data item could not be processed propagate error
errored_results = [
result for result in results if isinstance(result["run_info"], PipelineRunErrored)
]
if errored_results:
raise PipelineRunFailedError(
message="Pipeline run failed. Data item could not be processed."
)
await log_pipeline_run_complete(
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
)
@ -103,6 +301,7 @@ async def run_tasks(
pipeline_run_id=pipeline_run_id,
dataset_id=dataset.id,
dataset_name=dataset.name,
data_ingestion_info=results,
)
graph_engine = await get_graph_engine()
@ -120,9 +319,14 @@ async def run_tasks(
yield PipelineRunErrored(
pipeline_run_id=pipeline_run_id,
payload=error,
payload=repr(error),
dataset_id=dataset.id,
dataset_name=dataset.name,
data_ingestion_info=locals().get(
"results"
), # Returns results if they exist or returns None
)
raise error
# In case of error during incremental loading of data just let the user know the pipeline Errored, don't raise error
if not isinstance(error, PipelineRunFailedError):
raise error

View file

@ -8,7 +8,6 @@ from cognee.modules.data.models import Data
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.chunking.Chunker import Chunker
from cognee.modules.data.processing.document_types.exceptions.exceptions import PyPdfInternalError
async def update_document_token_count(document_id: UUID, token_count: int) -> None:
@ -40,15 +39,14 @@ async def extract_chunks_from_documents(
"""
for document in documents:
document_token_count = 0
try:
async for document_chunk in document.read(
max_chunk_size=max_chunk_size, chunker_cls=chunker
):
document_token_count += document_chunk.chunk_size
document_chunk.belongs_to_set = document.belongs_to_set
yield document_chunk
await update_document_token_count(document.id, document_token_count)
except PyPdfInternalError:
pass
async for document_chunk in document.read(
max_chunk_size=max_chunk_size, chunker_cls=chunker
):
document_token_count += document_chunk.chunk_size
document_chunk.belongs_to_set = document.belongs_to_set
yield document_chunk
await update_document_token_count(document.id, document_token_count)
# todo rita

View file

@ -5,12 +5,12 @@ from uuid import UUID
from typing import Union, BinaryIO, Any, List, Optional
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Data
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
from cognee.infrastructure.files.utils.open_data_file import open_data_file
from cognee.modules.data.methods import (
get_authorized_existing_datasets,
get_dataset_data,
@ -134,6 +134,7 @@ async def ingest_data(
node_set=json.dumps(node_set) if node_set else None,
data_size=file_metadata["file_size"],
tenant_id=user.tenant_id if user.tenant_id else None,
pipeline_status={},
token_count=-1,
)

View file

@ -40,6 +40,9 @@ async def resolve_data_directories(
if include_subdirectories:
base_path = item if item.endswith("/") else item + "/"
s3_keys = fs.glob(base_path + "**")
# If path is not directory attempt to add item directly
if not s3_keys:
s3_keys = fs.ls(item)
else:
s3_keys = fs.ls(item)
# Filter out keys that represent directories using fs.isdir

View file

@ -103,6 +103,9 @@ async def get_repo_file_dependencies(
extraction of dependencies (default is False). (default False)
"""
if isinstance(repo_path, list) and len(repo_path) == 1:
repo_path = repo_path[0]
if not os.path.exists(repo_path):
raise FileNotFoundError(f"Repository path {repo_path} does not exist.")

View file

@ -1,3 +1,4 @@
from cognee.modules.engine.utils.generate_edge_id import generate_edge_id
from cognee.shared.logging_utils import get_logger, ERROR
from collections import Counter
@ -49,7 +50,9 @@ async def index_graph_edges(batch_size: int = 1024):
)
for text, count in edge_types.items():
edge = EdgeType(relationship_name=text, number_of_edges=count)
edge = EdgeType(
id=generate_edge_id(edge_id=text), relationship_name=text, number_of_edges=count
)
data_point_type = type(edge)
for field_name in edge.metadata["index_fields"]:

View file

@ -26,8 +26,8 @@ async def test_deduplication():
explanation_file_path2 = os.path.join(
pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
)
await cognee.add([explanation_file_path], dataset_name)
await cognee.add([explanation_file_path2], dataset_name2)
await cognee.add([explanation_file_path], dataset_name, incremental_loading=False)
await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False)
result = await relational_engine.get_all_data_from_table("data")
assert len(result) == 1, "More than one data entity was found."