feat: Add deduplication of data

Data is deduplicated per user: if a user tries to add data that already exists, the request is redirected to the existing data in the database instead of creating a duplicate record.

Feature COG-505
Igor Ilic 2024-12-05 16:38:44 +01:00
parent 0ce254b262
commit f5b5e56cc1
6 changed files with 28 additions and 13 deletions
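
For context, the dedup key is derived as follows: the file's MD5 content hash is combined with the owner's id and hashed into a deterministic UUID, so identical bytes re-ingested by the same user resolve to the same `Data` row, while the same bytes from different users stay separate. A minimal standalone sketch of that derivation (illustrative names; the real code lives in `get_file_metadata` and `ingestion.identify` below):

```python
import hashlib
from uuid import NAMESPACE_OID, UUID, uuid4, uuid5


def dedup_id(file_bytes: bytes, owner_id: UUID) -> UUID:
    # MD5 of the raw contents identifies the bytes; appending the owner id
    # scopes deduplication to a single user.
    content_hash = hashlib.md5(file_bytes).hexdigest()
    return uuid5(NAMESPACE_OID, f"{content_hash}{owner_id}")


user_a, user_b = uuid4(), uuid4()
payload = b"same file contents"

# Same bytes, same user -> same id (the duplicate is merged, not re-inserted).
assert dedup_id(payload, user_a) == dedup_id(payload, user_a)
# Same bytes, different user -> different id (deduplication is per user).
assert dedup_id(payload, user_a) != dedup_id(payload, user_b)
```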

View file

@@ -1,4 +1,5 @@
 from typing import BinaryIO, TypedDict
+import hashlib

 from .guess_file_type import guess_file_type
@@ -7,10 +8,14 @@ class FileMetadata(TypedDict):
     file_path: str
     mime_type: str
     extension: str
+    content_hash: str

 def get_file_metadata(file: BinaryIO) -> FileMetadata:
     """Get metadata from a file"""
     file.seek(0)
+    content_hash = hashlib.md5(file.read()).hexdigest()
+    file.seek(0)
     file_type = guess_file_type(file)
     file_path = file.name
@@ -21,4 +26,5 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
         file_path = file_path,
         mime_type = file_type.mime,
         extension = file_type.extension,
+        content_hash = content_hash,
     )
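
One detail worth noting in the hunk above: the hash is computed by reading the whole stream, so the file is rewound with `file.seek(0)` both before and after hashing; otherwise `guess_file_type` would see an exhausted stream. A small standalone illustration (using `io.BytesIO` and a hypothetical `content_hash_of` helper in place of the real file handling):

```python
import hashlib
import io


def content_hash_of(file) -> str:
    file.seek(0)
    digest = hashlib.md5(file.read()).hexdigest()
    # Rewind so later readers (e.g. file-type detection) start from byte 0 again.
    file.seek(0)
    return digest


f = io.BytesIO(b"hello world")
assert content_hash_of(f) == content_hash_of(f)   # hashing does not consume the stream
assert f.read() == b"hello world"                 # downstream code still sees the full contents
```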

View file

@@ -1,7 +1,6 @@
 from datetime import datetime, timezone
 from typing import List
 from uuid import uuid4

 from sqlalchemy import UUID, Column, DateTime, String
 from sqlalchemy.orm import Mapped, relationship
@@ -19,6 +18,8 @@ class Data(Base):
     extension = Column(String)
     mime_type = Column(String)
     raw_data_location = Column(String)
+    owner_id = Column(UUID, index=True)
+    content_hash = Column(String)
     created_at = Column(
         DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
     )

View file

@@ -2,7 +2,6 @@ import inspect
 import json
 import re
 import warnings
-from typing import Any
 from uuid import UUID
 from sqlalchemy import select
 from typing import Any, BinaryIO, Union

View file

@@ -17,7 +17,7 @@ class BinaryData(IngestionData):
     def get_identifier(self):
         metadata = self.get_metadata()
-        return self.name + "." + metadata["extension"]
+        return metadata["content_hash"]

     def get_metadata(self):
         self.ensure_metadata()

View file

@@ -1,7 +1,9 @@
 from uuid import uuid5, NAMESPACE_OID

 from .data_types import IngestionData
+from cognee.modules.users.models import User

-def identify(data: IngestionData) -> str:
-    data_id: str = data.get_identifier()
-    return uuid5(NAMESPACE_OID, data_id)
+def identify(data: IngestionData, user: User) -> str:
+    data_content_hash: str = data.get_identifier()
+    # return UUID hash of file contents + owner id
+    return uuid5(NAMESPACE_OID, f"{data_content_hash}{user.id}")
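
`uuid5` is deterministic for a given namespace and name, unlike `uuid4`; that determinism is what allows the `merge_key="id"` / `write_disposition="merge"` combination in the ingestion pipeline below to fold repeated ingestions onto one row instead of inserting a new one each time. A quick sanity check of that property (the hash string is illustrative):

```python
from uuid import NAMESPACE_OID, uuid4, uuid5

name = "d41d8cd98f00b204e9800998ecf8427e" + "some-owner-id"  # content hash + owner id, illustrative

# uuid5 is a pure function of (namespace, name): the same input always yields the same UUID.
assert uuid5(NAMESPACE_OID, name) == uuid5(NAMESPACE_OID, name)

# uuid4 is random, so it could not serve as a stable deduplication key.
assert uuid4() != uuid4()
```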

View file

@@ -5,7 +5,6 @@ import cognee.modules.ingestion as ingestion
 from cognee.infrastructure.databases.relational import get_relational_engine
 from cognee.modules.data.methods import create_dataset
 from cognee.modules.data.models.DatasetData import DatasetData
-from cognee.modules.data.operations.delete_metadata import delete_metadata
 from cognee.modules.users.models import User
 from cognee.modules.users.permissions.methods import give_permission_on_document
 from cognee.shared.utils import send_telemetry
@@ -25,11 +24,11 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
     )

     @dlt.resource(standalone=True, merge_key="id")
-    async def data_resources(file_paths: str):
+    async def data_resources(file_paths: str, user: User):
         for file_path in file_paths:
             with open(file_path.replace("file://", ""), mode="rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                data_id = ingestion.identify(classified_data, user)
                 file_metadata = classified_data.get_metadata()
                 yield {
                     "id": data_id,
@@ -37,6 +36,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     "file_path": file_metadata["file_path"],
                     "extension": file_metadata["extension"],
                     "mime_type": file_metadata["mime_type"],
+                    "content_hash": file_metadata["content_hash"],
+                    "owner_id": str(user.id),
                 }

     async def data_storing(data: Any, dataset_name: str, user: User):
@@ -58,7 +59,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
             with open(file_path.replace("file://", ""), mode = "rb") as file:
                 classified_data = ingestion.classify(file)
-                data_id = ingestion.identify(classified_data)
+                # data_id is the hash of file contents + owner id to avoid duplicate data
+                data_id = ingestion.identify(classified_data, user)

                 file_metadata = classified_data.get_metadata()
@@ -71,6 +73,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
             async with db_engine.get_async_session() as session:
                 dataset = await create_dataset(dataset_name, user.id, session)

+                # Check to see if data should be updated
                 data_point = (
                     await session.execute(select(Data).filter(Data.id == data_id))
                 ).scalar_one_or_none()
@@ -80,6 +83,8 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                     data_point.raw_data_location = file_metadata["file_path"]
                     data_point.extension = file_metadata["extension"]
                     data_point.mime_type = file_metadata["mime_type"]
+                    data_point.owner_id = user.id
+                    data_point.content_hash = file_metadata["content_hash"]
                     await session.merge(data_point)
                 else:
                     data_point = Data(
@@ -87,7 +92,9 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
                         name = file_metadata["name"],
                         raw_data_location = file_metadata["file_path"],
                         extension = file_metadata["extension"],
-                        mime_type = file_metadata["mime_type"]
+                        mime_type = file_metadata["mime_type"],
+                        owner_id = user.id,
+                        content_hash = file_metadata["content_hash"],
                     )

                     # Check if data is already in dataset
@@ -118,14 +125,14 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
         # To use sqlite with dlt dataset_name must be set to "main".
         # Sqlite doesn't support schemas
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name="main",
             write_disposition="merge",
         )
     else:
         run_info = pipeline.run(
-            data_resources(file_paths),
+            data_resources(file_paths, user),
             table_name="file_metadata",
             dataset_name=dataset_name,
             write_disposition="merge",