From c5f3314c856a0ac41ec4200d83f23797c41ccfd9 Mon Sep 17 00:00:00 2001
From: Leon Luithlen
Date: Tue, 26 Nov 2024 16:10:03 +0100
Subject: [PATCH] Add Metadata table and read write delete functions

---
 cognee/modules/data/models/Data.py            |  1 +
 cognee/modules/ingestion/models/metadata.py   | 14 +++++++
 .../ingestion/operations/delete_metadata.py   | 16 ++++++++
 .../ingestion/operations/get_metadata.py      | 13 ++++++
 .../ingestion/operations/write_metadata.py    | 41 +++++++++++++++++++
 .../ingestion/ingest_data_with_metadata.py    |  7 +++-
 ...save_data_item_with_metadata_to_storage.py | 11 +++--
 7 files changed, 96 insertions(+), 7 deletions(-)
 create mode 100644 cognee/modules/ingestion/models/metadata.py
 create mode 100644 cognee/modules/ingestion/operations/delete_metadata.py
 create mode 100644 cognee/modules/ingestion/operations/get_metadata.py
 create mode 100644 cognee/modules/ingestion/operations/write_metadata.py

diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index 2e9745600..6601580eb 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -15,6 +15,7 @@ class Data(Base):
     extension = Column(String)
     mime_type = Column(String)
     raw_data_location = Column(String)
+    metadata_id = Column(UUID)
     created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
 
diff --git a/cognee/modules/ingestion/models/metadata.py b/cognee/modules/ingestion/models/metadata.py
new file mode 100644
index 000000000..29d6cfaf2
--- /dev/null
+++ b/cognee/modules/ingestion/models/metadata.py
@@ -0,0 +1,14 @@
+from uuid import uuid4
+from datetime import datetime, timezone
+from sqlalchemy import Column, DateTime, String, UUID
+from cognee.infrastructure.databases.relational import Base
+
+class Metadata(Base):
+    __tablename__ = "metadata"
+
+    id = Column(UUID, primary_key = True, default = uuid4)
+    metadata_info = Column(String)
+    metadata_source = Column(String)
+
+    created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
+    updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
diff --git a/cognee/modules/ingestion/operations/delete_metadata.py b/cognee/modules/ingestion/operations/delete_metadata.py
new file mode 100644
index 000000000..dc6bcd85f
--- /dev/null
+++ b/cognee/modules/ingestion/operations/delete_metadata.py
@@ -0,0 +1,16 @@
+import warnings
+from uuid import UUID
+from sqlalchemy import select
+from cognee.infrastructure.databases.relational import get_relational_engine
+from ..models.metadata import Metadata
+
+
+async def delete_metadata(metadata_id: UUID):
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        metadata = await session.get(Metadata, metadata_id)
+        if metadata is None:
+            warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
+            return
+        await session.delete(metadata)
+        await session.commit()
\ No newline at end of file
diff --git a/cognee/modules/ingestion/operations/get_metadata.py b/cognee/modules/ingestion/operations/get_metadata.py
new file mode 100644
index 000000000..047bc4906
--- /dev/null
+++ b/cognee/modules/ingestion/operations/get_metadata.py
@@ -0,0 +1,13 @@
+import json
+from uuid import UUID
+from sqlalchemy import select
+from cognee.infrastructure.databases.relational import get_relational_engine
+from ..models.metadata import Metadata
+
+async def get_metadata(metadata_id: UUID) -> Metadata:
+    db_engine = get_relational_engine()
+
+    async with db_engine.get_async_session() as session:
+        metadata = await session.get(Metadata, metadata_id)
+
+    return metadata
diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/ingestion/operations/write_metadata.py
new file mode 100644
index 000000000..b97c3cf85
--- /dev/null
+++ b/cognee/modules/ingestion/operations/write_metadata.py
@@ -0,0 +1,41 @@
+import json
+import inspect
+import warnings
+import re
+from typing import Any
+
+from uuid import UUID
+from cognee.infrastructure.databases.relational import get_relational_engine
+from ..models.metadata import Metadata
+
+async def write_metadata(data_item: Any) -> UUID:
+    metadata_dict = get_metadata_dict(data_item)
+    db_engine = get_relational_engine()
+    async with db_engine.get_async_session() as session:
+        metadata = Metadata(
+            metadata_info = json.dumps(metadata_dict),
+            metadata_source = parse_type(type(data_item))
+        )
+        session.add(metadata)
+        await session.commit()
+
+        return metadata.id
+
+def parse_type(type_: Any) -> str:
+    pattern = r".+'([\w_\.]+)'"
+    match = re.search(pattern, str(type_))
+    if match:
+        return(match.group(1))
+    else:
+        raise Exception(f"type: {type_} could not be parsed")
+
+
+def get_metadata_dict(metadata: Any) -> dict[str, Any]:
+    if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")):
+        return(metadata.dict())
+    else:
+        warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method")
+        try:
+            return({"content": str(metadata)})
+        except Exception as e:
+            raise Exception(f"Could not cast metadata to string: {e}")
\ No newline at end of file
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
index e5a50c13b..577dca57e 100644
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -8,6 +8,7 @@ from cognee.modules.data.methods import create_dataset
 from cognee.modules.users.permissions.methods import give_permission_on_document
 from .get_dlt_destination import get_dlt_destination
 from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
+from cognee.modules.ingestion.operations.delete_metadata import delete_metadata
 
 async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
@@ -26,7 +27,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
 
     # Process data
     for data_item in data:
-        file_path = save_data_item_with_metadata_to_storage(data_item, dataset_name)
+        file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
 
         # Ingest data and add metadata
         with open(file_path.replace("file://", ""), mode = "rb") as file:
@@ -49,11 +50,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
             )).scalar_one_or_none()
 
             if data_point is not None:
+                await delete_metadata(data_point.metadata_id)
                 data_point.name = file_metadata["name"]
                 data_point.raw_data_location = file_metadata["file_path"]
                 data_point.extension = file_metadata["extension"]
                 data_point.mime_type = file_metadata["mime_type"]
-
+                data_point.metadata_id = metadata_id
                 await session.merge(data_point)
                 await session.commit()
             else:
@@ -63,6 +65,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     raw_data_location = file_metadata["file_path"],
                     extension = file_metadata["extension"],
                     mime_type = file_metadata["mime_type"],
+                    metadata_id = metadata_id
                 )
 
                 dataset.data.append(data_point)
diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
index ec29edb89..108bcd21b 100644
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
@@ -1,14 +1,15 @@
 from typing import Union, BinaryIO, Any
 from cognee.modules.ingestion import save_data_to_file
+from cognee.modules.ingestion.operations.write_metadata import write_metadata
 
-def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
+async def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str):
     # Dynamic import is used because the llama_index module is optional.
     # For the same reason Any is accepted as a data item
-    from llama_index.core import Document
-    from .transform_data import get_data_from_llama_index
+    metadata_id = await write_metadata(data_item)
 
     # Check if data is of type Document or any of it's subclasses
-    if isinstance(data_item, Document):
+    if str(type(data_item)).startswith("llama_index"):
+        from .transform_data import get_data_from_llama_index
         file_path = get_data_from_llama_index(data_item, dataset_name)
 
     # data is a file object coming from upload.
@@ -25,4 +26,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any]
     else:
         raise ValueError(f"Data type not supported: {type(data_item)}")
 
-    return file_path
\ No newline at end of file
+    return file_path, metadata_id
\ No newline at end of file