Add Metadata table and read/write/delete functions

This commit is contained in:
Leon Luithlen 2024-11-26 16:10:03 +01:00
parent 2408fd7a01
commit c5f3314c85
7 changed files with 94 additions and 6 deletions

View file

@ -15,6 +15,7 @@ class Data(Base):
extension = Column(String) extension = Column(String)
mime_type = Column(String) mime_type = Column(String)
raw_data_location = Column(String) raw_data_location = Column(String)
metadata_id = Column(UUID)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc)) created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc)) updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))

View file

@ -0,0 +1,13 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, String, UUID
from cognee.infrastructure.databases.relational import Base
class Metadata(Base):
    """Relational table storing serialized metadata for an ingested data item.

    Rows are written by ``write_metadata`` and referenced from
    ``Data.metadata_id``.
    """
    # Fixed: was "queries", evidently copy-pasted from another model —
    # this table stores metadata rows, not queries.
    __tablename__ = "metadata"

    id = Column(UUID, primary_key = True, default = uuid4)
    # JSON-serialized metadata payload (written via json.dumps in write_metadata).
    # NOTE(review): "metadata" is a reserved attribute name in SQLAlchemy's
    # Declarative API and will raise InvalidRequestError at class definition —
    # this needs a coordinated rename (e.g. metadata_json) together with its
    # callers; kept as-is here to avoid breaking write_metadata/get_metadata.
    metadata = Column(String)
    # Dotted type name of the source object (set by write_metadata via parse_type);
    # column was missing although write_metadata already assigns it.
    metadata_source = Column(String)
    created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))

View file

@ -0,0 +1,16 @@
import warnings
from uuid import UUID
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
async def delete_metadata(metadata_id: UUID) -> None:
    """Delete the Metadata row with the given id.

    Emits a warning and returns without touching the database when no row
    exists for ``metadata_id``.
    """
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        metadata = await session.get(Metadata, metadata_id)

        if metadata is None:
            warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
            # Fixed: originally fell through and called session.delete(None).
            return

        # Fixed: delete/commit on an AsyncSession must be awaited; the original
        # called them synchronously, so nothing was ever deleted or committed.
        await session.delete(metadata)
        await session.commit()

View file

@ -0,0 +1,13 @@
import json
from uuid import UUID
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
async def get_metadata(metadata_id: UUID) -> Metadata:
    """Fetch the Metadata row with the given id.

    Returns the ORM object (or None when no row exists — callers should
    handle that case).
    """
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        metadata = await session.get(Metadata, metadata_id)

        # Fixed: the original returned json.parse(metadata) — json.parse does
        # not exist in Python (it is a JavaScript API), and the annotation
        # promises a Metadata object, not a parsed dict.
        return metadata

View file

@ -0,0 +1,41 @@
import json
import inspect
import warnings
import re
from typing import Any
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
async def write_metadata(data_item: Any) -> UUID:
    """Serialize ``data_item``'s metadata, persist it as a Metadata row, and
    return the new row's id.

    Raises whatever ``get_metadata_dict`` / ``parse_type`` raise for
    unserializable or unparsable inputs.
    """
    metadata_dict = get_metadata_dict(data_item)
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        metadata = Metadata(
            metadata = json.dumps(metadata_dict),
            metadata_source = parse_type(type(data_item)),
        )
        session.add(metadata)

        # Flush so the client-side uuid4 default is applied, and capture the id
        # BEFORE commit: with expire_on_commit=True (the default), reading
        # metadata.id after commit triggers a lazy refresh, which raises in an
        # async session (the original returned metadata.id post-commit).
        await session.flush()
        metadata_id = metadata.id

        await session.commit()

    return metadata_id
def parse_type(type_: Any) -> str:
    """Extract the (possibly dotted) class name from a type's repr.

    E.g. ``parse_type(int)`` -> ``"int"``; for a class defined in a package
    the dotted path inside the quotes of ``<class '...'>`` is returned.

    Raises:
        ValueError: when ``str(type_)`` does not contain a quoted name.
        (ValueError is a subclass of Exception, so existing broad handlers
        still catch it.)
    """
    match = re.search(r".+'([\w_\.]+)'", str(type_))

    if match is None:
        raise ValueError(f"type: {type_} could not be parsed")

    return match.group(1)
def get_metadata_dict(metadata: Any) -> dict[str, Any]:
    """Best-effort conversion of ``metadata`` into a plain dict.

    Uses the object's bound ``dict()`` method when one exists; otherwise warns
    and falls back to wrapping ``str(metadata)`` under the "content" key.
    """
    dict_method = getattr(metadata, "dict", None)

    if inspect.ismethod(dict_method):
        return dict_method()

    warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method")

    try:
        return {"content": str(metadata)}
    except Exception as e:
        raise Exception(f"Could not cast metadata to string: {e}")

View file

@ -8,6 +8,7 @@ from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination from .get_dlt_destination import get_dlt_destination
from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
from cognee.modules.ingestion.operations.delete_metadata import delete_metadata
async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = get_dlt_destination() destination = get_dlt_destination()
@ -26,7 +27,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
# Process data # Process data
for data_item in data: for data_item in data:
file_path = save_data_item_with_metadata_to_storage(data_item, dataset_name) file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
# Ingest data and add metadata # Ingest data and add metadata
with open(file_path.replace("file://", ""), mode = "rb") as file: with open(file_path.replace("file://", ""), mode = "rb") as file:
@ -49,11 +50,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
)).scalar_one_or_none() )).scalar_one_or_none()
if data_point is not None: if data_point is not None:
await delete_metadata(data_point.metadata_id)
data_point.name = file_metadata["name"] data_point.name = file_metadata["name"]
data_point.raw_data_location = file_metadata["file_path"] data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"] data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"] data_point.mime_type = file_metadata["mime_type"]
data_point.metadata_id = metadata_id
await session.merge(data_point) await session.merge(data_point)
await session.commit() await session.commit()
else: else:
@ -63,6 +65,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
raw_data_location = file_metadata["file_path"], raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"], extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"], mime_type = file_metadata["mime_type"],
metadata_id = metadata_id
) )
dataset.data.append(data_point) dataset.data.append(data_point)

View file

@ -1,14 +1,15 @@
from typing import Union, BinaryIO, Any from typing import Union, BinaryIO, Any
from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion import save_data_to_file
from cognee.modules.ingestion.operations.write_metadata import write_metadata
def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str: def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
# Dynamic import is used because the llama_index module is optional. # Dynamic import is used because the llama_index module is optional.
# For the same reason Any is accepted as a data item # For the same reason Any is accepted as a data item
from llama_index.core import Document metadata_id = write_metadata(data_item)
from .transform_data import get_data_from_llama_index
# Check if data is of type Document or any of it's subclasses # Check if data is of type Document or any of it's subclasses
if isinstance(data_item, Document): if str(type(data_item)).startswith("llama_index"):
from .transform_data import get_data_from_llama_index
file_path = get_data_from_llama_index(data_item, dataset_name) file_path = get_data_from_llama_index(data_item, dataset_name)
# data is a file object coming from upload. # data is a file object coming from upload.
@ -25,4 +26,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any]
else: else:
raise ValueError(f"Data type not supported: {type(data_item)}") raise ValueError(f"Data type not supported: {type(data_item)}")
return file_path return file_path, metadata_id