From c5f3314c856a0ac41ec4200d83f23797c41ccfd9 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:10:03 +0100 Subject: [PATCH 01/17] Add Metadata table and read write delete functions --- cognee/modules/data/models/Data.py | 1 + cognee/modules/ingestion/models/metadata.py | 13 ++++++ .../ingestion/operations/delete_metadata.py | 16 ++++++++ .../ingestion/operations/get_metadata.py | 13 ++++++ .../ingestion/operations/write_metadata.py | 41 +++++++++++++++++++ .../ingestion/ingest_data_with_metadata.py | 7 +++- ...save_data_item_with_metadata_to_storage.py | 9 ++-- 7 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 cognee/modules/ingestion/models/metadata.py create mode 100644 cognee/modules/ingestion/operations/delete_metadata.py create mode 100644 cognee/modules/ingestion/operations/get_metadata.py create mode 100644 cognee/modules/ingestion/operations/write_metadata.py diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 2e9745600..6601580eb 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -15,6 +15,7 @@ class Data(Base): extension = Column(String) mime_type = Column(String) raw_data_location = Column(String) + metadata_id = Column(UUID) created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) diff --git a/cognee/modules/ingestion/models/metadata.py b/cognee/modules/ingestion/models/metadata.py new file mode 100644 index 000000000..29d6cfaf2 --- /dev/null +++ b/cognee/modules/ingestion/models/metadata.py @@ -0,0 +1,13 @@ +from uuid import uuid4 +from datetime import datetime, timezone +from sqlalchemy import Column, DateTime, String, UUID +from cognee.infrastructure.databases.relational import Base + +class Metadata(Base): + __tablename__ = "queries" + + id = Column(UUID, primary_key = True, default = uuid4) + metadata = Column(String) + + created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) + updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) diff --git a/cognee/modules/ingestion/operations/delete_metadata.py b/cognee/modules/ingestion/operations/delete_metadata.py new file mode 100644 index 000000000..dc6bcd85f --- /dev/null +++ b/cognee/modules/ingestion/operations/delete_metadata.py @@ -0,0 +1,16 @@ +import warnings +from uuid import UUID +from sqlalchemy import select +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models.Metadata import Metadata + + +async def delete_metadata(metadata_id: UUID): + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + metadata = await session.get(Metadata, metadata_id) + if metadata is None: + warnings.warn(f"metadata for metadata_id: {metadata_id} not found") + + session.delete(metadata) + session.commit() \ No newline at end of file diff --git a/cognee/modules/ingestion/operations/get_metadata.py b/cognee/modules/ingestion/operations/get_metadata.py new file mode 100644 index 000000000..047bc4906 --- /dev/null +++ b/cognee/modules/ingestion/operations/get_metadata.py @@ -0,0 +1,13 @@ +import json +from uuid import UUID +from sqlalchemy import select +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models.Metadata import Metadata + +async def get_metadata(metadata_id: UUID) -> Metadata: + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + metadata = await session.get(Metadata, metadata_id) + + return json.parse(metadata) diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/ingestion/operations/write_metadata.py new file mode 100644 index 000000000..b97c3cf85 --- /dev/null +++ b/cognee/modules/ingestion/operations/write_metadata.py @@ -0,0 +1,41 @@ +import json +import inspect +import warnings +import re +from typing import Any + +from uuid import UUID +from cognee.infrastructure.databases.relational import get_relational_engine +from ..models.Metadata import Metadata + +async def write_metadata(data_item: Any) -> UUID: + metadata_dict = get_metadata_dict(data_item) + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + metadata = Metadata( + metadata = json.dumps(metadata_dict), + metadata_source = parse_type(type(data_item)) + ) + session.add(metadata) + await session.commit() + + return metadata.id + +def parse_type(type_: Any) -> str: + pattern = r".+'([\w_\.]+)'" + match = re.search(pattern, str(type_)) + if match: + return(match.group(1)) + else: + raise Exception(f"type: {type_} could not be parsed") + + +def get_metadata_dict(metadata: Any) -> dict[str, Any]: + if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")): + return(metadata.dict()) + else: + warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method") + try: + return({"content": str(metadata)}) + except Exception as e: + raise Exception(f"Could not cast metadata to string: {e}") \ No newline at end of file diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index e5a50c13b..577dca57e 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -8,6 +8,7 @@ from cognee.modules.data.methods import create_dataset from cognee.modules.users.permissions.methods import give_permission_on_document from .get_dlt_destination import get_dlt_destination from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage +from cognee.modules.ingestion.operations.delete_metadata import delete_metadata async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination = get_dlt_destination() @@ -26,7 +27,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): # Process data for data_item in data: - file_path = save_data_item_with_metadata_to_storage(data_item, dataset_name) + file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name) # Ingest data and add metadata with open(file_path.replace("file://", ""), mode = "rb") as file: @@ -49,11 +50,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): )).scalar_one_or_none() if data_point is not None: + await delete_metadata(data_point.metadata_id) data_point.name = file_metadata["name"] data_point.raw_data_location = file_metadata["file_path"] data_point.extension = file_metadata["extension"] data_point.mime_type = file_metadata["mime_type"] - + data_point.metadata_id = metadata_id await session.merge(data_point) await session.commit() else: @@ -63,6 +65,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): raw_data_location = file_metadata["file_path"], extension = file_metadata["extension"], mime_type = file_metadata["mime_type"], + metadata_id = metadata_id ) dataset.data.append(data_point) diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index ec29edb89..108bcd21b 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -1,14 +1,15 @@ from typing import Union, BinaryIO, Any from cognee.modules.ingestion import save_data_to_file +from cognee.modules.ingestion.operations.write_metadata import write_metadata def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str: # Dynamic import is used because the llama_index module is optional. # For the same reason Any is accepted as a data item - from llama_index.core import Document - from .transform_data import get_data_from_llama_index + metadata_id = write_metadata(data_item) # Check if data is of type Document or any of it's subclasses - if isinstance(data_item, Document): + if str(type(data_item)).startswith("llama_index"): + from .transform_data import get_data_from_llama_index file_path = get_data_from_llama_index(data_item, dataset_name) # data is a file object coming from upload. @@ -25,4 +26,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any] else: raise ValueError(f"Data type not supported: {type(data_item)}") - return file_path \ No newline at end of file + return file_path, metadata_id \ No newline at end of file From fd987ed61ec0d1f148e5798dce38001d5f75ebf9 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:13:08 +0100 Subject: [PATCH 02/17] Add autoformatting --- cognee/modules/data/models/Data.py | 30 +++++---- cognee/modules/ingestion/models/metadata.py | 19 ++++-- .../ingestion/operations/delete_metadata.py | 7 ++- .../ingestion/operations/get_metadata.py | 4 ++ .../ingestion/operations/write_metadata.py | 25 +++++--- .../ingestion/ingest_data_with_metadata.py | 62 +++++++++++-------- ...save_data_item_with_metadata_to_storage.py | 17 +++-- 7 files changed, 103 insertions(+), 61 deletions(-) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 6601580eb..55991541d 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -1,15 +1,19 @@ -from uuid import uuid4 -from typing import List from datetime import datetime, timezone -from sqlalchemy.orm import relationship, Mapped -from sqlalchemy import Column, String, DateTime, UUID +from typing import List +from uuid import uuid4 + +from sqlalchemy import UUID, Column, DateTime, String +from sqlalchemy.orm import Mapped, relationship + from cognee.infrastructure.databases.relational import Base + from .DatasetData import DatasetData + class Data(Base): __tablename__ = "data" - id = Column(UUID, primary_key = True, default = uuid4) + id = Column(UUID, primary_key=True, default=uuid4) name = Column(String) extension = Column(String) @@ -17,15 +21,19 @@ class Data(Base): raw_data_location = Column(String) metadata_id = Column(UUID) - created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) + created_at = Column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at = Column( + DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) + ) datasets: Mapped[List["Dataset"]] = relationship( "Dataset", - secondary = DatasetData.__tablename__, - back_populates = "data", - lazy = "noload", - cascade="all, delete" + secondary=DatasetData.__tablename__, + back_populates="data", + lazy="noload", + cascade="all, delete", ) def to_json(self) -> dict: diff --git a/cognee/modules/ingestion/models/metadata.py b/cognee/modules/ingestion/models/metadata.py index 29d6cfaf2..ab9fe1e01 100644 --- a/cognee/modules/ingestion/models/metadata.py +++ b/cognee/modules/ingestion/models/metadata.py @@ -1,13 +1,20 @@ -from uuid import uuid4 from datetime import datetime, timezone -from sqlalchemy import Column, DateTime, String, UUID +from uuid import uuid4 + +from sqlalchemy import UUID, Column, DateTime, String + from cognee.infrastructure.databases.relational import Base + class Metadata(Base): __tablename__ = "queries" - id = Column(UUID, primary_key = True, default = uuid4) + id = Column(UUID, primary_key=True, default=uuid4) metadata = Column(String) - - created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) + + created_at = Column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at = Column( + DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) + ) diff --git a/cognee/modules/ingestion/operations/delete_metadata.py b/cognee/modules/ingestion/operations/delete_metadata.py index dc6bcd85f..df94f52ed 100644 --- a/cognee/modules/ingestion/operations/delete_metadata.py +++ b/cognee/modules/ingestion/operations/delete_metadata.py @@ -1,7 +1,10 @@ import warnings from uuid import UUID + from sqlalchemy import select + from cognee.infrastructure.databases.relational import get_relational_engine + from ..models.Metadata import Metadata @@ -11,6 +14,6 @@ async def delete_metadata(metadata_id: UUID): metadata = await session.get(Metadata, metadata_id) if metadata is None: warnings.warn(f"metadata for metadata_id: {metadata_id} not found") - + session.delete(metadata) - session.commit() \ No newline at end of file + session.commit() diff --git a/cognee/modules/ingestion/operations/get_metadata.py b/cognee/modules/ingestion/operations/get_metadata.py index 047bc4906..9034b327d 100644 --- a/cognee/modules/ingestion/operations/get_metadata.py +++ b/cognee/modules/ingestion/operations/get_metadata.py @@ -1,9 +1,13 @@ import json from uuid import UUID + from sqlalchemy import select + from cognee.infrastructure.databases.relational import get_relational_engine + from ..models.Metadata import Metadata + async def get_metadata(metadata_id: UUID) -> Metadata: db_engine = get_relational_engine() diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/ingestion/operations/write_metadata.py index b97c3cf85..cefed21de 100644 --- a/cognee/modules/ingestion/operations/write_metadata.py +++ b/cognee/modules/ingestion/operations/write_metadata.py @@ -1,41 +1,46 @@ -import json import inspect -import warnings +import json import re +import warnings from typing import Any - from uuid import UUID + from cognee.infrastructure.databases.relational import get_relational_engine + from ..models.Metadata import Metadata + async def write_metadata(data_item: Any) -> UUID: metadata_dict = get_metadata_dict(data_item) db_engine = get_relational_engine() async with db_engine.get_async_session() as session: metadata = Metadata( - metadata = json.dumps(metadata_dict), - metadata_source = parse_type(type(data_item)) + metadata=json.dumps(metadata_dict), + metadata_source=parse_type(type(data_item)), ) session.add(metadata) await session.commit() return metadata.id + def parse_type(type_: Any) -> str: pattern = r".+'([\w_\.]+)'" match = re.search(pattern, str(type_)) if match: - return(match.group(1)) + return match.group(1) else: raise Exception(f"type: {type_} could not be parsed") def get_metadata_dict(metadata: Any) -> dict[str, Any]: if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")): - return(metadata.dict()) + return metadata.dict() else: - warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method") + warnings.warn( + f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method" + ) try: - return({"content": str(metadata)}) + return {"content": str(metadata)} except Exception as e: - raise Exception(f"Could not cast metadata to string: {e}") \ No newline at end of file + raise Exception(f"Could not cast metadata to string: {e}") diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index 577dca57e..07f5c8115 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -1,24 +1,30 @@ -import dlt -import cognee.modules.ingestion as ingestion from typing import Any -from cognee.shared.utils import send_telemetry -from cognee.modules.users.models import User + +import dlt + +import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset -from cognee.modules.users.permissions.methods import give_permission_on_document -from .get_dlt_destination import get_dlt_destination -from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage from cognee.modules.ingestion.operations.delete_metadata import delete_metadata +from cognee.modules.users.models import User +from cognee.modules.users.permissions.methods import give_permission_on_document +from cognee.shared.utils import send_telemetry + +from .get_dlt_destination import get_dlt_destination +from .save_data_item_with_metadata_to_storage import ( + save_data_item_with_metadata_to_storage, +) + async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination = get_dlt_destination() pipeline = dlt.pipeline( - pipeline_name = "file_load_from_filesystem", - destination = destination, + pipeline_name="file_load_from_filesystem", + destination=destination, ) - @dlt.resource(standalone = True, merge_key = "id") + @dlt.resource(standalone=True, merge_key="id") async def data_resources(data: Any, user: User): if not isinstance(data, list): # Convert data to a list as we work with lists further down. @@ -27,10 +33,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): # Process data for data_item in data: - file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name) + file_path, metadata_id = await save_data_item_with_metadata_to_storage( + data_item, dataset_name + ) # Ingest data and add metadata - with open(file_path.replace("file://", ""), mode = "rb") as file: + with open(file_path.replace("file://", ""), mode="rb") as file: classified_data = ingestion.classify(file) data_id = ingestion.identify(classified_data) @@ -38,6 +46,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): file_metadata = classified_data.get_metadata() from sqlalchemy import select + from cognee.modules.data.models import Data db_engine = get_relational_engine() @@ -45,9 +54,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): async with db_engine.get_async_session() as session: dataset = await create_dataset(dataset_name, user.id, session) - data_point = (await session.execute( - select(Data).filter(Data.id == data_id) - )).scalar_one_or_none() + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() if data_point is not None: await delete_metadata(data_point.metadata_id) @@ -60,12 +69,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): await session.commit() else: data_point = Data( - id = data_id, - name = file_metadata["name"], - raw_data_location = file_metadata["file_path"], - extension = file_metadata["extension"], - mime_type = file_metadata["mime_type"], - metadata_id = metadata_id + id=data_id, + name=file_metadata["name"], + raw_data_location=file_metadata["file_path"], + extension=file_metadata["extension"], + mime_type=file_metadata["mime_type"], + metadata_id=metadata_id, ) dataset.data.append(data_point) @@ -82,14 +91,13 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): await give_permission_on_document(user, data_id, "read") await give_permission_on_document(user, data_id, "write") - - send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id) + send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id) run_info = pipeline.run( data_resources(data, user), - table_name = "file_metadata", - dataset_name = dataset_name, - write_disposition = "merge", + table_name="file_metadata", + dataset_name=dataset_name, + write_disposition="merge", ) - send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id) + send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id) return run_info diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index 108bcd21b..9acf0d413 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -1,20 +1,27 @@ -from typing import Union, BinaryIO, Any +from typing import Any, BinaryIO, Union + from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion.operations.write_metadata import write_metadata -def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str: - # Dynamic import is used because the llama_index module is optional. + +def save_data_item_with_metadata_to_storage( + data_item: Union[BinaryIO, str, Any], dataset_name: str +) -> str: + # Dynamic import is used because the llama_index module is optional. # For the same reason Any is accepted as a data item metadata_id = write_metadata(data_item) # Check if data is of type Document or any of it's subclasses if str(type(data_item)).startswith("llama_index"): from .transform_data import get_data_from_llama_index + file_path = get_data_from_llama_index(data_item, dataset_name) # data is a file object coming from upload. elif hasattr(data_item, "file"): - file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename) + file_path = save_data_to_file( + data_item.file, dataset_name, filename=data_item.filename + ) elif isinstance(data_item, str): # data is a file path @@ -26,4 +33,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any] else: raise ValueError(f"Data type not supported: {type(data_item)}") - return file_path, metadata_id \ No newline at end of file + return file_path, metadata_id From 7324564655c70dc4a3a038959283f3d697893f7e Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:30:25 +0100 Subject: [PATCH 03/17] Add metadata_id attribute to Document and DocumentChunk, make ingest_with_metadata default --- cognee/api/v1/add/add_v2.py | 5 ++--- cognee/modules/chunking/TextChunker.py | 3 +++ cognee/modules/data/processing/document_types/Document.py | 2 ++ cognee/tasks/documents/classify_documents.py | 1 + 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py index 9d6e33012..631d963e5 100644 --- a/cognee/api/v1/add/add_v2.py +++ b/cognee/api/v1/add/add_v2.py @@ -2,7 +2,7 @@ from typing import Union, BinaryIO from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines import run_tasks, Task -from cognee.tasks.ingestion import save_data_to_storage, ingest_data +from cognee.tasks.ingestion import ingest_data_with_metadata from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables @@ -14,8 +14,7 @@ async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_nam user = await get_default_user() tasks = [ - Task(save_data_to_storage, dataset_name), - Task(ingest_data, dataset_name, user) + Task(ingest_data_with_metadata, dataset_name, user) ] pipeline = run_tasks(tasks, data, "add_pipeline") diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index f0a72b58a..24ed0b236 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -35,6 +35,7 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = chunk_data["cut_type"], + metadata_id = self.document.metadata_id ) paragraph_chunks = [] self.chunk_size = 0 @@ -48,6 +49,7 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + metadata_id = self.document.metadata_id ) except Exception as e: print(e) @@ -65,6 +67,7 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + metadata_id = self.document.metadata_id ) except Exception as e: print(e) diff --git a/cognee/modules/data/processing/document_types/Document.py b/cognee/modules/data/processing/document_types/Document.py index 7d5545cfc..773fc30c8 100644 --- a/cognee/modules/data/processing/document_types/Document.py +++ b/cognee/modules/data/processing/document_types/Document.py @@ -1,9 +1,11 @@ from cognee.infrastructure.engine import DataPoint +from uuid import UUID class Document(DataPoint): type: str name: str raw_data_location: str + metadata_id: UUID def read(self, chunk_size: int) -> str: pass diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py index 8ee87bcad..599b74e17 100644 --- a/cognee/tasks/documents/classify_documents.py +++ b/cognee/tasks/documents/classify_documents.py @@ -45,6 +45,7 @@ def classify_documents(data_documents: list[Data]) -> list[Document]: title=f"{data_item.name}.{data_item.extension}", raw_data_location=data_item.raw_data_location, name=data_item.name, + metadata_id=data_item.metadata_id ) for data_item in data_documents ] From cc0127a90e0ea7cbf0cf16df62dbbfbb425e6bc4 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:34:38 +0100 Subject: [PATCH 04/17] Fix Metadata file name --- cognee/modules/ingestion/models/{metadata.py => Metadata.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cognee/modules/ingestion/models/{metadata.py => Metadata.py} (100%) diff --git a/cognee/modules/ingestion/models/metadata.py b/cognee/modules/ingestion/models/Metadata.py similarity index 100% rename from cognee/modules/ingestion/models/metadata.py rename to cognee/modules/ingestion/models/Metadata.py From 899275c25edb0c3814368a4d8560abe8e096d968 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:38:24 +0100 Subject: [PATCH 05/17] Rename metadata field to metadata_repr --- cognee/modules/ingestion/models/Metadata.py | 5 +++-- cognee/modules/ingestion/operations/write_metadata.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cognee/modules/ingestion/models/Metadata.py b/cognee/modules/ingestion/models/Metadata.py index ab9fe1e01..5d6333bc3 100644 --- a/cognee/modules/ingestion/models/Metadata.py +++ b/cognee/modules/ingestion/models/Metadata.py @@ -7,10 +7,11 @@ from cognee.infrastructure.databases.relational import Base class Metadata(Base): - __tablename__ = "queries" + __tablename__ = "metadata_table" id = Column(UUID, primary_key=True, default=uuid4) - metadata = Column(String) + metadata_repr = Column(String) + metadata_source = Column(String) created_at = Column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/ingestion/operations/write_metadata.py index cefed21de..b1b40a3d4 100644 --- a/cognee/modules/ingestion/operations/write_metadata.py +++ b/cognee/modules/ingestion/operations/write_metadata.py @@ -15,7 +15,7 @@ async def write_metadata(data_item: Any) -> UUID: db_engine = get_relational_engine() async with db_engine.get_async_session() as session: metadata = Metadata( - metadata=json.dumps(metadata_dict), + metadata_repr=json.dumps(metadata_dict), metadata_source=parse_type(type(data_item)), ) session.add(metadata) From 20d721f5ca07468a40d173b9f588b413c520e80f Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 16:47:15 +0100 Subject: [PATCH 06/17] Add metadata_id field to documents in integration tests --- cognee/tests/integration/documents/AudioDocument_test.py | 2 +- cognee/tests/integration/documents/ImageDocument_test.py | 2 +- cognee/tests/integration/documents/PdfDocument_test.py | 2 +- cognee/tests/integration/documents/TextDocument_test.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cognee/tests/integration/documents/AudioDocument_test.py b/cognee/tests/integration/documents/AudioDocument_test.py index f133ef811..a35e3892b 100644 --- a/cognee/tests/integration/documents/AudioDocument_test.py +++ b/cognee/tests/integration/documents/AudioDocument_test.py @@ -27,7 +27,7 @@ TEST_TEXT = """ def test_AudioDocument(): document = AudioDocument( - id=uuid.uuid4(), name="audio-dummy-test", raw_data_location="" + id=uuid.uuid4(), name="audio-dummy-test", raw_data_location="", metadata_id=uuid.uuid4() ) with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT): for ground_truth, paragraph_data in zip( diff --git a/cognee/tests/integration/documents/ImageDocument_test.py b/cognee/tests/integration/documents/ImageDocument_test.py index e9caf3634..9f5952c40 100644 --- a/cognee/tests/integration/documents/ImageDocument_test.py +++ b/cognee/tests/integration/documents/ImageDocument_test.py @@ -16,7 +16,7 @@ The commotion has attracted an audience: a murder of crows has gathered in the l def test_ImageDocument(): document = ImageDocument( - id=uuid.uuid4(), name="image-dummy-test", raw_data_location="" + id=uuid.uuid4(), name="image-dummy-test", raw_data_location="", metadata_id=uuid.uuid4() ) with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT): diff --git a/cognee/tests/integration/documents/PdfDocument_test.py b/cognee/tests/integration/documents/PdfDocument_test.py index d8ddbe23c..fbfe236db 100644 --- a/cognee/tests/integration/documents/PdfDocument_test.py +++ b/cognee/tests/integration/documents/PdfDocument_test.py @@ -17,7 +17,7 @@ def test_PdfDocument(): "artificial-intelligence.pdf", ) document = PdfDocument( - id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path + id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path, metadata_id=uuid.uuid4() ) for ground_truth, paragraph_data in zip( diff --git a/cognee/tests/integration/documents/TextDocument_test.py b/cognee/tests/integration/documents/TextDocument_test.py index ef7d42272..46adee094 100644 --- a/cognee/tests/integration/documents/TextDocument_test.py +++ b/cognee/tests/integration/documents/TextDocument_test.py @@ -29,7 +29,7 @@ def test_TextDocument(input_file, chunk_size): input_file, ) document = TextDocument( - id=uuid.uuid4(), name=input_file, raw_data_location=test_file_path + id=uuid.uuid4(), name=input_file, raw_data_location=test_file_path, metadata_id=uuid.uuid4() ) for ground_truth, paragraph_data in zip( From 5b5c1ea5c6b4be3ec5853d9434564fdfc8254e6f Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Tue, 26 Nov 2024 17:53:43 +0100 Subject: [PATCH 07/17] Fix module import error --- cognee/tasks/ingestion/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cognee/tasks/ingestion/__init__.py b/cognee/tasks/ingestion/__init__.py index 56cab2756..f569267a1 100644 --- a/cognee/tasks/ingestion/__init__.py +++ b/cognee/tasks/ingestion/__init__.py @@ -2,3 +2,4 @@ from .ingest_data import ingest_data from .save_data_to_storage import save_data_to_storage from .save_data_item_to_storage import save_data_item_to_storage from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage +from .ingest_data_with_metadata import ingest_data_with_metadata From 9e93ea07946487d7b2f45177ea22ef964b87e1a1 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 09:20:44 +0100 Subject: [PATCH 08/17] Make save_data_item_with_metadata_to_storage async --- .../ingestion/save_data_item_with_metadata_to_storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index 9acf0d413..6695153d0 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -4,12 +4,12 @@ from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion.operations.write_metadata import write_metadata -def save_data_item_with_metadata_to_storage( +async def save_data_item_with_metadata_to_storage( data_item: Union[BinaryIO, str, Any], dataset_name: str ) -> str: # Dynamic import is used because the llama_index module is optional. # For the same reason Any is accepted as a data item - metadata_id = write_metadata(data_item) + metadata_id = await write_metadata(data_item) # Check if data is of type Document or any of it's subclasses if str(type(data_item)).startswith("llama_index"): From 159985b5012d6d1a7381c6bde51fb090e6c0468e Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 11:27:17 +0100 Subject: [PATCH 09/17] Remove line in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ed4489fbf..1bb232b66 100644 --- a/README.md +++ b/README.md @@ -325,4 +325,4 @@ pip install cognee | Neo4j | Graph | Stable ✅ | | | NetworkX | Graph | Stable ✅ | | | FalkorDB | Vector/Graph | Unstable ❌ | | -| PGVector | Vector | Unstable ❌ | Postgres DB returns the Timeout error | +| PGVector | Vector | Unstable ❌ | Postgres DB returns the Timeout error | \ No newline at end of file From 80517f5117f7b66c12634d7326e814918c9600e3 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 11:27:55 +0100 Subject: [PATCH 10/17] Revert README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1bb232b66..ed4489fbf 100644 --- a/README.md +++ b/README.md @@ -325,4 +325,4 @@ pip install cognee | Neo4j | Graph | Stable ✅ | | | NetworkX | Graph | Stable ✅ | | | FalkorDB | Vector/Graph | Unstable ❌ | | -| PGVector | Vector | Unstable ❌ | Postgres DB returns the Timeout error | \ No newline at end of file +| PGVector | Vector | Unstable ❌ | Postgres DB returns the Timeout error | From aacba555c92132ed4d781062614fe7d7b3d66657 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 12:22:28 +0100 Subject: [PATCH 11/17] Remove passing of metadata_id to DocumentChunk --- cognee/modules/chunking/TextChunker.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index 24ed0b236..a98d36961 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -34,8 +34,7 @@ class TextChunker(): word_count = chunk_data["word_count"], is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = chunk_data["cut_type"], - metadata_id = self.document.metadata_id + cut_type = chunk_data["cut_type"] ) paragraph_chunks = [] self.chunk_size = 0 @@ -48,8 +47,7 @@ class TextChunker(): word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], - metadata_id = self.document.metadata_id + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"] ) except Exception as e: print(e) @@ -66,8 +64,7 @@ class TextChunker(): word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], - metadata_id = self.document.metadata_id + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"] ) except Exception as e: print(e) From 3d5cb7644a21d8e032dddd7dfb628fb2bccba573 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 12:50:39 +0100 Subject: [PATCH 12/17] Pass DocumentChunk metadata_id to _metadata field --- cognee/modules/chunking/TextChunker.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index a98d36961..f38058b0e 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -34,7 +34,11 @@ class TextChunker(): word_count = chunk_data["word_count"], is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = chunk_data["cut_type"] + cut_type = chunk_data["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) paragraph_chunks = [] self.chunk_size = 0 @@ -47,7 +51,11 @@ class TextChunker(): word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"] + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) except Exception as e: print(e) @@ -64,7 +72,11 @@ class TextChunker(): word_count = self.chunk_size, is_part_of = self.document, chunk_index = self.chunk_index, - cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"] + cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) except Exception as e: print(e) From 1679c746a3ea249562ce57dd6a0b47d7611e6268 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 14:15:03 +0100 Subject: [PATCH 13/17] Move class and functions to data.models --- cognee/modules/data/models/Data.py | 13 +++++++++++-- .../modules/{ingestion => data}/models/Metadata.py | 5 ++++- .../operations/delete_metadata.py | 0 .../{ingestion => data}/operations/get_metadata.py | 0 .../operations/write_metadata.py | 0 cognee/tasks/ingestion/ingest_data_with_metadata.py | 2 +- .../save_data_item_with_metadata_to_storage.py | 2 +- 7 files changed, 17 insertions(+), 5 deletions(-) rename cognee/modules/{ingestion => data}/models/Metadata.py (66%) rename cognee/modules/{ingestion => data}/operations/delete_metadata.py (100%) rename cognee/modules/{ingestion => data}/operations/get_metadata.py (100%) rename cognee/modules/{ingestion => data}/operations/write_metadata.py (100%) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 55991541d..7899b017f 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Mapped, relationship from cognee.infrastructure.databases.relational import Base from .DatasetData import DatasetData - +from .Metadata import Metadata class Data(Base): __tablename__ = "data" @@ -28,7 +28,7 @@ class Data(Base): DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) ) - datasets: Mapped[List["Dataset"]] = relationship( + datasets = relationship( "Dataset", secondary=DatasetData.__tablename__, back_populates="data", @@ -36,6 +36,15 @@ class Data(Base): cascade="all, delete", ) + metadata = relationship( + "Metadata", + secondary=Metadata.__tablename__, + back_populates="data", + lazy="noload", + cascade="all, delete", + ) + + def to_json(self) -> dict: return { "id": str(self.id), diff --git a/cognee/modules/ingestion/models/Metadata.py b/cognee/modules/data/models/Metadata.py similarity index 66% rename from cognee/modules/ingestion/models/Metadata.py rename to cognee/modules/data/models/Metadata.py index 5d6333bc3..adf254941 100644 --- a/cognee/modules/ingestion/models/Metadata.py +++ b/cognee/modules/data/models/Metadata.py @@ -1,7 +1,7 @@ from datetime import datetime, timezone from uuid import uuid4 -from sqlalchemy import UUID, Column, DateTime, String +from sqlalchemy import UUID, Column, DateTime, String, ForeignKey, relationship from cognee.infrastructure.databases.relational import Base @@ -19,3 +19,6 @@ class Metadata(Base): updated_at = Column( DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) ) + + dataset_id = Column(UUID, ForeignKey("datasets.id", ondelete="CASCADE"), primary_key = True) + data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = True) \ No newline at end of file diff --git a/cognee/modules/ingestion/operations/delete_metadata.py b/cognee/modules/data/operations/delete_metadata.py similarity index 100% rename from cognee/modules/ingestion/operations/delete_metadata.py rename to cognee/modules/data/operations/delete_metadata.py diff --git a/cognee/modules/ingestion/operations/get_metadata.py b/cognee/modules/data/operations/get_metadata.py similarity index 100% rename from cognee/modules/ingestion/operations/get_metadata.py rename to cognee/modules/data/operations/get_metadata.py diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py similarity index 100% rename from cognee/modules/ingestion/operations/write_metadata.py rename to cognee/modules/data/operations/write_metadata.py diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index 07f5c8115..e7068a008 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -5,7 +5,7 @@ import dlt import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset -from cognee.modules.ingestion.operations.delete_metadata import delete_metadata +from cognee.modules.data.operations.delete_metadata import delete_metadata from cognee.modules.users.models import User from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.shared.utils import send_telemetry diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index 6695153d0..11512a1c4 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -1,7 +1,7 @@ from typing import Any, BinaryIO, Union from cognee.modules.ingestion import save_data_to_file -from cognee.modules.ingestion.operations.write_metadata import write_metadata +from cognee.modules.data.operations.write_metadata import write_metadata async def save_data_item_with_metadata_to_storage( From cd0e505ac00bf3432a0a23c6b0ee50415f6ed4ea Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 15:35:23 +0100 Subject: [PATCH 14/17] WIP --- cognee/modules/data/models/Data.py | 5 +---- cognee/modules/data/models/Metadata.py | 8 +++++--- cognee/modules/data/operations/write_metadata.py | 4 ++-- cognee/tasks/documents/classify_documents.py | 9 +++++---- .../tasks/ingestion/ingest_data_with_metadata.py | 15 +++++++-------- .../save_data_item_with_metadata_to_storage.py | 5 +---- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 7899b017f..f1b033dd0 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -19,8 +19,6 @@ class Data(Base): extension = Column(String) mime_type = Column(String) raw_data_location = Column(String) - metadata_id = Column(UUID) - created_at = Column( DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) ) @@ -36,9 +34,8 @@ class Data(Base): cascade="all, delete", ) - metadata = relationship( + metadata_relationship = relationship( "Metadata", - secondary=Metadata.__tablename__, back_populates="data", lazy="noload", cascade="all, delete", diff --git a/cognee/modules/data/models/Metadata.py b/cognee/modules/data/models/Metadata.py index adf254941..3ab30b38d 100644 --- a/cognee/modules/data/models/Metadata.py +++ b/cognee/modules/data/models/Metadata.py @@ -1,7 +1,8 @@ from datetime import datetime, timezone from uuid import uuid4 -from sqlalchemy import UUID, Column, DateTime, String, ForeignKey, relationship +from sqlalchemy import UUID, Column, DateTime, String, ForeignKey +from sqlalchemy.orm import relationship from cognee.infrastructure.databases.relational import Base @@ -20,5 +21,6 @@ class Metadata(Base): DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) ) - dataset_id = Column(UUID, ForeignKey("datasets.id", ondelete="CASCADE"), primary_key = True) - data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = True) \ No newline at end of file + data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = False) + data = relationship("Data", back_populates="metadata_relationship") + diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py index b1b40a3d4..749aed831 100644 --- a/cognee/modules/data/operations/write_metadata.py +++ b/cognee/modules/data/operations/write_metadata.py @@ -10,18 +10,18 @@ from cognee.infrastructure.databases.relational import get_relational_engine from ..models.Metadata import Metadata -async def write_metadata(data_item: Any) -> UUID: +async def write_metadata(data_item: Any, data_id: UUID) -> UUID: metadata_dict = get_metadata_dict(data_item) db_engine = get_relational_engine() async with db_engine.get_async_session() as session: metadata = Metadata( metadata_repr=json.dumps(metadata_dict), metadata_source=parse_type(type(data_item)), + data_id=data_id ) session.add(metadata) await session.commit() - return metadata.id def parse_type(type_: Any) -> str: diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py index 599b74e17..79ad8245f 100644 --- a/cognee/tasks/documents/classify_documents.py +++ b/cognee/tasks/documents/classify_documents.py @@ -39,14 +39,15 @@ EXTENSION_TO_DOCUMENT_CLASS = { def classify_documents(data_documents: list[Data]) -> list[Document]: - documents = [ - EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( + documents = [] + for data_item in data_documents: + document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( id=data_item.id, title=f"{data_item.name}.{data_item.extension}", raw_data_location=data_item.raw_data_location, name=data_item.name, metadata_id=data_item.metadata_id ) - for data_item in data_documents - ] + documents.append(document) + return documents diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index e7068a008..d2c91e607 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -9,13 +9,14 @@ from cognee.modules.data.operations.delete_metadata import delete_metadata from cognee.modules.users.models import User from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.shared.utils import send_telemetry - +from cognee.modules.data.operations.write_metadata import write_metadata from .get_dlt_destination import get_dlt_destination from .save_data_item_with_metadata_to_storage import ( save_data_item_with_metadata_to_storage, ) + async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination = get_dlt_destination() @@ -32,8 +33,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): # Process data for data_item in data: - - file_path, metadata_id = await save_data_item_with_metadata_to_storage( + file_path = await save_data_item_with_metadata_to_storage( data_item, dataset_name ) @@ -64,21 +64,20 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): data_point.raw_data_location = file_metadata["file_path"] data_point.extension = file_metadata["extension"] data_point.mime_type = file_metadata["mime_type"] - data_point.metadata_id = metadata_id await session.merge(data_point) - await session.commit() else: data_point = Data( id=data_id, name=file_metadata["name"], raw_data_location=file_metadata["file_path"], extension=file_metadata["extension"], - mime_type=file_metadata["mime_type"], - metadata_id=metadata_id, + mime_type=file_metadata["mime_type"] ) dataset.data.append(data_point) - await session.commit() + await session.commit() + await write_metadata(data_item, data_point.id) + yield { "id": data_id, diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index 11512a1c4..c07327238 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -1,7 +1,6 @@ from typing import Any, BinaryIO, Union from cognee.modules.ingestion import save_data_to_file -from cognee.modules.data.operations.write_metadata import write_metadata async def save_data_item_with_metadata_to_storage( @@ -9,8 +8,6 @@ async def save_data_item_with_metadata_to_storage( ) -> str: # Dynamic import is used because the llama_index module is optional. # For the same reason Any is accepted as a data item - metadata_id = await write_metadata(data_item) - # Check if data is of type Document or any of it's subclasses if str(type(data_item)).startswith("llama_index"): from .transform_data import get_data_from_llama_index @@ -33,4 +30,4 @@ async def save_data_item_with_metadata_to_storage( else: raise ValueError(f"Data type not supported: {type(data_item)}") - return file_path, metadata_id + return file_path From 15802237e973711db1f08b5fbd831c24360e0c2b Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 16:27:55 +0100 Subject: [PATCH 15/17] Get metadata from metadata table --- cognee/modules/data/operations/get_metadata.py | 4 +++- cognee/modules/data/operations/write_metadata.py | 1 + cognee/tasks/documents/classify_documents.py | 6 ++++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cognee/modules/data/operations/get_metadata.py b/cognee/modules/data/operations/get_metadata.py index 9034b327d..26637e383 100644 --- a/cognee/modules/data/operations/get_metadata.py +++ b/cognee/modules/data/operations/get_metadata.py @@ -14,4 +14,6 @@ async def get_metadata(metadata_id: UUID) -> Metadata: async with db_engine.get_async_session() as session: metadata = await session.get(Metadata, metadata_id) - return json.parse(metadata) + return metadata + + diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py index 749aed831..4b550a6bf 100644 --- a/cognee/modules/data/operations/write_metadata.py +++ b/cognee/modules/data/operations/write_metadata.py @@ -15,6 +15,7 @@ async def write_metadata(data_item: Any, data_id: UUID) -> UUID: db_engine = get_relational_engine() async with db_engine.get_async_session() as session: metadata = Metadata( + id=data_id, metadata_repr=json.dumps(metadata_dict), metadata_source=parse_type(type(data_item)), data_id=data_id diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py index 79ad8245f..41ffc45bd 100644 --- a/cognee/tasks/documents/classify_documents.py +++ b/cognee/tasks/documents/classify_documents.py @@ -6,6 +6,7 @@ from cognee.modules.data.processing.document_types import ( ImageDocument, TextDocument, ) +from cognee.modules.data.operations.get_metadata import get_metadata EXTENSION_TO_DOCUMENT_CLASS = { "pdf": PdfDocument, # Text documents @@ -38,15 +39,16 @@ EXTENSION_TO_DOCUMENT_CLASS = { } -def classify_documents(data_documents: list[Data]) -> list[Document]: +async def classify_documents(data_documents: list[Data]) -> list[Document]: documents = [] for data_item in data_documents: + metadata = await get_metadata(data_item.id) document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( id=data_item.id, title=f"{data_item.name}.{data_item.extension}", raw_data_location=data_item.raw_data_location, name=data_item.name, - metadata_id=data_item.metadata_id + metadata_id=metadata.id ) documents.append(document) From d4e77636b5428591ec35560f316fba5264020f9a Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Wed, 27 Nov 2024 16:53:53 +0100 Subject: [PATCH 16/17] Revert spaces around args --- .../ingestion/ingest_data_with_metadata.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index d2c91e607..573e2c3c1 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -25,7 +25,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination=destination, ) - @dlt.resource(standalone=True, merge_key="id") + @dlt.resource(standalone = True, merge_key = "id") async def data_resources(data: Any, user: User): if not isinstance(data, list): # Convert data to a list as we work with lists further down. @@ -38,7 +38,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): ) # Ingest data and add metadata - with open(file_path.replace("file://", ""), mode="rb") as file: + with open(file_path.replace("file://", ""), mode = "rb") as file: classified_data = ingestion.classify(file) data_id = ingestion.identify(classified_data) @@ -67,11 +67,11 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): await session.merge(data_point) else: data_point = Data( - id=data_id, - name=file_metadata["name"], - raw_data_location=file_metadata["file_path"], - extension=file_metadata["extension"], - mime_type=file_metadata["mime_type"] + id = data_id, + name = file_metadata["name"], + raw_data_location = file_metadata["file_path"], + extension = file_metadata["extension"], + mime_type = file_metadata["mime_type"] ) dataset.data.append(data_point) @@ -93,9 +93,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id) run_info = pipeline.run( data_resources(data, user), - table_name="file_metadata", - dataset_name=dataset_name, - write_disposition="merge", + table_name = "file_metadata", + dataset_name = dataset_name, + write_disposition = "merge", ) send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id) From bc82430fb50726cec4464074c9d06fe00bea2535 Mon Sep 17 00:00:00 2001 From: Leon Luithlen Date: Fri, 29 Nov 2024 14:36:03 +0100 Subject: [PATCH 17/17] Merge latest COG-519 --- .../files/utils/get_file_metadata.py | 1 + .../modules/data/operations/write_metadata.py | 21 ++++++++++++------- .../ingestion/ingest_data_with_metadata.py | 3 +-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py index 4aea9560e..a114ef48f 100644 --- a/cognee/infrastructure/files/utils/get_file_metadata.py +++ b/cognee/infrastructure/files/utils/get_file_metadata.py @@ -4,6 +4,7 @@ from .guess_file_type import guess_file_type class FileMetadata(TypedDict): name: str + file_path: str mime_type: str extension: str diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py index 4b550a6bf..a2ea644ac 100644 --- a/cognee/modules/data/operations/write_metadata.py +++ b/cognee/modules/data/operations/write_metadata.py @@ -4,14 +4,15 @@ import re import warnings from typing import Any from uuid import UUID +from typing import Any, BinaryIO, Union from cognee.infrastructure.databases.relational import get_relational_engine - +from cognee.infrastructure.files.utils.get_file_metadata import FileMetadata from ..models.Metadata import Metadata -async def write_metadata(data_item: Any, data_id: UUID) -> UUID: - metadata_dict = get_metadata_dict(data_item) +async def write_metadata(data_item: Union[BinaryIO, str, Any], data_id: UUID, file_metadata: FileMetadata) -> UUID: + metadata_dict = get_metadata_dict(data_item, file_metadata) db_engine = get_relational_engine() async with db_engine.get_async_session() as session: metadata = Metadata( @@ -34,14 +35,18 @@ def parse_type(type_: Any) -> str: raise Exception(f"type: {type_} could not be parsed") -def get_metadata_dict(metadata: Any) -> dict[str, Any]: - if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")): - return metadata.dict() +def get_metadata_dict(data_item: Union[BinaryIO, str, Any], file_metadata: FileMetadata) -> dict[str, Any]: + if isinstance(data_item, str): + return(file_metadata) + elif isinstance(data_item, BinaryIO): + return(file_metadata) + elif hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): + return {**file_metadata, **data_item.dict()} else: warnings.warn( - f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method" + f"metadata of type {type(data_item)}: {str(data_item)[:20]}... does not have dict method. Defaulting to string method" ) try: - return {"content": str(metadata)} + return {**dict(file_metadata), "content": str(data_item)} except Exception as e: raise Exception(f"Could not cast metadata to string: {e}") diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index 573e2c3c1..0c17b71f5 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -1,7 +1,6 @@ from typing import Any import dlt - import cognee.modules.ingestion as ingestion from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset @@ -76,7 +75,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): dataset.data.append(data_point) await session.commit() - await write_metadata(data_item, data_point.id) + await write_metadata(data_item, data_point.id, file_metadata) yield {