diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index 7899b017f..f1b033dd0 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -19,8 +19,6 @@ class Data(Base):
     extension = Column(String)
     mime_type = Column(String)
     raw_data_location = Column(String)
-    metadata_id = Column(UUID)
-
     created_at = Column(
         DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
     )
@@ -36,9 +34,8 @@ class Data(Base):
         cascade="all, delete",
     )
 
-    metadata = relationship(
+    metadata_relationship = relationship(
         "Metadata",
-        secondary=Metadata.__tablename__,
         back_populates="data",
         lazy="noload",
         cascade="all, delete",
diff --git a/cognee/modules/data/models/Metadata.py b/cognee/modules/data/models/Metadata.py
index adf254941..3ab30b38d 100644
--- a/cognee/modules/data/models/Metadata.py
+++ b/cognee/modules/data/models/Metadata.py
@@ -1,7 +1,8 @@
 from datetime import datetime, timezone
 from uuid import uuid4
 
-from sqlalchemy import UUID, Column, DateTime, String, ForeignKey, relationship
+from sqlalchemy import UUID, Column, DateTime, String, ForeignKey
+from sqlalchemy.orm import relationship
 
 from cognee.infrastructure.databases.relational import Base
 
@@ -20,5 +21,6 @@ class Metadata(Base):
         DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
     )
 
-    dataset_id = Column(UUID, ForeignKey("datasets.id", ondelete="CASCADE"), primary_key = True)
-    data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = True)
\ No newline at end of file
+    data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = False)
+    data = relationship("Data", back_populates="metadata_relationship")
+
diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py
index b1b40a3d4..749aed831 100644
--- a/cognee/modules/data/operations/write_metadata.py
+++ b/cognee/modules/data/operations/write_metadata.py
@@ -10,18 +10,18 @@ from cognee.infrastructure.databases.relational import get_relational_engine
 from ..models.Metadata import Metadata
 
 
-async def write_metadata(data_item: Any) -> UUID:
+async def write_metadata(data_item: Any, data_id: UUID) -> UUID:
     metadata_dict = get_metadata_dict(data_item)
     db_engine = get_relational_engine()
     async with db_engine.get_async_session() as session:
         metadata = Metadata(
             metadata_repr=json.dumps(metadata_dict),
             metadata_source=parse_type(type(data_item)),
+            data_id=data_id
         )
         session.add(metadata)
         await session.commit()
-        return metadata.id
 
 
 def parse_type(type_: Any) -> str:
diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py
index 599b74e17..79ad8245f 100644
--- a/cognee/tasks/documents/classify_documents.py
+++ b/cognee/tasks/documents/classify_documents.py
@@ -39,14 +39,15 @@ EXTENSION_TO_DOCUMENT_CLASS = {
 
 
 def classify_documents(data_documents: list[Data]) -> list[Document]:
-    documents = [
-        EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
+    documents = []
+    for data_item in data_documents:
+        document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
             id=data_item.id,
             title=f"{data_item.name}.{data_item.extension}",
             raw_data_location=data_item.raw_data_location,
             name=data_item.name,
             metadata_id=data_item.metadata_id
         )
-        for data_item in data_documents
-    ]
+        documents.append(document)
+
     return documents
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
index e7068a008..d2c91e607 100644
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -9,13 +9,14 @@ from cognee.modules.data.operations.delete_metadata import delete_metadata
 from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
 from cognee.shared.utils import send_telemetry
-
+from cognee.modules.data.operations.write_metadata import write_metadata
 from .get_dlt_destination import get_dlt_destination
 from .save_data_item_with_metadata_to_storage import (
     save_data_item_with_metadata_to_storage,
 )
 
+
 async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
@@ -32,8 +33,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
 
     # Process data
     for data_item in data:
-
-        file_path, metadata_id = await save_data_item_with_metadata_to_storage(
+        file_path = await save_data_item_with_metadata_to_storage(
             data_item, dataset_name
         )
@@ -64,21 +64,20 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                 data_point.raw_data_location = file_metadata["file_path"]
                 data_point.extension = file_metadata["extension"]
                 data_point.mime_type = file_metadata["mime_type"]
-                data_point.metadata_id = metadata_id
 
                 await session.merge(data_point)
-                await session.commit()
             else:
                 data_point = Data(
                     id=data_id,
                     name=file_metadata["name"],
                     raw_data_location=file_metadata["file_path"],
                     extension=file_metadata["extension"],
-                    mime_type=file_metadata["mime_type"],
-                    metadata_id=metadata_id,
+                    mime_type=file_metadata["mime_type"]
                 )
                 dataset.data.append(data_point)
-                await session.commit()
+            await session.commit()
+            await write_metadata(data_item, data_point.id)
+
 
             yield {
                 "id": data_id,
diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
index 11512a1c4..c07327238 100644
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
@@ -1,7 +1,6 @@
 from typing import Any, BinaryIO, Union
 
 from cognee.modules.ingestion import save_data_to_file
-from cognee.modules.data.operations.write_metadata import write_metadata
 
 
 async def save_data_item_with_metadata_to_storage(
@@ -9,8 +8,6 @@ async def save_data_item_with_metadata_to_storage(
 ) -> str:
     # Dynamic import is used because the llama_index module is optional.
     # For the same reason Any is accepted as a data item
-    metadata_id = await write_metadata(data_item)
-
     # Check if data is of type Document or any of it's subclasses
     if str(type(data_item)).startswith("llama_index"):
         from .transform_data import get_data_from_llama_index
@@ -33,4 +30,4 @@
     else:
         raise ValueError(f"Data type not supported: {type(data_item)}")
 
-    return file_path, metadata_id
+    return file_path
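
A note on the `metadata` -> `metadata_relationship` rename: SQLAlchemy reserves the `metadata` attribute on declarative classes (it holds the schema's MetaData registry), so mapping a relationship under that name raises an error, which is presumably what motivated the rename. Below is a minimal, self-contained sketch of the one-to-many shape the patch lands on. The standalone `Base`, the `metadata` table name, and the abbreviated column lists are illustrative assumptions; the relationship wiring follows the diff.

# Minimal sketch of the post-patch mapping: abbreviated columns, standalone
# Base for illustration. One Data row owns many Metadata rows via a plain FK.
from uuid import uuid4

from sqlalchemy import UUID, Column, ForeignKey, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Data(Base):
    __tablename__ = "data"

    id = Column(UUID, primary_key=True, default=uuid4)
    name = Column(String)

    # Renamed from "metadata", which declarative models reserve for the
    # schema registry; no secondary= link table anymore, just the FK below.
    metadata_relationship = relationship(
        "Metadata",
        back_populates="data",
        lazy="noload",
        cascade="all, delete",
    )


class Metadata(Base):
    __tablename__ = "metadata"  # table name assumed, not shown in the diff

    id = Column(UUID, primary_key=True, default=uuid4)
    metadata_repr = Column(String)
    metadata_source = Column(String)

    # Plain foreign key replaces the old (dataset_id, data_id) composite key.
    data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"))
    data = relationship("Data", back_populates="metadata_relationship")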
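
The new ordering in `ingest_data_with_metadata` is load-bearing: `write_metadata` opens its own session, so the `Data` row has to be committed before `Metadata.data_id` can reference it. A condensed sketch of that sequence, reusing the models above; `ingest_one`, `make_session`, and the `file_metadata` keys are hypothetical stand-ins for the real pipeline, not part of the patch.

# Condensed sketch of the post-patch write order. make_session stands in for
# db_engine.get_async_session(); the payload shapes are illustrative only.
import json
from uuid import uuid4


async def ingest_one(make_session, data_item, file_metadata):
    new_id = uuid4()

    async with make_session() as session:
        session.add(Data(id=new_id, name=file_metadata["name"]))
        # Commit first: write_metadata runs in a separate session, so the
        # Data row must already exist before data_id can point at it.
        await session.commit()

    async with make_session() as session:
        # Mirrors the patched write_metadata(data_item, data_id) call;
        # new_id is captured up front to avoid re-reading expired attributes.
        session.add(
            Metadata(
                metadata_repr=json.dumps({"repr": repr(data_item)}),
                metadata_source=str(type(data_item)),
                data_id=new_id,
            )
        )
        await session.commit()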
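
One consequence of `lazy="noload"` on `Data.metadata_relationship` is that related `Metadata` rows never load implicitly; callers must opt in per query. A hedged read-path example follows: `selectinload` is standard SQLAlchemy, but nothing in this patch adds such a loader, so treat it as an assumption about how consumers would fetch the rows.

# Hypothetical read path: a noload relationship stays empty unless the
# query opts in explicitly, e.g. via selectinload.
from sqlalchemy import select
from sqlalchemy.orm import selectinload


async def load_data_with_metadata(session, data_id):
    result = await session.execute(
        select(Data)
        .where(Data.id == data_id)
        .options(selectinload(Data.metadata_relationship))
    )
    return result.scalar_one()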