This commit is contained in:
Leon Luithlen 2024-11-27 15:35:23 +01:00
parent 1679c746a3
commit cd0e505ac0
6 changed files with 21 additions and 25 deletions

View file

@@ -19,8 +19,6 @@ class Data(Base):
extension = Column(String) extension = Column(String)
mime_type = Column(String) mime_type = Column(String)
raw_data_location = Column(String) raw_data_location = Column(String)
metadata_id = Column(UUID)
created_at = Column( created_at = Column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
) )
@@ -36,9 +34,8 @@ class Data(Base):
cascade="all, delete", cascade="all, delete",
) )
metadata = relationship( metadata_relationship = relationship(
"Metadata", "Metadata",
secondary=Metadata.__tablename__,
back_populates="data", back_populates="data",
lazy="noload", lazy="noload",
cascade="all, delete", cascade="all, delete",

View file

@@ -1,7 +1,8 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from uuid import uuid4 from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String, ForeignKey, relationship from sqlalchemy import UUID, Column, DateTime, String, ForeignKey
from sqlalchemy.orm import relationship
from cognee.infrastructure.databases.relational import Base from cognee.infrastructure.databases.relational import Base
@@ -20,5 +21,6 @@ class Metadata(Base):
DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc) DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
) )
dataset_id = Column(UUID, ForeignKey("datasets.id", ondelete="CASCADE"), primary_key = True) data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = False)
data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key = True) data = relationship("Data", back_populates="metadata_relationship")

View file

@@ -10,18 +10,18 @@ from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata from ..models.Metadata import Metadata
async def write_metadata(data_item: Any) -> UUID: async def write_metadata(data_item: Any, data_id: UUID) -> UUID:
metadata_dict = get_metadata_dict(data_item) metadata_dict = get_metadata_dict(data_item)
db_engine = get_relational_engine() db_engine = get_relational_engine()
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
metadata = Metadata( metadata = Metadata(
metadata_repr=json.dumps(metadata_dict), metadata_repr=json.dumps(metadata_dict),
metadata_source=parse_type(type(data_item)), metadata_source=parse_type(type(data_item)),
data_id=data_id
) )
session.add(metadata) session.add(metadata)
await session.commit() await session.commit()
return metadata.id
def parse_type(type_: Any) -> str: def parse_type(type_: Any) -> str:

View file

@@ -39,14 +39,15 @@ EXTENSION_TO_DOCUMENT_CLASS = {
def classify_documents(data_documents: list[Data]) -> list[Document]: def classify_documents(data_documents: list[Data]) -> list[Document]:
documents = [ documents = []
EXTENSION_TO_DOCUMENT_CLASS[data_item.extension]( for data_item in data_documents:
document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
id=data_item.id, id=data_item.id,
title=f"{data_item.name}.{data_item.extension}", title=f"{data_item.name}.{data_item.extension}",
raw_data_location=data_item.raw_data_location, raw_data_location=data_item.raw_data_location,
name=data_item.name, name=data_item.name,
metadata_id=data_item.metadata_id metadata_id=data_item.metadata_id
) )
for data_item in data_documents documents.append(document)
]
return documents return documents

View file

@@ -9,13 +9,14 @@ from cognee.modules.data.operations.delete_metadata import delete_metadata
from cognee.modules.users.models import User from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.shared.utils import send_telemetry from cognee.shared.utils import send_telemetry
from cognee.modules.data.operations.write_metadata import write_metadata
from .get_dlt_destination import get_dlt_destination from .get_dlt_destination import get_dlt_destination
from .save_data_item_with_metadata_to_storage import ( from .save_data_item_with_metadata_to_storage import (
save_data_item_with_metadata_to_storage, save_data_item_with_metadata_to_storage,
) )
async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User): async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = get_dlt_destination() destination = get_dlt_destination()
@@ -32,8 +33,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
# Process data # Process data
for data_item in data: for data_item in data:
file_path = await save_data_item_with_metadata_to_storage(
file_path, metadata_id = await save_data_item_with_metadata_to_storage(
data_item, dataset_name data_item, dataset_name
) )
@@ -64,21 +64,20 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
data_point.raw_data_location = file_metadata["file_path"] data_point.raw_data_location = file_metadata["file_path"]
data_point.extension = file_metadata["extension"] data_point.extension = file_metadata["extension"]
data_point.mime_type = file_metadata["mime_type"] data_point.mime_type = file_metadata["mime_type"]
data_point.metadata_id = metadata_id
await session.merge(data_point) await session.merge(data_point)
await session.commit()
else: else:
data_point = Data( data_point = Data(
id=data_id, id=data_id,
name=file_metadata["name"], name=file_metadata["name"],
raw_data_location=file_metadata["file_path"], raw_data_location=file_metadata["file_path"],
extension=file_metadata["extension"], extension=file_metadata["extension"],
mime_type=file_metadata["mime_type"], mime_type=file_metadata["mime_type"]
metadata_id=metadata_id,
) )
dataset.data.append(data_point) dataset.data.append(data_point)
await session.commit() await session.commit()
await write_metadata(data_item, data_point.id)
yield { yield {
"id": data_id, "id": data_id,

View file

@@ -1,7 +1,6 @@
from typing import Any, BinaryIO, Union from typing import Any, BinaryIO, Union
from cognee.modules.ingestion import save_data_to_file from cognee.modules.ingestion import save_data_to_file
from cognee.modules.data.operations.write_metadata import write_metadata
async def save_data_item_with_metadata_to_storage( async def save_data_item_with_metadata_to_storage(
@@ -9,8 +8,6 @@ async def save_data_item_with_metadata_to_storage(
) -> str: ) -> str:
# Dynamic import is used because the llama_index module is optional. # Dynamic import is used because the llama_index module is optional.
# For the same reason Any is accepted as a data item # For the same reason Any is accepted as a data item
metadata_id = await write_metadata(data_item)
# Check if data is of type Document or any of it's subclasses # Check if data is of type Document or any of it's subclasses
if str(type(data_item)).startswith("llama_index"): if str(type(data_item)).startswith("llama_index"):
from .transform_data import get_data_from_llama_index from .transform_data import get_data_from_llama_index
@@ -33,4 +30,4 @@ async def save_data_item_with_metadata_to_storage(
else: else:
raise ValueError(f"Data type not supported: {type(data_item)}") raise ValueError(f"Data type not supported: {type(data_item)}")
return file_path, metadata_id return file_path