Add autoformatting

Leon Luithlen 2024-11-26 16:13:08 +01:00
parent c5f3314c85
commit fd987ed61e
7 changed files with 103 additions and 61 deletions
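
The hunks below are consistent with running an opinionated formatter such as Black plus an import sorter such as isort or Ruff over the touched modules; the commit message does not name the tools, so treat the following as a hedged sketch of reproducing the changes, not a record of what was actually run.

# Minimal sketch; assumes Black and isort, which this commit does not confirm.
import black
import isort

source = (
    "from uuid import uuid4\n"
    "from typing import List\n"
    "id = Column(UUID, primary_key = True, default = uuid4)\n"
)

source = isort.code(source)  # groups and alphabetizes the import block
source = black.format_str(source, mode=black.Mode())  # normalizes keyword-argument spacing
print(source)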

View file

@@ -1,15 +1,19 @@
from uuid import uuid4
from typing import List
from datetime import datetime, timezone
from sqlalchemy.orm import relationship, Mapped
from sqlalchemy import Column, String, DateTime, UUID
from typing import List
from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String
from sqlalchemy.orm import Mapped, relationship
from cognee.infrastructure.databases.relational import Base
from .DatasetData import DatasetData
class Data(Base):
__tablename__ = "data"
id = Column(UUID, primary_key = True, default = uuid4)
id = Column(UUID, primary_key=True, default=uuid4)
name = Column(String)
extension = Column(String)
@@ -17,15 +21,19 @@ class Data(Base):
raw_data_location = Column(String)
metadata_id = Column(UUID)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
created_at = Column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at = Column(
DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
)
datasets: Mapped[List["Dataset"]] = relationship(
"Dataset",
secondary = DatasetData.__tablename__,
back_populates = "data",
lazy = "noload",
cascade="all, delete"
secondary=DatasetData.__tablename__,
back_populates="data",
lazy="noload",
cascade="all, delete",
)
def to_json(self) -> dict:
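
A side note on the lambda defaults kept by the reformat above (default=lambda: datetime.now(timezone.utc)): SQLAlchemy invokes a callable column default once per INSERT, while a plain value is evaluated a single time when the module is imported. A minimal illustration, not part of the commit:

from datetime import datetime, timezone
from sqlalchemy import Column, DateTime

# Evaluated once at import time -- every new row would get this same timestamp:
created_at_static = Column(DateTime(timezone=True), default=datetime.now(timezone.utc))

# Evaluated on each INSERT, because SQLAlchemy calls the lambda lazily:
created_at_lazy = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))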

View file

@@ -1,13 +1,20 @@
from uuid import uuid4
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, String, UUID
from uuid import uuid4
from sqlalchemy import UUID, Column, DateTime, String
from cognee.infrastructure.databases.relational import Base
class Metadata(Base):
__tablename__ = "queries"
id = Column(UUID, primary_key = True, default = uuid4)
id = Column(UUID, primary_key=True, default=uuid4)
metadata = Column(String)
created_at = Column(DateTime(timezone = True), default = lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone = True), onupdate = lambda: datetime.now(timezone.utc))
created_at = Column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at = Column(
DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc)
)

View file

@@ -1,7 +1,10 @@
import warnings
from uuid import UUID
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
@@ -11,6 +14,6 @@ async def delete_metadata(metadata_id: UUID):
metadata = await session.get(Metadata, metadata_id)
if metadata is None:
warnings.warn(f"metadata for metadata_id: {metadata_id} not found")
session.delete(metadata)
session.commit()
session.commit()
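
Not part of this commit, but useful context for the hunk above: with SQLAlchemy's AsyncSession, both delete() and commit() are coroutines, so the usual pattern awaits them. A minimal sketch, assuming a session obtained from get_async_session():

# Illustration only; assumes an SQLAlchemy AsyncSession, not the code committed here.
async def delete_row(session, instance):
    if instance is not None:
        await session.delete(instance)  # AsyncSession.delete is awaitable
        await session.commit()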

View file

@@ -1,9 +1,13 @@
import json
from uuid import UUID
from sqlalchemy import select
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
async def get_metadata(metadata_id: UUID) -> Metadata:
db_engine = get_relational_engine()

View file

@@ -1,41 +1,46 @@
import json
import inspect
import warnings
import json
import re
import warnings
from typing import Any
from uuid import UUID
from cognee.infrastructure.databases.relational import get_relational_engine
from ..models.Metadata import Metadata
async def write_metadata(data_item: Any) -> UUID:
metadata_dict = get_metadata_dict(data_item)
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
metadata = Metadata(
metadata = json.dumps(metadata_dict),
metadata_source = parse_type(type(data_item))
metadata=json.dumps(metadata_dict),
metadata_source=parse_type(type(data_item)),
)
session.add(metadata)
await session.commit()
return metadata.id
def parse_type(type_: Any) -> str:
pattern = r".+'([\w_\.]+)'"
match = re.search(pattern, str(type_))
if match:
return(match.group(1))
return match.group(1)
else:
raise Exception(f"type: {type_} could not be parsed")
def get_metadata_dict(metadata: Any) -> dict[str, Any]:
if hasattr(metadata, "dict") and inspect.ismethod(getattr(metadata, "dict")):
return(metadata.dict())
return metadata.dict()
else:
warnings.warn(f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method")
warnings.warn(
f"metadata of type {type(metadata)}: {str(metadata)[:20]}... does not have dict method. Defaulting to string method"
)
try:
return({"content": str(metadata)})
return {"content": str(metadata)}
except Exception as e:
raise Exception(f"Could not cast metadata to string: {e}")
raise Exception(f"Could not cast metadata to string: {e}")
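
For readers of parse_type above: the regular expression pulls the dotted class name out of str(type(obj)), which renders as <class 'module.Name'>. A quick illustration, not part of the commit:

import re

pattern = r".+'([\w_\.]+)'"

print(str(type("hello")))                                 # <class 'str'>
print(re.search(pattern, str(type("hello"))).group(1))    # str

class Example:
    pass

print(re.search(pattern, str(type(Example()))).group(1))  # __main__.Example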

View file

@@ -1,24 +1,30 @@
import dlt
import cognee.modules.ingestion as ingestion
from typing import Any
from cognee.shared.utils import send_telemetry
from cognee.modules.users.models import User
import dlt
import cognee.modules.ingestion as ingestion
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.methods import create_dataset
from cognee.modules.users.permissions.methods import give_permission_on_document
from .get_dlt_destination import get_dlt_destination
from .save_data_item_with_metadata_to_storage import save_data_item_with_metadata_to_storage
from cognee.modules.ingestion.operations.delete_metadata import delete_metadata
from cognee.modules.users.models import User
from cognee.modules.users.permissions.methods import give_permission_on_document
from cognee.shared.utils import send_telemetry
from .get_dlt_destination import get_dlt_destination
from .save_data_item_with_metadata_to_storage import (
save_data_item_with_metadata_to_storage,
)
async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
destination = get_dlt_destination()
pipeline = dlt.pipeline(
pipeline_name = "file_load_from_filesystem",
destination = destination,
pipeline_name="file_load_from_filesystem",
destination=destination,
)
@dlt.resource(standalone = True, merge_key = "id")
@dlt.resource(standalone=True, merge_key="id")
async def data_resources(data: Any, user: User):
if not isinstance(data, list):
# Convert data to a list as we work with lists further down.
@@ -27,10 +33,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
# Process data
for data_item in data:
file_path, metadata_id = await save_data_item_with_metadata_to_storage(data_item, dataset_name)
file_path, metadata_id = await save_data_item_with_metadata_to_storage(
data_item, dataset_name
)
# Ingest data and add metadata
with open(file_path.replace("file://", ""), mode = "rb") as file:
with open(file_path.replace("file://", ""), mode="rb") as file:
classified_data = ingestion.classify(file)
data_id = ingestion.identify(classified_data)
@@ -38,6 +46,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
file_metadata = classified_data.get_metadata()
from sqlalchemy import select
from cognee.modules.data.models import Data
db_engine = get_relational_engine()
@@ -45,9 +54,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user.id, session)
data_point = (await session.execute(
select(Data).filter(Data.id == data_id)
)).scalar_one_or_none()
data_point = (
await session.execute(select(Data).filter(Data.id == data_id))
).scalar_one_or_none()
if data_point is not None:
await delete_metadata(data_point.metadata_id)
@@ -60,12 +69,12 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
await session.commit()
else:
data_point = Data(
id = data_id,
name = file_metadata["name"],
raw_data_location = file_metadata["file_path"],
extension = file_metadata["extension"],
mime_type = file_metadata["mime_type"],
metadata_id = metadata_id
id=data_id,
name=file_metadata["name"],
raw_data_location=file_metadata["file_path"],
extension=file_metadata["extension"],
mime_type=file_metadata["mime_type"],
metadata_id=metadata_id,
)
dataset.data.append(data_point)
@@ -82,14 +91,13 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
await give_permission_on_document(user, data_id, "read")
await give_permission_on_document(user, data_id, "write")
send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
send_telemetry("cognee.add EXECUTION STARTED", user_id=user.id)
run_info = pipeline.run(
data_resources(data, user),
table_name = "file_metadata",
dataset_name = dataset_name,
write_disposition = "merge",
table_name="file_metadata",
dataset_name=dataset_name,
write_disposition="merge",
)
send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)
send_telemetry("cognee.add EXECUTION COMPLETED", user_id=user.id)
return run_info
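
A hedged usage sketch of the coroutine reformatted above; only the signature comes from the diff, and the argument values are hypothetical:

import asyncio

# Hypothetical call site (illustration only); user would come from cognee's user management.
async def add_files(user):
    run_info = await ingest_data_with_metadata(
        ["/tmp/report.pdf", "/tmp/notes.txt"],  # a list of local file paths
        dataset_name="my_dataset",
        user=user,
    )
    return run_info

# asyncio.run(add_files(user))  # requires a real User instance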

View file

@@ -1,20 +1,27 @@
from typing import Union, BinaryIO, Any
from typing import Any, BinaryIO, Union
from cognee.modules.ingestion import save_data_to_file
from cognee.modules.ingestion.operations.write_metadata import write_metadata
def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any], dataset_name: str) -> str:
# Dynamic import is used because the llama_index module is optional.
def save_data_item_with_metadata_to_storage(
data_item: Union[BinaryIO, str, Any], dataset_name: str
) -> str:
# Dynamic import is used because the llama_index module is optional.
# For the same reason Any is accepted as a data item
metadata_id = write_metadata(data_item)
# Check if data is of type Document or any of it's subclasses
if str(type(data_item)).startswith("llama_index"):
from .transform_data import get_data_from_llama_index
file_path = get_data_from_llama_index(data_item, dataset_name)
# data is a file object coming from upload.
elif hasattr(data_item, "file"):
file_path = save_data_to_file(data_item.file, dataset_name, filename=data_item.filename)
file_path = save_data_to_file(
data_item.file, dataset_name, filename=data_item.filename
)
elif isinstance(data_item, str):
# data is a file path
@@ -26,4 +33,4 @@ def save_data_item_with_metadata_to_storage(data_item: Union[BinaryIO, str, Any]
else:
raise ValueError(f"Data type not supported: {type(data_item)}")
return file_path, metadata_id
return file_path, metadata_id