diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py index 9d6e33012..631d963e5 100644 --- a/cognee/api/v1/add/add_v2.py +++ b/cognee/api/v1/add/add_v2.py @@ -2,7 +2,7 @@ from typing import Union, BinaryIO from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines import run_tasks, Task -from cognee.tasks.ingestion import save_data_to_storage, ingest_data +from cognee.tasks.ingestion import ingest_data_with_metadata from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables @@ -14,8 +14,7 @@ async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_nam user = await get_default_user() tasks = [ - Task(save_data_to_storage, dataset_name), - Task(ingest_data, dataset_name, user) + Task(ingest_data_with_metadata, dataset_name, user) ] pipeline = run_tasks(tasks, data, "add_pipeline") diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py index 4aea9560e..a114ef48f 100644 --- a/cognee/infrastructure/files/utils/get_file_metadata.py +++ b/cognee/infrastructure/files/utils/get_file_metadata.py @@ -4,6 +4,7 @@ from .guess_file_type import guess_file_type class FileMetadata(TypedDict): name: str + file_path: str mime_type: str extension: str diff --git a/cognee/modules/chunking/TextChunker.py b/cognee/modules/chunking/TextChunker.py index f0a72b58a..f38058b0e 100644 --- a/cognee/modules/chunking/TextChunker.py +++ b/cognee/modules/chunking/TextChunker.py @@ -35,6 +35,10 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = chunk_data["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) paragraph_chunks = [] self.chunk_size = 0 @@ -48,6 
+52,10 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) except Exception as e: print(e) @@ -65,6 +73,10 @@ class TextChunker(): is_part_of = self.document, chunk_index = self.chunk_index, cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"], + _metadata = { + "index_fields": ["text"], + "metadata_id": self.document.metadata_id + } ) except Exception as e: print(e) diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py index 2e9745600..f1b033dd0 100644 --- a/cognee/modules/data/models/Data.py +++ b/cognee/modules/data/models/Data.py @@ -1,31 +1,46 @@ -from uuid import uuid4 -from typing import List from datetime import datetime, timezone -from sqlalchemy.orm import relationship, Mapped -from sqlalchemy import Column, String, DateTime, UUID +from typing import List +from uuid import uuid4 + +from sqlalchemy import UUID, Column, DateTime, String +from sqlalchemy.orm import Mapped, relationship + from cognee.infrastructure.databases.relational import Base + from .DatasetData import DatasetData +from .Metadata import Metadata class Data(Base): __tablename__ = "data" - id = Column(UUID, primary_key = True, default = uuid4) + id = Column(UUID, primary_key=True, default=uuid4) name = Column(String) extension = Column(String) mime_type = Column(String) raw_data_location = Column(String) - - created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) - updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) - - datasets: Mapped[List["Dataset"]] = relationship( - "Dataset", - secondary = DatasetData.__tablename__, - back_populates = "data", - lazy = "noload", - cascade="all, delete" + created_at = Column( + DateTime(timezone=True), default=lambda: 
datetime.now(timezone.utc) ) + updated_at = Column( + DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) + ) + + datasets = relationship( + "Dataset", + secondary=DatasetData.__tablename__, + back_populates="data", + lazy="noload", + cascade="all, delete", + ) + + metadata_relationship = relationship( + "Metadata", + back_populates="data", + lazy="noload", + cascade="all, delete", + ) + def to_json(self) -> dict: return { diff --git a/cognee/modules/data/models/Metadata.py b/cognee/modules/data/models/Metadata.py new file mode 100644 index 000000000..3ab30b38d --- /dev/null +++ b/cognee/modules/data/models/Metadata.py @@ -0,0 +1,26 @@ +from datetime import datetime, timezone +from uuid import uuid4 + +from sqlalchemy import UUID, Column, DateTime, String, ForeignKey +from sqlalchemy.orm import relationship + +from cognee.infrastructure.databases.relational import Base + + +class Metadata(Base): + __tablename__ = "metadata_table" + + id = Column(UUID, primary_key=True, default=uuid4) + metadata_repr = Column(String) + metadata_source = Column(String) + + created_at = Column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at = Column( + DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) + ) + + data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = False) + data = relationship("Data", back_populates="metadata_relationship") + diff --git a/cognee/modules/data/operations/delete_metadata.py b/cognee/modules/data/operations/delete_metadata.py new file mode 100644 index 000000000..df94f52ed --- /dev/null +++ b/cognee/modules/data/operations/delete_metadata.py @@ -0,0 +1,19 @@ +import warnings +from uuid import UUID + +from sqlalchemy import select + +from cognee.infrastructure.databases.relational import get_relational_engine + +from ..models.Metadata import Metadata + + +async def delete_metadata(metadata_id: UUID): + db_engine = get_relational_engine() + async with 
db_engine.get_async_session() as session: + metadata = await session.get(Metadata, metadata_id) + if metadata is None: + warnings.warn(f"metadata for metadata_id: {metadata_id} not found") + return + await session.delete(metadata) + await session.commit() diff --git a/cognee/modules/data/operations/get_metadata.py b/cognee/modules/data/operations/get_metadata.py new file mode 100644 index 000000000..26637e383 --- /dev/null +++ b/cognee/modules/data/operations/get_metadata.py @@ -0,0 +1,19 @@ +import json +from uuid import UUID + +from sqlalchemy import select + +from cognee.infrastructure.databases.relational import get_relational_engine + +from ..models.Metadata import Metadata + + +async def get_metadata(metadata_id: UUID) -> Metadata: + db_engine = get_relational_engine() + + async with db_engine.get_async_session() as session: + metadata = await session.get(Metadata, metadata_id) + + return metadata + + diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py new file mode 100644 index 000000000..a2ea644ac --- /dev/null +++ b/cognee/modules/data/operations/write_metadata.py @@ -0,0 +1,52 @@ +import inspect +import json +import re +import warnings +from typing import Any +from uuid import UUID +from typing import Any, BinaryIO, Union + +from cognee.infrastructure.databases.relational import get_relational_engine +from cognee.infrastructure.files.utils.get_file_metadata import FileMetadata +from ..models.Metadata import Metadata + + +async def write_metadata(data_item: Union[BinaryIO, str, Any], data_id: UUID, file_metadata: FileMetadata) -> UUID: + metadata_dict = get_metadata_dict(data_item, file_metadata) + db_engine = get_relational_engine() + async with db_engine.get_async_session() as session: + metadata = Metadata( + id=data_id, + metadata_repr=json.dumps(metadata_dict), + metadata_source=parse_type(type(data_item)), + data_id=data_id + ) + session.add(metadata) + await session.commit() + + + +def parse_type(type_: Any) 
-> str: + pattern = r".+'([\w_\.]+)'" + match = re.search(pattern, str(type_)) + if match: + return match.group(1) + else: + raise Exception(f"type: {type_} could not be parsed") + + +def get_metadata_dict(data_item: Union[BinaryIO, str, Any], file_metadata: FileMetadata) -> dict[str, Any]: + if isinstance(data_item, str): + return(file_metadata) + elif isinstance(data_item, BinaryIO): + return(file_metadata) + elif hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")): + return {**file_metadata, **data_item.dict()} + else: + warnings.warn( + f"metadata of type {type(data_item)}: {str(data_item)[:20]}... does not have dict method. Defaulting to string method" + ) + try: + return {**dict(file_metadata), "content": str(data_item)} + except Exception as e: + raise Exception(f"Could not cast metadata to string: {e}") diff --git a/cognee/modules/data/processing/document_types/Document.py b/cognee/modules/data/processing/document_types/Document.py index 7d5545cfc..773fc30c8 100644 --- a/cognee/modules/data/processing/document_types/Document.py +++ b/cognee/modules/data/processing/document_types/Document.py @@ -1,9 +1,11 @@ from cognee.infrastructure.engine import DataPoint +from uuid import UUID class Document(DataPoint): type: str name: str raw_data_location: str + metadata_id: UUID def read(self, chunk_size: int) -> str: pass diff --git a/cognee/tasks/documents/classify_documents.py b/cognee/tasks/documents/classify_documents.py index 8ee87bcad..41ffc45bd 100644 --- a/cognee/tasks/documents/classify_documents.py +++ b/cognee/tasks/documents/classify_documents.py @@ -6,6 +6,7 @@ from cognee.modules.data.processing.document_types import ( ImageDocument, TextDocument, ) +from cognee.modules.data.operations.get_metadata import get_metadata EXTENSION_TO_DOCUMENT_CLASS = { "pdf": PdfDocument, # Text documents @@ -38,14 +39,17 @@ EXTENSION_TO_DOCUMENT_CLASS = { } -def classify_documents(data_documents: list[Data]) -> list[Document]: - documents = [ - 
EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( +async def classify_documents(data_documents: list[Data]) -> list[Document]: + documents = [] + for data_item in data_documents: + metadata = await get_metadata(data_item.id) + document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( id=data_item.id, title=f"{data_item.name}.{data_item.extension}", raw_data_location=data_item.raw_data_location, name=data_item.name, + metadata_id=metadata.id ) - for data_item in data_documents - ] + documents.append(document) + return documents diff --git a/cognee/tasks/ingestion/__init__.py b/cognee/tasks/ingestion/__init__.py index 56cab2756..f569267a1 100644 --- a/cognee/tasks/ingestion/__init__.py +++ b/cognee/tasks/ingestion/__init__.py @@ -2,3 +2,4 @@ from .ingest_data import ingest_data from .save_data_to_storage import save_data_to_storage from .save_data_item_to_storage import save_data_item_to_storage from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage +from .ingest_data_with_metadata import ingest_data_with_metadata diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py index e5a50c13b..0c17b71f5 100644 --- a/cognee/tasks/ingestion/ingest_data_with_metadata.py +++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py @@ -1,20 +1,27 @@ +from typing import Any + import dlt import cognee.modules.ingestion as ingestion -from typing import Any -from cognee.shared.utils import send_telemetry -from cognee.modules.users.models import User from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.data.methods import create_dataset +from cognee.modules.data.operations.delete_metadata import delete_metadata +from cognee.modules.users.models import User from cognee.modules.users.permissions.methods import give_permission_on_document +from cognee.shared.utils import send_telemetry +from cognee.modules.data.operations.write_metadata import 
write_metadata from .get_dlt_destination import get_dlt_destination -from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage +from .save_data_item_with_metadata_to_storage import ( + save_data_item_with_metadata_to_storage, +) + + async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): destination = get_dlt_destination() pipeline = dlt.pipeline( - pipeline_name = "file_load_from_filesystem", - destination = destination, + pipeline_name="file_load_from_filesystem", + destination=destination, ) @dlt.resource(standalone = True, merge_key = "id") @@ -25,8 +32,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): # Process data for data_item in data: - - file_path = save_data_item_with_metadata_to_storage(data_item, dataset_name) + file_path = await save_data_item_with_metadata_to_storage( + data_item, dataset_name + ) # Ingest data and add metadata with open(file_path.replace("file://", ""), mode = "rb") as file: @@ -37,6 +45,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): file_metadata = classified_data.get_metadata() from sqlalchemy import select + from cognee.modules.data.models import Data db_engine = get_relational_engine() @@ -44,29 +53,30 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): async with db_engine.get_async_session() as session: dataset = await create_dataset(dataset_name, user.id, session) - data_point = (await session.execute( - select(Data).filter(Data.id == data_id) - )).scalar_one_or_none() + data_point = ( + await session.execute(select(Data).filter(Data.id == data_id)) + ).scalar_one_or_none() if data_point is not None: + await delete_metadata(data_point.id) data_point.name = file_metadata["name"] data_point.raw_data_location = file_metadata["file_path"] data_point.extension = file_metadata["extension"] data_point.mime_type = file_metadata["mime_type"] - await 
session.merge(data_point) - await session.commit() else: data_point = Data( id = data_id, name = file_metadata["name"], raw_data_location = file_metadata["file_path"], extension = file_metadata["extension"], - mime_type = file_metadata["mime_type"], + mime_type = file_metadata["mime_type"] ) dataset.data.append(data_point) - await session.commit() + await session.commit() + await write_metadata(data_item, data_point.id, file_metadata) + yield { "id": data_id, @@ -79,14 +89,13 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): await give_permission_on_document(user, data_id, "read") await give_permission_on_document(user, data_id, "write") - - send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id) + send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id) run_info = pipeline.run( data_resources(data, user), table_name = "file_metadata", dataset_name = dataset_name, write_disposition = "merge", ) - send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id) + send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id) return run_info diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py index ec29edb89..c07327238 100644 --- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py +++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py @@ -1,19 +1,24 @@ -from typing import Union, BinaryIO, Any +from typing import Any, BinaryIO, Union + from cognee.modules.ingestion import save_data_to_file -def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str: - # Dynamic import is used because the llama_index module is optional. 
- # For the same reason Any is accepted as a data item - from llama_index.core import Document - from .transform_data import get_data_from_llama_index +async def save_data_item_with_metadata_to_storage( + data_item: Union[BinaryIO, str, Any], dataset_name: str +) -> str: + # Dynamic import is used because the llama_index module is optional. + # For the same reason Any is accepted as a data item # Check if data is of type Document or any of it's subclasses - if isinstance(data_item, Document): + if str(type(data_item)).startswith("llama_index"): + from .transform_data import get_data_from_llama_index + file_path = get_data_from_llama_index(data_item, dataset_name) # data is a file object coming from upload. elif hasattr(data_item, "file"): - file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename) + file_path = save_data_to_file( + data_item.file, dataset_name, filename=data_item.filename + ) elif isinstance(data_item, str): # data is a file path @@ -25,4 +30,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any] else: raise ValueError(f"Data type not supported: {type(data_item)}") - return file_path \ No newline at end of file + return file_path diff --git a/cognee/tests/integration/documents/AudioDocument_test.py b/cognee/tests/integration/documents/AudioDocument_test.py index f133ef811..a35e3892b 100644 --- a/cognee/tests/integration/documents/AudioDocument_test.py +++ b/cognee/tests/integration/documents/AudioDocument_test.py @@ -27,7 +27,7 @@ TEST_TEXT = """ def test_AudioDocument(): document = AudioDocument( - id=uuid.uuid4(), name="audio-dummy-test", raw_data_location="" + id=uuid.uuid4(), name="audio-dummy-test", raw_data_location="", metadata_id=uuid.uuid4() ) with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT): for ground_truth, paragraph_data in zip( diff --git a/cognee/tests/integration/documents/ImageDocument_test.py 
b/cognee/tests/integration/documents/ImageDocument_test.py index e9caf3634..9f5952c40 100644 --- a/cognee/tests/integration/documents/ImageDocument_test.py +++ b/cognee/tests/integration/documents/ImageDocument_test.py @@ -16,7 +16,7 @@ The commotion has attracted an audience: a murder of crows has gathered in the l def test_ImageDocument(): document = ImageDocument( - id=uuid.uuid4(), name="image-dummy-test", raw_data_location="" + id=uuid.uuid4(), name="image-dummy-test", raw_data_location="", metadata_id=uuid.uuid4() ) with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT): diff --git a/cognee/tests/integration/documents/PdfDocument_test.py b/cognee/tests/integration/documents/PdfDocument_test.py index d8ddbe23c..fbfe236db 100644 --- a/cognee/tests/integration/documents/PdfDocument_test.py +++ b/cognee/tests/integration/documents/PdfDocument_test.py @@ -17,7 +17,7 @@ def test_PdfDocument(): "artificial-intelligence.pdf", ) document = PdfDocument( - id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path + id=uuid.uuid4(), name="Test document.pdf", raw_data_location=test_file_path, metadata_id=uuid.uuid4() ) for ground_truth, paragraph_data in zip( diff --git a/cognee/tests/integration/documents/TextDocument_test.py b/cognee/tests/integration/documents/TextDocument_test.py index ef7d42272..46adee094 100644 --- a/cognee/tests/integration/documents/TextDocument_test.py +++ b/cognee/tests/integration/documents/TextDocument_test.py @@ -29,7 +29,7 @@ def test_TextDocument(input_file, chunk_size): input_file, ) document = TextDocument( - id=uuid.uuid4(), name=input_file, raw_data_location=test_file_path + id=uuid.uuid4(), name=input_file, raw_data_location=test_file_path, metadata_id=uuid.uuid4() ) for ground_truth, paragraph_data in zip(