refactor: Move hash calculation of file to util

Moved file hash calculation to shared utils and added better typing

Refactor COG-505
This commit is contained in:
Igor Ilic 2024-12-05 20:33:30 +01:00
parent 9ba5d49e69
commit e80377b729
3 changed files with 27 additions and 3 deletions

View file

@ -1,6 +1,7 @@
from typing import BinaryIO, TypedDict
import hashlib
from .guess_file_type import guess_file_type
from cognee.shared.utils import get_file_content_hash
class FileMetadata(TypedDict):
@ -13,7 +14,7 @@ class FileMetadata(TypedDict):
def get_file_metadata(file: BinaryIO) -> FileMetadata:
"""Get metadata from a file"""
file.seek(0)
content_hash = hashlib.md5(file.read()).hexdigest()
content_hash = get_file_content_hash(file)
file.seek(0)
file_type = guess_file_type(file)

View file

@ -1,6 +1,9 @@
""" This module contains utility functions for the cognee. """
import os
from typing import BinaryIO, Union
import requests
import hashlib
from datetime import datetime, timezone
import graphistry
import networkx as nx
@ -70,6 +73,26 @@ def num_tokens_from_string(string: str, encoding_name: str) -> int:
num_tokens = len(encoding.encode(string))
return num_tokens
def get_file_content_hash(file_obj: Union[str, BinaryIO]) -> str:
    """Return the MD5 hex digest of a file's content.

    Args:
        file_obj: Either a filesystem path or an already-open binary file
            object. A file object is read from its current position and is
            left at EOF; the caller must seek back afterwards if needed.

    Returns:
        Hexadecimal MD5 digest of the content.

    Raises:
        OSError: If ``file_obj`` is a path that cannot be opened or read.
    """

    def _hash_stream(stream: BinaryIO) -> str:
        h = hashlib.md5()
        # 64 KiB chunks keep memory bounded while avoiding the overhead of
        # the tiny 64-byte reads that sizing by h.block_size would cause.
        while chunk := stream.read(65536):
            h.update(chunk)
        return h.hexdigest()

    if isinstance(file_obj, str):
        # A path was given: open (and reliably close) the file ourselves.
        with open(file_obj, "rb") as stream:
            return _hash_stream(stream)
    return _hash_stream(file_obj)
def trim_text_to_max_tokens(text: str, max_tokens: int, encoding_name: str) -> str:
"""

View file

@ -1,4 +1,4 @@
from typing import Any
from typing import Any, List
import dlt
import cognee.modules.ingestion as ingestion
@ -24,7 +24,7 @@ async def ingest_data_with_metadata(data: Any, dataset_name: str, user: User):
)
@dlt.resource(standalone=True, primary_key="id", merge_key="id")
async def data_resources(file_paths: str, user: User):
async def data_resources(file_paths: List[str], user: User):
for file_path in file_paths:
with open(file_path.replace("file://", ""), mode="rb") as file:
classified_data = ingestion.classify(file)