From fd987ed61ec0d1f148e5798dce38001d5f75ebf9 Mon Sep 17 00:00:00 2001
From: Leon Luithlen
Date: Tue, 26 Nov 2024 16:13:08 +0100
Subject: [PATCH] Add autoformatting

---
 cognee/modules/data/models/Data.py            | 30 +++++----
 cognee/modules/ingestion/models/metadata.py   | 19 ++++--
 .../ingestion/operations/delete_metadata.py   |  7 ++-
 .../ingestion/operations/get_metadata.py      |  4 ++
 .../ingestion/operations/write_metadata.py    | 25 +++++---
 .../ingestion/ingest_data_with_metadata.py    | 62 +++++++++++--------
 ...save_data_item_with_metadata_to_storage.py | 17 +++--
 7 files changed, 103 insertions(+), 61 deletions(-)

diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index 6601580eb..55991541d 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -1,15 +1,19 @@
-from uuid import uuid4
-from typing import List
 from datetime import datetime, timezone
-from sqlalchemy.orm import relationship, Mapped
-from sqlalchemy import Column, String, DateTime, UUID
+from typing import List
+from uuid import uuid4
+
+from sqlalchemy import UUID, Column, DateTime, String
+from sqlalchemy.orm import Mapped, relationship
+
 from cognee.infrastructure.databases.relational import Base
+
 from .DatasetData import DatasetData
 
+
 class Data(Base):
     __tablename__ = "data"
 
-    id = Column(UUID, primary_key = True, default = uuid4)
+    id = Column(UUID, primary_key=True, default=uuid4)
 
     name = Column(String)
     extension = Column(String)
@@ -17,15 +21,19 @@ class Data(Base):
     raw_data_location = Column(String)
     metadata_id = Column(UUID)
 
-    created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
-    updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
+    created_at = Column(
+        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+    )
+    updated_at = Column(
+        DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
+    )
 
     datasets: Mapped[List["Dataset"]] = relationship(
         "Dataset",
-        secondary = DatasetData.__tablename__,
-        back_populates = "data",
-        lazy = "noload",
-        cascade="all, delete"
+        secondary=DatasetData.__tablename__,
+        back_populates="data",
+        lazy="noload",
+        cascade="all, delete",
     )
 
     def to_json(self) -> dict:
diff --git a/cognee/modules/ingestion/models/metadata.py b/cognee/modules/ingestion/models/metadata.py
index 29d6cfaf2..ab9fe1e01 100644
--- a/cognee/modules/ingestion/models/metadata.py
+++ b/cognee/modules/ingestion/models/metadata.py
@@ -1,13 +1,20 @@
-from uuid import uuid4
 from datetime import datetime, timezone
-from sqlalchemy import Column, DateTime, String, UUID
+from uuid import uuid4
+
+from sqlalchemy import UUID, Column, DateTime, String
+
 from cognee.infrastructure.databases.relational import Base
 
+
 class Metadata(Base):
     __tablename__ = "queries"
 
-    id = Column(UUID, primary_key = True, default = uuid4)
+    id = Column(UUID, primary_key=True, default=uuid4)
     metadata = Column(String)
-    
-    created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
-    updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
+
+    created_at = Column(
+        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
+    )
+    updated_at = Column(
+        DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
+    )
diff --git a/cognee/modules/ingestion/operations/delete_metadata.py b/cognee/modules/ingestion/operations/delete_metadata.py
index dc6bcd85f..df94f52ed 100644
--- a/cognee/modules/ingestion/operations/delete_metadata.py
+++ b/cognee/modules/ingestion/operations/delete_metadata.py
@@ -1,7 +1,10 @@
 import warnings
 from uuid import UUID
+
 from sqlalchemy import select
+
 from cognee.infrastructure.databases.relational import get_relational_engine
+
 from ..models.Metadata import Metadata
 
 
@@ -11,6 +14,6 @@ async def delete_metadata(metadata_id: UUID):
         metadata = await session.get(Metadata, metadata_id)
         if metadata is None:
             warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
-        
+
         session.delete(metadata)
-        session.commit()
\ No newline at end of file
+        session.commit()
diff --git a/cognee/modules/ingestion/operations/get_metadata.py b/cognee/modules/ingestion/operations/get_metadata.py
index 047bc4906..9034b327d 100644
--- a/cognee/modules/ingestion/operations/get_metadata.py
+++ b/cognee/modules/ingestion/operations/get_metadata.py
@@ -1,9 +1,13 @@
 import json
 from uuid import UUID
+
 from sqlalchemy import select
+
 from cognee.infrastructure.databases.relational import get_relational_engine
+
 from ..models.Metadata import Metadata
 
+
 async def get_metadata(metadata_id: UUID) -> Metadata:
     db_engine = get_relational_engine()
 
diff --git a/cognee/modules/ingestion/operations/write_metadata.py b/cognee/modules/ingestion/operations/write_metadata.py
index b97c3cf85..cefed21de 100644
--- a/cognee/modules/ingestion/operations/write_metadata.py
+++ b/cognee/modules/ingestion/operations/write_metadata.py
@@ -1,41 +1,46 @@
-import json
 import inspect
+import json
-import warnings
 import re
+import warnings
 from typing import Any
-
 from uuid import UUID
+
 from cognee.infrastructure.databases.relational import get_relational_engine
+
 from ..models.Metadata import Metadata
 
+
 async def write_metadata(data_item: Any) -> UUID:
     metadata_dict = get_metadata_dict(data_item)
     db_engine = get_relational_engine()
 
     async with db_engine.get_async_session() as session:
         metadata = Metadata(
-            metadata = json.dumps(metadata_dict),
-            metadata_source = parse_type(type(data_item))
+            metadata=json.dumps(metadata_dict),
+            metadata_source=parse_type(type(data_item)),
         )
         session.add(metadata)
         await session.commit()
         return metadata.id
 
+
 def parse_type(type_: Any) -> str:
     pattern = r".+'([\w_\.]+)'"
     match = re.search(pattern, str(type_))
     if match:
-        return(match.group(1))
+        return match.group(1)
     else:
         raise Exception(f"type: {type_} could not be parsed")
 
 
 def get_metadata_dict(metadata: Any) -> dict[str, Any]:
     if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")):
-        return(metadata.dict())
+        return metadata.dict()
     else:
-        warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method")
+        warnings.warn(
+            f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method"
+        )
         try:
-            return({"content": str(metadata)})
+            return {"content": str(metadata)}
         except Exception as e:
-            raise Exception(f"Could not cast metadata to string: {e}")
\ No newline at end of file
+            raise Exception(f"Could not cast metadata to string: {e}")
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
index 577dca57e..07f5c8115 100644
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -1,24 +1,30 @@
-import dlt
-import cognee.modules.ingestion as ingestion
 from typing import Any
-from cognee.shared.utils import send_telemetry
-from cognee.modules.users.models import User
+
+import dlt
+
+import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset
-from cognee.modules.users.permissions.methods import give_permission_on_document
-from .get_dlt_destination import get_dlt_destination
-from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
 from cognee.modules.ingestion.operations.delete_metadata import delete_metadata
+from cognee.modules.users.models import User
+from cognee.modules.users.permissions.methods import give_permission_on_document
+from cognee.shared.utils import send_telemetry
+
+from .get_dlt_destination import get_dlt_destination
+from .save_data_item_with_metadata_to_storage import (
+    save_data_item_with_metadata_to_storage,
+)
+
 
 async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     destination = get_dlt_destination()
 
     pipeline = dlt.pipeline(
-        pipeline_name = "file_load_from_filesystem",
-        destination = destination,
+        pipeline_name="file_load_from_filesystem",
+        destination=destination,
     )
 
-    @dlt.resource(standalone = True, merge_key = "id")
+    @dlt.resource(standalone=True, merge_key="id")
     async def data_resources(data: Any, user: User):
         if not isinstance(data, list):
             # Convert data to a list as we work with lists further down.
@@ -27,10 +33,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
 
         # Process data
         for data_item in data:
-            file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
+            file_path, metadata_id = await save_data_item_with_metadata_to_storage(
+                data_item, dataset_name
+            )
 
             # Ingest data and add metadata
-            with open(file_path.replace("file://", ""), mode = "rb") as file:
+            with open(file_path.replace("file://", ""), mode="rb") as file:
                 classified_data = ingestion.classify(file)
 
                 data_id = ingestion.identify(classified_data)
@@ -38,6 +46,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                 file_metadata = classified_data.get_metadata()
 
                 from sqlalchemy import select
+
                 from cognee.modules.data.models import Data
 
                 db_engine = get_relational_engine()
@@ -45,9 +54,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                 async with db_engine.get_async_session() as session:
                     dataset = await create_dataset(dataset_name, user.id, session)
 
-                    data_point = (await session.execute(
-                        select(Data).filter(Data.id == data_id)
-                    )).scalar_one_or_none()
+                    data_point = (
+                        await session.execute(select(Data).filter(Data.id == data_id))
+                    ).scalar_one_or_none()
 
                     if data_point is not None:
                         await delete_metadata(data_point.metadata_id)
@@ -60,12 +69,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                         await session.commit()
                     else:
                         data_point = Data(
-                            id = data_id,
-                            name = file_metadata["name"],
-                            raw_data_location = file_metadata["file_path"],
-                            extension = file_metadata["extension"],
-                            mime_type = file_metadata["mime_type"],
-                            metadata_id = metadata_id
+                            id=data_id,
+                            name=file_metadata["name"],
+                            raw_data_location=file_metadata["file_path"],
+                            extension=file_metadata["extension"],
+                            mime_type=file_metadata["mime_type"],
+                            metadata_id=metadata_id,
                         )
 
                         dataset.data.append(data_point)
@@ -82,14 +91,13 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
         await give_permission_on_document(user, data_id, "read")
         await give_permission_on_document(user, data_id, "write")
 
-
-    send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
+    send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
 
     run_info = pipeline.run(
         data_resources(data, user),
-        table_name = "file_metadata",
-        dataset_name = dataset_name,
-        write_disposition = "merge",
+        table_name="file_metadata",
+        dataset_name=dataset_name,
+        write_disposition="merge",
     )
-    send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)
+    send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
     return run_info
diff --git a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
index 108bcd21b..9acf0d413 100644
--- a/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
+++ b/cognee/tasks/ingestion/save_data_item_with_metadata_to_storage.py
@@ -1,20 +1,27 @@
-from typing import Union, BinaryIO, Any
+from typing import Any, BinaryIO, Union
+
 from cognee.modules.ingestion import save_data_to_file
 from cognee.modules.ingestion.operations.write_metadata import write_metadata
 
-def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
-    # Dynamic import is used because the llama_index module is optional.
+
+def save_data_item_with_metadata_to_storage(
+    data_item: Union[BinaryIO, str, Any], dataset_name: str
+) -> str:
+    # Dynamic import is used because the llama_index module is optional.
     # For the same reason Any is accepted as a data item
     metadata_id = write_metadata(data_item)
 
     # Check if data is of type Document or any of it's subclasses
     if str(type(data_item)).startswith("llama_index"):
         from .transform_data import get_data_from_llama_index
+
         file_path = get_data_from_llama_index(data_item, dataset_name)
 
     # data is a file object coming from upload.
     elif hasattr(data_item, "file"):
-        file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
+        file_path = save_data_to_file(
+            data_item.file, dataset_name, filename=data_item.filename
+        )
 
     elif isinstance(data_item, str):
         # data is a file path
@@ -26,4 +33,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any]
     else:
         raise ValueError(f"Data type not supported: {type(data_item)}")
 
-    return file_path, metadata_id
\ No newline at end of file
+    return file_path, metadata_id
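Note: the changes above match the default output of black plus isort-style import sorting; that attribution is an assumption, since the commit message names no tool. Under that assumption, a minimal Python sketch of how the same normalization can be reproduced programmatically (black.format_str, black.Mode, and isort.code are the public APIs of those packages):

    # Requires: pip install black isort  (assumed tooling; see note above)
    import black
    import isort

    source = 'id = Column(UUID, primary_key = True, default = uuid4)\n'

    # isort normalizes import ordering first (a no-op for this snippet),
    # then black rewrites spacing, trailing commas, and line wrapping.
    sorted_source = isort.code(source)
    formatted = black.format_str(sorted_source, mode=black.Mode())
    print(formatted)  # id = Column(UUID, primary_key=True, default=uuid4)

On the command line the equivalent would be `python -m isort .` followed by `python -m black .`; `python -m black --check .` exits non-zero when files would be reformatted, which is useful for CI.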