refactor: Reduce complexity of metadata handling
Have foreign metadata be a table column in data instead of it's own table to reduce complexity Refactor COG-793
This commit is contained in:
parent
0c7c1d7503
commit
49ad292592
15 changed files with 20 additions and 151 deletions
|
|
@ -95,7 +95,7 @@ class LiteLLMEmbeddingEngine(EmbeddingEngine):
|
||||||
|
|
||||||
return await self.embed_text(text)
|
return await self.embed_text(text)
|
||||||
|
|
||||||
except (litellm.exceptions.BadRequestError, litellm.llms.OpenAI.openai.OpenAIError):
|
except litellm.exceptions.BadRequestError:
|
||||||
raise EmbeddingException("Failed to index data points.")
|
raise EmbeddingException("Failed to index data points.")
|
||||||
|
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,6 @@ class TextChunker:
|
||||||
contains=[],
|
contains=[],
|
||||||
_metadata={
|
_metadata={
|
||||||
"index_fields": ["text"],
|
"index_fields": ["text"],
|
||||||
"metadata_id": self.document.metadata_id,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
paragraph_chunks = []
|
paragraph_chunks = []
|
||||||
|
|
@ -74,7 +73,6 @@ class TextChunker:
|
||||||
contains=[],
|
contains=[],
|
||||||
_metadata={
|
_metadata={
|
||||||
"index_fields": ["text"],
|
"index_fields": ["text"],
|
||||||
"metadata_id": self.document.metadata_id,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -95,7 +93,7 @@ class TextChunker:
|
||||||
chunk_index=self.chunk_index,
|
chunk_index=self.chunk_index,
|
||||||
cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
|
cut_type=paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
|
||||||
contains=[],
|
contains=[],
|
||||||
_metadata={"index_fields": ["text"], "metadata_id": self.document.metadata_id},
|
_metadata={"index_fields": ["text"]},
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,11 @@
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import List
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from sqlalchemy import UUID, Column, DateTime, String
|
from sqlalchemy import UUID, Column, DateTime, String, JSON
|
||||||
from sqlalchemy.orm import Mapped, relationship
|
from sqlalchemy.orm import relationship
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import Base
|
from cognee.infrastructure.databases.relational import Base
|
||||||
|
|
||||||
from .DatasetData import DatasetData
|
from .DatasetData import DatasetData
|
||||||
from .Metadata import Metadata
|
|
||||||
|
|
||||||
|
|
||||||
class Data(Base):
|
class Data(Base):
|
||||||
|
|
@ -21,6 +19,7 @@ class Data(Base):
|
||||||
raw_data_location = Column(String)
|
raw_data_location = Column(String)
|
||||||
owner_id = Column(UUID, index=True)
|
owner_id = Column(UUID, index=True)
|
||||||
content_hash = Column(String)
|
content_hash = Column(String)
|
||||||
|
foreign_metadata = Column(JSON)
|
||||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
||||||
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
|
@ -32,13 +31,6 @@ class Data(Base):
|
||||||
cascade="all, delete",
|
cascade="all, delete",
|
||||||
)
|
)
|
||||||
|
|
||||||
metadata_relationship = relationship(
|
|
||||||
"Metadata",
|
|
||||||
back_populates="data",
|
|
||||||
lazy="noload",
|
|
||||||
cascade="all, delete",
|
|
||||||
)
|
|
||||||
|
|
||||||
def to_json(self) -> dict:
|
def to_json(self) -> dict:
|
||||||
return {
|
return {
|
||||||
"id": str(self.id),
|
"id": str(self.id),
|
||||||
|
|
|
||||||
|
|
@ -1,21 +0,0 @@
|
||||||
from datetime import datetime, timezone
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
from sqlalchemy import UUID, Column, DateTime, String, ForeignKey
|
|
||||||
from sqlalchemy.orm import relationship
|
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import Base
|
|
||||||
|
|
||||||
|
|
||||||
class Metadata(Base):
|
|
||||||
__tablename__ = "metadata_table"
|
|
||||||
|
|
||||||
id = Column(UUID, primary_key=True, default=uuid4)
|
|
||||||
metadata_repr = Column(String)
|
|
||||||
metadata_source = Column(String)
|
|
||||||
|
|
||||||
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
|
|
||||||
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))
|
|
||||||
|
|
||||||
data_id = Column(UUID, ForeignKey("data.id", ondelete="CASCADE"), primary_key=False)
|
|
||||||
data = relationship("Data", back_populates="metadata_relationship")
|
|
||||||
|
|
@ -1,19 +0,0 @@
|
||||||
import warnings
|
|
||||||
from uuid import UUID
|
|
||||||
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
|
|
||||||
from ..models.Metadata import Metadata
|
|
||||||
|
|
||||||
|
|
||||||
async def delete_metadata(metadata_id: UUID):
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
async with db_engine.get_async_session() as session:
|
|
||||||
metadata = await session.get(Metadata, metadata_id)
|
|
||||||
if metadata is None:
|
|
||||||
warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
|
|
||||||
|
|
||||||
session.delete(metadata)
|
|
||||||
session.commit()
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
import json
|
|
||||||
from uuid import UUID
|
|
||||||
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
|
|
||||||
from ..models.Metadata import Metadata
|
|
||||||
|
|
||||||
|
|
||||||
async def get_metadata(metadata_id: UUID) -> Metadata:
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
|
|
||||||
async with db_engine.get_async_session() as session:
|
|
||||||
metadata = await session.get(Metadata, metadata_id)
|
|
||||||
|
|
||||||
return metadata
|
|
||||||
|
|
@ -1,65 +0,0 @@
|
||||||
import inspect
|
|
||||||
import json
|
|
||||||
import re
|
|
||||||
import warnings
|
|
||||||
from uuid import UUID
|
|
||||||
from sqlalchemy import select
|
|
||||||
from typing import Any, BinaryIO, Union
|
|
||||||
|
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
|
||||||
from cognee.infrastructure.files.utils.get_file_metadata import FileMetadata
|
|
||||||
from ..models.Metadata import Metadata
|
|
||||||
|
|
||||||
|
|
||||||
async def write_metadata(
|
|
||||||
data_item: Union[BinaryIO, str, Any], data_id: UUID, file_metadata: FileMetadata
|
|
||||||
) -> UUID:
|
|
||||||
metadata_dict = get_metadata_dict(data_item, file_metadata)
|
|
||||||
db_engine = get_relational_engine()
|
|
||||||
async with db_engine.get_async_session() as session:
|
|
||||||
metadata = (
|
|
||||||
await session.execute(select(Metadata).filter(Metadata.data_id == data_id))
|
|
||||||
).scalar_one_or_none()
|
|
||||||
|
|
||||||
if metadata is not None:
|
|
||||||
metadata.metadata_repr = json.dumps(metadata_dict)
|
|
||||||
metadata.metadata_source = parse_type(type(data_item))
|
|
||||||
await session.merge(metadata)
|
|
||||||
else:
|
|
||||||
metadata = Metadata(
|
|
||||||
id=data_id,
|
|
||||||
metadata_repr=json.dumps(metadata_dict),
|
|
||||||
metadata_source=parse_type(type(data_item)),
|
|
||||||
data_id=data_id,
|
|
||||||
)
|
|
||||||
session.add(metadata)
|
|
||||||
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
|
|
||||||
def parse_type(type_: Any) -> str:
|
|
||||||
pattern = r".+'([\w_\.]+)'"
|
|
||||||
match = re.search(pattern, str(type_))
|
|
||||||
if match:
|
|
||||||
return match.group(1)
|
|
||||||
else:
|
|
||||||
raise Exception(f"type: {type_} could not be parsed")
|
|
||||||
|
|
||||||
|
|
||||||
def get_metadata_dict(
|
|
||||||
data_item: Union[BinaryIO, str, Any], file_metadata: FileMetadata
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
if isinstance(data_item, str):
|
|
||||||
return file_metadata
|
|
||||||
elif isinstance(data_item, BinaryIO):
|
|
||||||
return file_metadata
|
|
||||||
elif hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
|
|
||||||
return {**file_metadata, **data_item.dict()}
|
|
||||||
else:
|
|
||||||
warnings.warn(
|
|
||||||
f"metadata of type {type(data_item)}: {str(data_item)[:20]}... does not have dict method. Defaulting to string method"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
return {**dict(file_metadata), "content": str(data_item)}
|
|
||||||
except Exception as e:
|
|
||||||
raise Exception(f"Could not cast metadata to string: {e}")
|
|
||||||
|
|
@ -7,7 +7,6 @@ from cognee.infrastructure.engine import DataPoint
|
||||||
class Document(DataPoint):
|
class Document(DataPoint):
|
||||||
name: str
|
name: str
|
||||||
raw_data_location: str
|
raw_data_location: str
|
||||||
metadata_id: UUID
|
|
||||||
mime_type: str
|
mime_type: str
|
||||||
_metadata: dict = {"index_fields": ["name"], "type": "Document"}
|
_metadata: dict = {"index_fields": ["name"], "type": "Document"}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ from cognee.modules.data.processing.document_types import (
|
||||||
TextDocument,
|
TextDocument,
|
||||||
UnstructuredDocument,
|
UnstructuredDocument,
|
||||||
)
|
)
|
||||||
from cognee.modules.data.operations.get_metadata import get_metadata
|
|
||||||
|
|
||||||
EXTENSION_TO_DOCUMENT_CLASS = {
|
EXTENSION_TO_DOCUMENT_CLASS = {
|
||||||
"pdf": PdfDocument, # Text documents
|
"pdf": PdfDocument, # Text documents
|
||||||
|
|
@ -52,14 +51,12 @@ EXTENSION_TO_DOCUMENT_CLASS = {
|
||||||
async def classify_documents(data_documents: list[Data]) -> list[Document]:
|
async def classify_documents(data_documents: list[Data]) -> list[Document]:
|
||||||
documents = []
|
documents = []
|
||||||
for data_item in data_documents:
|
for data_item in data_documents:
|
||||||
metadata = await get_metadata(data_item.id)
|
|
||||||
document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
|
document = EXTENSION_TO_DOCUMENT_CLASS[data_item.extension](
|
||||||
id=data_item.id,
|
id=data_item.id,
|
||||||
title=f"{data_item.name}.{data_item.extension}",
|
title=f"{data_item.name}.{data_item.extension}",
|
||||||
raw_data_location=data_item.raw_data_location,
|
raw_data_location=data_item.raw_data_location,
|
||||||
name=data_item.name,
|
name=data_item.name,
|
||||||
mime_type=data_item.mime_type,
|
mime_type=data_item.mime_type,
|
||||||
metadata_id=metadata.id,
|
|
||||||
)
|
)
|
||||||
documents.append(document)
|
documents.append(document)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,12 +8,15 @@ from cognee.modules.data.models.DatasetData import DatasetData
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.users.permissions.methods import give_permission_on_document
|
from cognee.modules.users.permissions.methods import give_permission_on_document
|
||||||
from cognee.shared.utils import send_telemetry
|
from cognee.shared.utils import send_telemetry
|
||||||
from cognee.modules.data.operations.write_metadata import write_metadata
|
|
||||||
from .get_dlt_destination import get_dlt_destination
|
from .get_dlt_destination import get_dlt_destination
|
||||||
from .save_data_item_to_storage import (
|
from .save_data_item_to_storage import (
|
||||||
save_data_item_to_storage,
|
save_data_item_to_storage,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from typing import Union, BinaryIO
|
||||||
|
import inspect
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
|
||||||
async def ingest_data(data: Any, dataset_name: str, user: User):
|
async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
destination = get_dlt_destination()
|
destination = get_dlt_destination()
|
||||||
|
|
@ -23,6 +26,15 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
destination=destination,
|
destination=destination,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_foreign_metadata_dict(data_item: Union[BinaryIO, str, Any]) -> dict[str, Any]:
|
||||||
|
if hasattr(data_item, "dict") and inspect.ismethod(getattr(data_item, "dict")):
|
||||||
|
return {"metadata": data_item.dict(), "origin": str(type(data_item))}
|
||||||
|
else:
|
||||||
|
warnings.warn(
|
||||||
|
f"Data of type {type(data_item)}... does not have dict method. Returning empty metadata."
|
||||||
|
)
|
||||||
|
return {}
|
||||||
|
|
||||||
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
|
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
|
||||||
async def data_resources(file_paths: List[str], user: User):
|
async def data_resources(file_paths: List[str], user: User):
|
||||||
for file_path in file_paths:
|
for file_path in file_paths:
|
||||||
|
|
@ -83,6 +95,7 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
data_point.mime_type = file_metadata["mime_type"]
|
data_point.mime_type = file_metadata["mime_type"]
|
||||||
data_point.owner_id = user.id
|
data_point.owner_id = user.id
|
||||||
data_point.content_hash = file_metadata["content_hash"]
|
data_point.content_hash = file_metadata["content_hash"]
|
||||||
|
data_point.foreign_metadata = (get_foreign_metadata_dict(data_item),)
|
||||||
await session.merge(data_point)
|
await session.merge(data_point)
|
||||||
else:
|
else:
|
||||||
data_point = Data(
|
data_point = Data(
|
||||||
|
|
@ -93,6 +106,7 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
mime_type=file_metadata["mime_type"],
|
mime_type=file_metadata["mime_type"],
|
||||||
owner_id=user.id,
|
owner_id=user.id,
|
||||||
content_hash=file_metadata["content_hash"],
|
content_hash=file_metadata["content_hash"],
|
||||||
|
foreign_metadata=get_foreign_metadata_dict(data_item),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if data is already in dataset
|
# Check if data is already in dataset
|
||||||
|
|
@ -108,7 +122,6 @@ async def ingest_data(data: Any, dataset_name: str, user: User):
|
||||||
dataset.data.append(data_point)
|
dataset.data.append(data_point)
|
||||||
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
await write_metadata(data_item, data_point.id, file_metadata)
|
|
||||||
|
|
||||||
await give_permission_on_document(user, data_id, "read")
|
await give_permission_on_document(user, data_id, "read")
|
||||||
await give_permission_on_document(user, data_id, "write")
|
await give_permission_on_document(user, data_id, "write")
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ def test_AudioDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="audio-dummy-test",
|
name="audio-dummy-test",
|
||||||
raw_data_location="",
|
raw_data_location="",
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="",
|
mime_type="",
|
||||||
)
|
)
|
||||||
with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT):
|
with patch.object(AudioDocument, "create_transcript", return_value=TEST_TEXT):
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,6 @@ def test_ImageDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="image-dummy-test",
|
name="image-dummy-test",
|
||||||
raw_data_location="",
|
raw_data_location="",
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="",
|
mime_type="",
|
||||||
)
|
)
|
||||||
with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT):
|
with patch.object(ImageDocument, "transcribe_image", return_value=TEST_TEXT):
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@ def test_PdfDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="Test document.pdf",
|
name="Test document.pdf",
|
||||||
raw_data_location=test_file_path,
|
raw_data_location=test_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="",
|
mime_type="",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,6 @@ def test_TextDocument(input_file, chunk_size):
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name=input_file,
|
name=input_file,
|
||||||
raw_data_location=test_file_path,
|
raw_data_location=test_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="",
|
mime_type="",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,6 @@ def test_UnstructuredDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="example.pptx",
|
name="example.pptx",
|
||||||
raw_data_location=pptx_file_path,
|
raw_data_location=pptx_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
mime_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -47,7 +46,6 @@ def test_UnstructuredDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="example.docx",
|
name="example.docx",
|
||||||
raw_data_location=docx_file_path,
|
raw_data_location=docx_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
mime_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -55,7 +53,6 @@ def test_UnstructuredDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="example.csv",
|
name="example.csv",
|
||||||
raw_data_location=csv_file_path,
|
raw_data_location=csv_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="text/csv",
|
mime_type="text/csv",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -63,7 +60,6 @@ def test_UnstructuredDocument():
|
||||||
id=uuid.uuid4(),
|
id=uuid.uuid4(),
|
||||||
name="example.xlsx",
|
name="example.xlsx",
|
||||||
raw_data_location=xlsx_file_path,
|
raw_data_location=xlsx_file_path,
|
||||||
metadata_id=uuid.uuid4(),
|
|
||||||
mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue