From f5b5e56cc1d68e340c24ab12ee580cca2c5fcca0 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Thu, 5 Dec 2024 16:38:44 +0100
Subject: [PATCH] feat: Add deduplication of data

Data is deduplicated per user: if a user tries to add data that already
exists, ingestion resolves to the existing record in the database instead
of storing a duplicate.

Feature COG-505
---
 .../files/utils/get_file_metadata.py          |  6 ++++++
 cognee/modules/data/models/Data.py            |  3 ++-
 .../modules/data/operations/write_metadata.py |  1 -
 .../ingestion/data_types/BinaryData.py        |  2 +-
 cognee/modules/ingestion/identify.py          |  8 ++++---
 .../ingestion/ingest_data_with_metadata.py    | 21 ++++++++++++-------
 6 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py
index a114ef48f..a9528e5c0 100644
--- a/cognee/infrastructure/files/utils/get_file_metadata.py
+++ b/cognee/infrastructure/files/utils/get_file_metadata.py
@@ -1,4 +1,5 @@
 from typing import BinaryIO, TypedDict
+import hashlib
 
 from .guess_file_type import guess_file_type
 
@@ -7,10 +8,14 @@ class FileMetadata(TypedDict):
     file_path: str
     mime_type: str
     extension: str
+    content_hash: str
 
 def get_file_metadata(file: BinaryIO) -> FileMetadata:
     """Get metadata from a file"""
     file.seek(0)
+    content_hash = hashlib.md5(file.read()).hexdigest()
+    file.seek(0)
+
     file_type = guess_file_type(file)
 
     file_path = file.name
@@ -21,4 +26,5 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
         file_path = file_path,
         mime_type = file_type.mime,
         extension = file_type.extension,
+        content_hash = content_hash,
     )
diff --git a/cognee/modules/data/models/Data.py b/cognee/modules/data/models/Data.py
index f1b033dd0..e24bc7c5c 100644
--- a/cognee/modules/data/models/Data.py
+++ b/cognee/modules/data/models/Data.py
@@ -1,7 +1,6 @@
 from datetime import datetime, timezone
 from typing import List
 from uuid import uuid4
-
 from sqlalchemy import UUID, Column, DateTime, String
 from sqlalchemy.orm import Mapped, relationship
 
@@ -19,6 +18,8 @@ class Data(Base):
     extension = Column(String)
     mime_type = Column(String)
     raw_data_location = Column(String)
+    owner_id = Column(UUID, index=True)
+    content_hash = Column(String)
     created_at = Column(
         DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
     )
diff --git a/cognee/modules/data/operations/write_metadata.py b/cognee/modules/data/operations/write_metadata.py
index 43031cdc9..67c8c0e45 100644
--- a/cognee/modules/data/operations/write_metadata.py
+++ b/cognee/modules/data/operations/write_metadata.py
@@ -2,7 +2,6 @@ import inspect
 import json
 import re
 import warnings
-from typing import Any
 from uuid import UUID
 from sqlalchemy import select
 from typing import Any, BinaryIO, Union
diff --git a/cognee/modules/ingestion/data_types/BinaryData.py b/cognee/modules/ingestion/data_types/BinaryData.py
index 6959eb15f..0606250ea 100644
--- a/cognee/modules/ingestion/data_types/BinaryData.py
+++ b/cognee/modules/ingestion/data_types/BinaryData.py
@@ -17,7 +17,7 @@ class BinaryData(IngestionData):
 
     def get_identifier(self):
         metadata = self.get_metadata()
-        return self.name + "." + metadata["extension"]
+        return metadata["content_hash"]
 
     def get_metadata(self):
         self.ensure_metadata()
diff --git a/cognee/modules/ingestion/identify.py b/cognee/modules/ingestion/identify.py
index 745aab913..776932683 100644
--- a/cognee/modules/ingestion/identify.py
+++ b/cognee/modules/ingestion/identify.py
@@ -1,7 +1,9 @@
 from uuid import uuid5, NAMESPACE_OID
 from .data_types import IngestionData
 
-def identify(data: IngestionData) -> str:
-    data_id: str = data.get_identifier()
+from cognee.modules.users.models import User
 
-    return uuid5(NAMESPACE_OID, data_id)
+def identify(data: IngestionData, user: User) -> str:
+    data_content_hash: str = data.get_identifier()
+    # Derive a deterministic UUID from the file content hash plus the owner id
+    return uuid5(NAMESPACE_OID, f"{data_content_hash}{user.id}")
diff --git a/cognee/tasks/ingestion/ingest_data_with_metadata.py b/cognee/tasks/ingestion/ingest_data_with_metadata.py
index 59e4a16f3..73e9da32d 100644
--- a/cognee/tasks/ingestion/ingest_data_with_metadata.py
+++ b/cognee/tasks/ingestion/ingest_data_with_metadata.py
@@ -5,7 +5,6 @@ import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset
 from cognee.modules.data.models.DatasetData import DatasetData
-from cognee.modules.data.operations.delete_metadata import delete_metadata
 from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
 from cognee.shared.utils import send_telemetry
@@ -25,11 +24,11 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     )
 
     @dlt.resource(standalone=True, merge_key="id")
-    async def data_resources(file_paths: str):
+    async def data_resources(file_paths: list[str], user: User):
         for file_path in file_paths:
             with open(file_path.replace("file://", ""), mode="rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                data_id = ingestion.identify(classified_data, user)
                 file_metadata = classified_data.get_metadata()
                 yield {
                     "id": data_id,
@@ -37,6 +36,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     "file_path": file_metadata["file_path"],
                     "extension": file_metadata["extension"],
                     "mime_type": file_metadata["mime_type"],
+                    "content_hash": file_metadata["content_hash"],
+                    "owner_id": str(user.id),
                 }
 
     async def data_storing(data: Any, dataset_name: str, user: User):
@@ -58,7 +59,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
 
             with open(file_path.replace("file://", ""), mode = "rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                # data_id is derived from the file content hash plus the owner id, so duplicates resolve to the same record
+                data_id = ingestion.identify(classified_data, user)
 
                 file_metadata = classified_data.get_metadata()
 
@@ -71,6 +73,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
             async with db_engine.get_async_session() as session:
                 dataset = await create_dataset(dataset_name, user.id, session)
 
+                # Check whether this data point already exists and should be updated
                 data_point = (
                     await session.execute(select(Data).filter(Data.id == data_id))
                 ).scalar_one_or_none()
@@ -80,6 +83,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     data_point.raw_data_location = file_metadata["file_path"]
                     data_point.extension = file_metadata["extension"]
                     data_point.mime_type = file_metadata["mime_type"]
+                    data_point.owner_id = user.id
+                    data_point.content_hash = file_metadata["content_hash"]
                     await session.merge(data_point)
                 else:
                     data_point = Data(
@@ -87,7 +92,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                         name = file_metadata["name"],
                         raw_data_location = file_metadata["file_path"],
                         extension = file_metadata["extension"],
-                        mime_type = file_metadata["mime_type"]
+                        mime_type = file_metadata["mime_type"],
+                        owner_id = user.id,
+                        content_hash = file_metadata["content_hash"],
                     )
 
                 # Check if data is already in dataset
@@ -118,14 +125,14 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
         # To use sqlite with dlt dataset_name must be set to "main".
        # Sqlite doesn't support schemas
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name="main",
             write_disposition="merge",
         )
     else:
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name=dataset_name,
             write_disposition="merge",
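
For reference, a minimal, self-contained sketch of the identity scheme this
patch introduces, using only the Python standard library. The helper name
`dedup_id` and the `owner_id` parameter (standing in for `user.id`) are
illustrative, not names from the codebase.

    import hashlib
    from uuid import UUID, uuid4, uuid5, NAMESPACE_OID

    def dedup_id(content: bytes, owner_id: UUID) -> UUID:
        # Same content + same owner => same UUID, so re-ingesting a file
        # resolves to the existing row instead of inserting a duplicate.
        content_hash = hashlib.md5(content).hexdigest()
        return uuid5(NAMESPACE_OID, f"{content_hash}{owner_id}")

    owner = uuid4()
    assert dedup_id(b"same bytes", owner) == dedup_id(b"same bytes", owner)
    assert dedup_id(b"same bytes", owner) != dedup_id(b"same bytes", uuid4())

Because the id is deterministic, both the dlt resource (merge_key="id") and
the `session.merge` path converge on a single `Data` row per (content, owner)
pair. MD5 serves here as a deduplication fingerprint rather than a security
boundary; if collision resistance against adversarial inputs mattered,
`hashlib.sha256` would be a drop-in replacement.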