Compare commits
9 commits
main
...
incrementa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d78599aa7e | ||
|
|
081fae8273 | ||
|
|
30be0df314 | ||
|
|
dc38ff3838 | ||
|
|
67b61ff964 | ||
|
|
80896fdcc5 | ||
|
|
593cfcab5b | ||
|
|
d98253c28c | ||
|
|
af2b0735f6 |
14 changed files with 158 additions and 38 deletions
|
|
@ -15,6 +15,7 @@ async def add(
|
||||||
vector_db_config: dict = None,
|
vector_db_config: dict = None,
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
dataset_id: UUID = None,
|
dataset_id: UUID = None,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Add data to Cognee for knowledge graph processing.
|
Add data to Cognee for knowledge graph processing.
|
||||||
|
|
@ -153,6 +154,7 @@ async def add(
|
||||||
pipeline_name="add_pipeline",
|
pipeline_name="add_pipeline",
|
||||||
vector_db_config=vector_db_config,
|
vector_db_config=vector_db_config,
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
):
|
):
|
||||||
pipeline_run_info = run_info
|
pipeline_run_info = run_info
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,9 @@ async def run_code_graph_pipeline(repo_path, include_docs=False):
|
||||||
async for run_status in non_code_pipeline_run:
|
async for run_status in non_code_pipeline_run:
|
||||||
yield run_status
|
yield run_status
|
||||||
|
|
||||||
async for run_status in run_tasks(tasks, dataset.id, repo_path, user, "cognify_code_pipeline"):
|
async for run_status in run_tasks(
|
||||||
|
tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
|
||||||
|
):
|
||||||
yield run_status
|
yield run_status
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ async def cognify(
|
||||||
vector_db_config: dict = None,
|
vector_db_config: dict = None,
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
run_in_background: bool = False,
|
run_in_background: bool = False,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Transform ingested data into a structured knowledge graph.
|
Transform ingested data into a structured knowledge graph.
|
||||||
|
|
@ -194,6 +195,7 @@ async def cognify(
|
||||||
datasets=datasets,
|
datasets=datasets,
|
||||||
vector_db_config=vector_db_config,
|
vector_db_config=vector_db_config,
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return await run_cognify_blocking(
|
return await run_cognify_blocking(
|
||||||
|
|
@ -202,6 +204,7 @@ async def cognify(
|
||||||
datasets=datasets,
|
datasets=datasets,
|
||||||
vector_db_config=vector_db_config,
|
vector_db_config=vector_db_config,
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -211,6 +214,7 @@ async def run_cognify_blocking(
|
||||||
datasets,
|
datasets,
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
vector_db_config: dict = False,
|
vector_db_config: dict = False,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
total_run_info = {}
|
total_run_info = {}
|
||||||
|
|
||||||
|
|
@ -221,6 +225,7 @@ async def run_cognify_blocking(
|
||||||
pipeline_name="cognify_pipeline",
|
pipeline_name="cognify_pipeline",
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
vector_db_config=vector_db_config,
|
vector_db_config=vector_db_config,
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
):
|
):
|
||||||
if run_info.dataset_id:
|
if run_info.dataset_id:
|
||||||
total_run_info[run_info.dataset_id] = run_info
|
total_run_info[run_info.dataset_id] = run_info
|
||||||
|
|
@ -236,12 +241,14 @@ async def run_cognify_as_background_process(
|
||||||
datasets,
|
datasets,
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
vector_db_config: dict = False,
|
vector_db_config: dict = False,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
# Store pipeline status for all pipelines
|
# Store pipeline status for all pipelines
|
||||||
pipeline_run_started_info = []
|
pipeline_run_started_info = []
|
||||||
|
|
||||||
async def handle_rest_of_the_run(pipeline_list):
|
async def handle_rest_of_the_run(pipeline_list):
|
||||||
# Execute all provided pipelines one by one to avoid database write conflicts
|
# Execute all provided pipelines one by one to avoid database write conflicts
|
||||||
|
# TODO: Convert to async gather task instead of for loop when Queue mechanism for database is created
|
||||||
for pipeline in pipeline_list:
|
for pipeline in pipeline_list:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
|
@ -266,6 +273,7 @@ async def run_cognify_as_background_process(
|
||||||
pipeline_name="cognify_pipeline",
|
pipeline_name="cognify_pipeline",
|
||||||
graph_db_config=graph_db_config,
|
graph_db_config=graph_db_config,
|
||||||
vector_db_config=vector_db_config,
|
vector_db_config=vector_db_config,
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save dataset Pipeline run started info
|
# Save dataset Pipeline run started info
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
|
from sqlalchemy import UUID, Column, DateTime, String, JSON, Integer
|
||||||
|
from sqlalchemy.ext.mutable import MutableDict
|
||||||
from sqlalchemy.orm import relationship
|
from sqlalchemy.orm import relationship
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import Base
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
@ -20,7 +21,11 @@ class Data(Base):
|
||||||
owner_id = Column(UUID, index=True)
|
owner_id = Column(UUID, index=True)
|
||||||
content_hash = Column(String)
|
content_hash = Column(String)
|
||||||
external_metadata = Column(JSON)
|
external_metadata = Column(JSON)
|
||||||
node_set = Column(JSON, nullable=True) # Store NodeSet as JSON list of strings
|
# Store NodeSet as JSON list of strings
|
||||||
|
node_set = Column(JSON, nullable=True)
|
||||||
|
# MutableDict allows SQLAlchemy to notice key-value pair changes; without it, changing a value for a key
|
||||||
|
# wouldn't be noticed when committing a database session
|
||||||
|
pipeline_status = Column(MutableDict.as_mutable(JSON))
|
||||||
token_count = Column(Integer)
|
token_count = Column(Integer)
|
||||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||||
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
||||||
|
|
|
||||||
2
cognee/modules/ingestion/methods/__init__.py
Normal file
2
cognee/modules/ingestion/methods/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
||||||
|
from .get_s3_fs import get_s3_fs
|
||||||
|
from .open_data_file import open_data_file
|
||||||
14
cognee/modules/ingestion/methods/get_s3_fs.py
Normal file
14
cognee/modules/ingestion/methods/get_s3_fs.py
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
from cognee.api.v1.add.config import get_s3_config
|
||||||
|
|
||||||
|
|
||||||
|
def get_s3_fs():
    """
    Build an S3 filesystem client from the configured AWS credentials.

    Reads the S3 configuration via ``get_s3_config()``. ``s3fs`` is imported
    lazily, only when both credentials are present.

    Returns:
        An ``s3fs.S3FileSystem`` created with the configured key and secret
        (``anon=False``), or ``None`` when either the access key id or the
        secret access key is not set.
    """
    config = get_s3_config()

    # Without a complete credential pair there is nothing to connect with.
    if config.aws_access_key_id is None or config.aws_secret_access_key is None:
        return None

    import s3fs

    return s3fs.S3FileSystem(
        key=config.aws_access_key_id,
        secret=config.aws_secret_access_key,
        anon=False,
    )
|
||||||
6
cognee/modules/ingestion/methods/open_data_file.py
Normal file
6
cognee/modules/ingestion/methods/open_data_file.py
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
def open_data_file(file_path: str, s3fs):
    """
    Open a data file for binary reading, from S3 or from the local filesystem.

    Args:
        file_path: Location of the file. Paths starting with ``s3://`` are
            opened through ``s3fs``; anything else is treated as a local
            path, with an optional leading ``file://`` scheme removed.
        s3fs: Filesystem object whose ``open(path, mode="rb")`` method is used
            for ``s3://`` paths. May be ``None`` when only local files are
            opened.

    Returns:
        A file object opened in ``"rb"`` mode. The caller is responsible for
        closing it (e.g. by using it in a ``with`` statement).

    Raises:
        ValueError: If ``file_path`` is an ``s3://`` path but ``s3fs`` is
            ``None``.
    """
    if file_path.startswith("s3://"):
        if s3fs is None:
            # Fail with an actionable message instead of an AttributeError on None.
            raise ValueError(
                f"Cannot open '{file_path}': no S3 filesystem is available "
                "(S3 credentials are not configured)."
            )
        return s3fs.open(file_path, mode="rb")

    # Strip only a leading "file://" scheme. A blanket str.replace would also
    # remove the substring from the middle of a path and open the wrong file.
    scheme = "file://"
    local_path = file_path[len(scheme):] if file_path.startswith(scheme) else file_path
    return open(local_path, mode="rb")
|
||||||
|
|
@ -9,6 +9,7 @@ class PipelineRunInfo(BaseModel):
|
||||||
dataset_id: UUID
|
dataset_id: UUID
|
||||||
dataset_name: str
|
dataset_name: str
|
||||||
payload: Optional[Any] = None
|
payload: Optional[Any] = None
|
||||||
|
data_ingestion_info: Optional[dict] = None
|
||||||
|
|
||||||
model_config = {
|
model_config = {
|
||||||
"arbitrary_types_allowed": True,
|
"arbitrary_types_allowed": True,
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ import asyncio
|
||||||
from typing import Union
|
from typing import Union
|
||||||
from uuid import NAMESPACE_OID, uuid5, UUID
|
from uuid import NAMESPACE_OID, uuid5, UUID
|
||||||
|
|
||||||
|
from cognee.modules.ingestion.exceptions import IngestionError
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
|
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
|
||||||
from cognee.modules.data.models import Data, Dataset
|
from cognee.modules.data.models import Data, Dataset
|
||||||
|
|
@ -52,6 +53,7 @@ async def cognee_pipeline(
|
||||||
pipeline_name: str = "custom_pipeline",
|
pipeline_name: str = "custom_pipeline",
|
||||||
vector_db_config: dict = None,
|
vector_db_config: dict = None,
|
||||||
graph_db_config: dict = None,
|
graph_db_config: dict = None,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
# Note: These context variables allow different value assignment for databases in Cognee
|
# Note: These context variables allow different value assignment for databases in Cognee
|
||||||
# per async task, thread, process and etc.
|
# per async task, thread, process and etc.
|
||||||
|
|
@ -106,6 +108,7 @@ async def cognee_pipeline(
|
||||||
data=data,
|
data=data,
|
||||||
pipeline_name=pipeline_name,
|
pipeline_name=pipeline_name,
|
||||||
context={"dataset": dataset},
|
context={"dataset": dataset},
|
||||||
|
incremental_loading=incremental_loading,
|
||||||
):
|
):
|
||||||
yield run_info
|
yield run_info
|
||||||
|
|
||||||
|
|
@ -117,6 +120,7 @@ async def run_pipeline(
|
||||||
data=None,
|
data=None,
|
||||||
pipeline_name: str = "custom_pipeline",
|
pipeline_name: str = "custom_pipeline",
|
||||||
context: dict = None,
|
context: dict = None,
|
||||||
|
incremental_loading=True,
|
||||||
):
|
):
|
||||||
check_dataset_name(dataset.name)
|
check_dataset_name(dataset.name)
|
||||||
|
|
||||||
|
|
@ -184,7 +188,9 @@ async def run_pipeline(
|
||||||
if not isinstance(task, Task):
|
if not isinstance(task, Task):
|
||||||
raise ValueError(f"Task {task} is not an instance of Task")
|
raise ValueError(f"Task {task} is not an instance of Task")
|
||||||
|
|
||||||
pipeline_run = run_tasks(tasks, dataset_id, data, user, pipeline_name, context)
|
pipeline_run = run_tasks(
|
||||||
|
tasks, dataset_id, data, user, pipeline_name, context, incremental_loading
|
||||||
|
)
|
||||||
|
|
||||||
async for pipeline_run_info in pipeline_run:
|
async for pipeline_run_info in pipeline_run:
|
||||||
yield pipeline_run_info
|
yield pipeline_run_info
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,20 @@
|
||||||
import os
|
import os
|
||||||
|
import cognee.modules.ingestion as ingestion
|
||||||
|
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
|
from cognee.modules.pipelines.operations.run_tasks_distributed import run_tasks_distributed
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
|
from cognee.modules.data.models import Data
|
||||||
|
from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.modules.pipelines.utils import generate_pipeline_id
|
from cognee.modules.pipelines.utils import generate_pipeline_id
|
||||||
|
from cognee.tasks.ingestion import save_data_item_to_storage, resolve_data_directories
|
||||||
from cognee.modules.pipelines.models.PipelineRunInfo import (
|
from cognee.modules.pipelines.models.PipelineRunInfo import (
|
||||||
PipelineRunCompleted,
|
PipelineRunCompleted,
|
||||||
PipelineRunErrored,
|
PipelineRunErrored,
|
||||||
|
|
@ -55,6 +61,7 @@ async def run_tasks(
|
||||||
user: User = None,
|
user: User = None,
|
||||||
pipeline_name: str = "unknown_pipeline",
|
pipeline_name: str = "unknown_pipeline",
|
||||||
context: dict = None,
|
context: dict = None,
|
||||||
|
incremental_loading: bool = True,
|
||||||
):
|
):
|
||||||
if not user:
|
if not user:
|
||||||
user = get_default_user()
|
user = get_default_user()
|
||||||
|
|
@ -79,10 +86,45 @@ async def run_tasks(
|
||||||
payload=data,
|
payload=data,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
fs = get_s3_fs()
|
||||||
|
data_items_pipeline_run_info = {}
|
||||||
|
ingestion_error = None
|
||||||
|
try:
|
||||||
|
if not isinstance(data, list):
|
||||||
|
data = [data]
|
||||||
|
|
||||||
|
if incremental_loading:
|
||||||
|
data = await resolve_data_directories(data)
|
||||||
|
|
||||||
|
# TODO: Convert to async gather task instead of for loop (first make sure it works; there were some issues when async gathering datasets)
|
||||||
|
for data_item in data:
|
||||||
|
# If incremental_loading of data is set to True don't process documents already processed by pipeline
|
||||||
|
if incremental_loading:
|
||||||
|
# If data is being added to Cognee for the first time calculate the id of the data
|
||||||
|
if not isinstance(data_item, Data):
|
||||||
|
file_path = await save_data_item_to_storage(data_item, dataset.name)
|
||||||
|
# Ingest data and add metadata
|
||||||
|
with open_data_file(file_path, s3fs=fs) as file:
|
||||||
|
classified_data = ingestion.classify(file, s3fs=fs)
|
||||||
|
# data_id is the hash of file contents + owner id to avoid duplicate data
|
||||||
|
data_id = ingestion.identify(classified_data, user)
|
||||||
|
else:
|
||||||
|
# If data was already processed by Cognee get data id
|
||||||
|
data_id = data_item.id
|
||||||
|
|
||||||
|
# Check pipeline status; if this Data item was already processed by this pipeline, skip it
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
data_point = (
|
||||||
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
|
).scalar_one_or_none()
|
||||||
|
if data_point:
|
||||||
|
if data_point.pipeline_status.get(pipeline_name) == "Completed":
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async for result in run_tasks_with_telemetry(
|
async for result in run_tasks_with_telemetry(
|
||||||
tasks=tasks,
|
tasks=tasks,
|
||||||
data=data,
|
data=[data_item],
|
||||||
user=user,
|
user=user,
|
||||||
pipeline_name=pipeline_id,
|
pipeline_name=pipeline_id,
|
||||||
context=context,
|
context=context,
|
||||||
|
|
@ -94,6 +136,46 @@ async def run_tasks(
|
||||||
payload=result,
|
payload=result,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if incremental_loading:
|
||||||
|
data_items_pipeline_run_info[data_id] = {
|
||||||
|
"run_info": PipelineRunCompleted(
|
||||||
|
pipeline_run_id=pipeline_run_id,
|
||||||
|
dataset_id=dataset.id,
|
||||||
|
dataset_name=dataset.name,
|
||||||
|
),
|
||||||
|
"data_id": data_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Update pipeline status for Data element
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
data_point = (
|
||||||
|
await session.execute(select(Data).filter(Data.id == data_id))
|
||||||
|
).scalar_one_or_none()
|
||||||
|
data_point.pipeline_status[pipeline_name] = "Completed"
|
||||||
|
await session.merge(data_point)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
except Exception as error:
|
||||||
|
# Temporarily swallow error and try to process rest of documents first, then re-raise error at end of data ingestion pipeline
|
||||||
|
ingestion_error = error
|
||||||
|
logger.error(
|
||||||
|
f"Exception caught while processing data: {error}.\n Data processing failed for data item: {data_item}."
|
||||||
|
)
|
||||||
|
if incremental_loading:
|
||||||
|
data_items_pipeline_run_info = {
|
||||||
|
"run_info": PipelineRunErrored(
|
||||||
|
pipeline_run_id=pipeline_run_id,
|
||||||
|
payload=error,
|
||||||
|
dataset_id=dataset.id,
|
||||||
|
dataset_name=dataset.name,
|
||||||
|
),
|
||||||
|
"data_id": data_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# re-raise error found during data ingestion
|
||||||
|
if ingestion_error:
|
||||||
|
raise ingestion_error
|
||||||
|
|
||||||
await log_pipeline_run_complete(
|
await log_pipeline_run_complete(
|
||||||
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
|
pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data
|
||||||
)
|
)
|
||||||
|
|
@ -102,6 +184,7 @@ async def run_tasks(
|
||||||
pipeline_run_id=pipeline_run_id,
|
pipeline_run_id=pipeline_run_id,
|
||||||
dataset_id=dataset.id,
|
dataset_id=dataset.id,
|
||||||
dataset_name=dataset.name,
|
dataset_name=dataset.name,
|
||||||
|
data_ingestion_info=data_items_pipeline_run_info,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
|
|
@ -114,6 +197,7 @@ async def run_tasks(
|
||||||
payload=error,
|
payload=error,
|
||||||
dataset_id=dataset.id,
|
dataset_id=dataset.id,
|
||||||
dataset_name=dataset.name,
|
dataset_name=dataset.name,
|
||||||
|
data_ingestion_info=data_items_pipeline_run_info,
|
||||||
)
|
)
|
||||||
|
|
||||||
raise error
|
raise error
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ from cognee.modules.data.models import Data
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
|
from cognee.modules.users.permissions.methods import get_specific_user_permission_datasets
|
||||||
|
from cognee.modules.ingestion.methods import get_s3_fs, open_data_file
|
||||||
from cognee.modules.data.methods import (
|
from cognee.modules.data.methods import (
|
||||||
get_authorized_existing_datasets,
|
get_authorized_existing_datasets,
|
||||||
get_dataset_data,
|
get_dataset_data,
|
||||||
|
|
@ -18,9 +19,6 @@ from cognee.modules.data.methods import (
|
||||||
from .save_data_item_to_storage import save_data_item_to_storage
|
from .save_data_item_to_storage import save_data_item_to_storage
|
||||||
|
|
||||||
|
|
||||||
from cognee.api.v1.add.config import get_s3_config
|
|
||||||
|
|
||||||
|
|
||||||
async def ingest_data(
|
async def ingest_data(
|
||||||
data: Any,
|
data: Any,
|
||||||
dataset_name: str,
|
dataset_name: str,
|
||||||
|
|
@ -31,22 +29,7 @@ async def ingest_data(
|
||||||
if not user:
|
if not user:
|
||||||
user = await get_default_user()
|
user = await get_default_user()
|
||||||
|
|
||||||
s3_config = get_s3_config()
|
fs = get_s3_fs()
|
||||||
|
|
||||||
fs = None
|
|
||||||
if s3_config.aws_access_key_id is not None and s3_config.aws_secret_access_key is not None:
|
|
||||||
import s3fs
|
|
||||||
|
|
||||||
fs = s3fs.S3FileSystem(
|
|
||||||
key=s3_config.aws_access_key_id, secret=s3_config.aws_secret_access_key, anon=False
|
|
||||||
)
|
|
||||||
|
|
||||||
def open_data_file(file_path: str):
|
|
||||||
if file_path.startswith("s3://"):
|
|
||||||
return fs.open(file_path, mode="rb")
|
|
||||||
else:
|
|
||||||
local_path = file_path.replace("file://", "")
|
|
||||||
return open(local_path, mode="rb")
|
|
||||||
|
|
||||||
def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
|
def get_external_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
|
||||||
if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
|
if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
|
||||||
|
|
@ -95,7 +78,7 @@ async def ingest_data(
|
||||||
file_path = await save_data_item_to_storage(data_item, dataset_name)
|
file_path = await save_data_item_to_storage(data_item, dataset_name)
|
||||||
|
|
||||||
# Ingest data and add metadata
|
# Ingest data and add metadata
|
||||||
with open_data_file(file_path) as file:
|
with open_data_file(file_path, s3fs=fs) as file:
|
||||||
classified_data = ingestion.classify(file, s3fs=fs)
|
classified_data = ingestion.classify(file, s3fs=fs)
|
||||||
|
|
||||||
# data_id is the hash of file contents + owner id to avoid duplicate data
|
# data_id is the hash of file contents + owner id to avoid duplicate data
|
||||||
|
|
@ -148,6 +131,7 @@ async def ingest_data(
|
||||||
content_hash=file_metadata["content_hash"],
|
content_hash=file_metadata["content_hash"],
|
||||||
external_metadata=ext_metadata,
|
external_metadata=ext_metadata,
|
||||||
node_set=json.dumps(node_set) if node_set else None,
|
node_set=json.dumps(node_set) if node_set else None,
|
||||||
|
pipeline_status={},
|
||||||
token_count=-1,
|
token_count=-1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,9 @@ async def resolve_data_directories(
|
||||||
if include_subdirectories:
|
if include_subdirectories:
|
||||||
base_path = item if item.endswith("/") else item + "/"
|
base_path = item if item.endswith("/") else item + "/"
|
||||||
s3_keys = fs.glob(base_path + "**")
|
s3_keys = fs.glob(base_path + "**")
|
||||||
|
# If path is not directory attempt to add item directly
|
||||||
|
if not s3_keys:
|
||||||
|
s3_keys = fs.ls(item)
|
||||||
else:
|
else:
|
||||||
s3_keys = fs.ls(item)
|
s3_keys = fs.ls(item)
|
||||||
# Filter out keys that represent directories using fs.isdir
|
# Filter out keys that represent directories using fs.isdir
|
||||||
|
|
|
||||||
|
|
@ -103,6 +103,9 @@ async def get_repo_file_dependencies(
|
||||||
extraction of dependencies (default is False). (default False)
|
extraction of dependencies (default is False). (default False)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if isinstance(repo_path, list) and len(repo_path) == 1:
|
||||||
|
repo_path = repo_path[0]
|
||||||
|
|
||||||
if not os.path.exists(repo_path):
|
if not os.path.exists(repo_path):
|
||||||
raise FileNotFoundError(f"Repository path {repo_path} does not exist.")
|
raise FileNotFoundError(f"Repository path {repo_path} does not exist.")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,8 +25,8 @@ async def test_deduplication():
|
||||||
explanation_file_path2 = os.path.join(
|
explanation_file_path2 = os.path.join(
|
||||||
pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
|
pathlib.Path(__file__).parent, "test_data/Natural_language_processing_copy.txt"
|
||||||
)
|
)
|
||||||
await cognee.add([explanation_file_path], dataset_name)
|
await cognee.add([explanation_file_path], dataset_name, incremental_loading=False)
|
||||||
await cognee.add([explanation_file_path2], dataset_name2)
|
await cognee.add([explanation_file_path2], dataset_name2, incremental_loading=False)
|
||||||
|
|
||||||
result = await relational_engine.get_all_data_from_table("data")
|
result = await relational_engine.get_all_data_from_table("data")
|
||||||
assert len(result) == 1, "More than one data entity was found."
|
assert len(result) == 1, "More than one data entity was found."
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue