fix: remove obsolete files and fix unit tests

Boris Arzentar 2025-07-08 22:47:09 +02:00
parent 340a61b20a
commit 66427e725c
No known key found for this signature in database
GPG key ID: D5CC274C784807B7
17 changed files with 40 additions and 113 deletions

View file

@@ -1,7 +1,11 @@
 import io
 from typing import BinaryIO, TypedDict
-from cognee.shared.utils import get_file_content_hash
+
 from .guess_file_type import guess_file_type
+from cognee.shared.logging_utils import get_logger
+from cognee.shared.utils import get_file_content_hash
+
+logger = get_logger("FileMetadata")
 
 
 class FileMetadata(TypedDict):
@@ -44,9 +48,7 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
         content_hash = get_file_content_hash(file)
         file.seek(0)
     except io.UnsupportedOperation as error:
-        raise Exception(
-            f"Error retrieving metadata from file: {file.name} \n{str(error)}\n\n"
-        ) from error
+        logger.error(f"Error retrieving content hash for file: {file.name} \n{str(error)}\n\n")
 
     file_type = guess_file_type(file)
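Note the changed failure mode: an unreadable stream is now logged and skipped instead of aborting metadata extraction, so the content hash may simply be missing afterwards. A minimal sketch of the error path the logger now covers (the NonSeekableStream class is a hypothetical stand-in, not part of the codebase):

import io

class NonSeekableStream(io.RawIOBase):
    # hypothetical stand-in for a pipe-like binary stream without seek support
    def seekable(self):
        return False

stream = NonSeekableStream()
try:
    stream.seek(0)  # IOBase raises io.UnsupportedOperation for unseekable streams
except io.UnsupportedOperation as error:
    print(f"would be logged, not raised: {error}")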

View file

@@ -1,25 +0,0 @@
-import asyncio
-import threading
-
-
-def run_sync(coro, timeout=None):
-    result = None
-    exception = None
-
-    def runner():
-        nonlocal result, exception
-        try:
-            result = asyncio.run(coro)
-        except Exception as e:
-            exception = e
-
-    thread = threading.Thread(target=runner)
-    thread.start()
-    thread.join(timeout)
-
-    if thread.is_alive():
-        raise asyncio.TimeoutError("Coroutine execution timed out.")
-
-    if exception:
-        raise exception
-
-    return result
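The deleted run_sync helper ran a coroutine to completion on a private thread with its own event loop. A minimal sketch of the plain-asyncio replacement, assuming callers are synchronous code with no loop already running in their thread (the coroutine here is hypothetical):

import asyncio

async def do_work() -> int:
    # hypothetical coroutine standing in for real pipeline work
    await asyncio.sleep(0.1)
    return 42

# asyncio.run has no timeout of its own; wait_for restores the one run_sync offered
result = asyncio.run(asyncio.wait_for(do_work(), timeout=5))
print(result)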

View file

@@ -47,18 +47,11 @@ def expand_with_nodes_and_edges(
             type_node_key = f"{type_node_id}_type"
 
             if type_node_key not in added_nodes_map and type_node_key not in key_mapping:
-                if ontology_resolver:
-                    (
-                        ontology_entity_type_nodes,
-                        ontology_entity_type_edges,
-                        ontology_closest_class_node,
-                    ) = ontology_resolver.get_subgraph(
-                        node_name=type_node_name, node_type="classes"
-                    )
-                else:
-                    ontology_entity_type_nodes = []
-                    ontology_entity_type_edges = []
-                    ontology_closest_class_node = None
+                (
+                    ontology_entity_type_nodes,
+                    ontology_entity_type_edges,
+                    ontology_closest_class_node,
+                ) = ontology_resolver.get_subgraph(node_name=type_node_name, node_type="classes")
 
                 if ontology_closest_class_node:
                     name_mapping[type_node_name] = ontology_closest_class_node.name
@@ -135,14 +128,9 @@ def expand_with_nodes_and_edges(
             entity_node_key = f"{node_id}_entity"
 
             if entity_node_key not in added_nodes_map and entity_node_key not in key_mapping:
-                if ontology_resolver:
-                    ontology_entity_nodes, ontology_entity_edges, start_ent_ont = (
-                        ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals")
-                    )
-                else:
-                    ontology_entity_nodes = []
-                    ontology_entity_edges = []
-                    start_ent_ont = None
+                ontology_entity_nodes, ontology_entity_edges, start_ent_ont = (
+                    ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals")
+                )
 
                 if start_ent_ont:
                     name_mapping[node_name] = start_ent_ont.name
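Both hunks drop the `if ontology_resolver:` fallback, so expand_with_nodes_and_edges now assumes a resolver is always supplied. A sketch of a null-object resolver that would reproduce the removed `else` branch for callers without an ontology (the class name and the "Person" argument are hypothetical; only the get_subgraph signature mirrors the call sites above):

class NullOntologyResolver:
    def get_subgraph(self, node_name: str, node_type: str):
        # matches the removed fallback: no nodes, no edges, no closest class
        return [], [], None

resolver = NullOntologyResolver()
nodes, edges, closest = resolver.get_subgraph(node_name="Person", node_type="classes")
assert (nodes, edges, closest) == ([], [], None)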

View file

@@ -1,6 +0,0 @@
-import hashlib
-
-
-def get_text_content_hash(text: str) -> str:
-    encoded_text = text.encode("utf-8")
-    return hashlib.md5(encoded_text).hexdigest()
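The deleted helper was a thin wrapper around hashlib; any remaining caller can inline the equivalent (the function name here is a hypothetical local replacement):

import hashlib

def text_hash(text: str) -> str:
    return hashlib.md5(text.encode("utf-8")).hexdigest()

assert text_hash("hello") == "5d41402abc4b2a76b9719d911017c592"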

View file

@@ -85,29 +85,6 @@ async def run_tasks_distributed(tasks, dataset_id, data, user, pipeline_name, co
                 payload=result,
             )
 
-        # producer_futures = []
-
-        # for data_item in data[:5]:
-        #     producer_future = run_tasks_distributed(
-        #         run_tasks_with_telemetry, tasks, [data_item], user, pipeline_name, context
-        #     )
-        #     producer_futures.append(producer_future)
-
-        # batch_results = []
-
-        # for producer_future in producer_futures:
-        #     try:
-        #         result = producer_future.get()
-        #     except Exception as e:
-        #         result = e
-
-        #     batch_results.append(result)
-
-        #     yield PipelineRunYield(
-        #         pipeline_run_id=pipeline_run_id,
-        #         dataset_id=dataset.id,
-        #         dataset_name=dataset.name,
-        #         payload=result,
-        #     )
-
         await log_pipeline_run_complete(pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data)
 
         yield PipelineRunCompleted(

View file

@@ -1,9 +1,11 @@
+import asyncio
+import random
 import time
-import random
-import asyncio
+
+import pytest
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
@@ -55,6 +57,7 @@ def nanoseconds_to_largest_unit(nanoseconds):
     return nanoseconds, "nanoseconds"
 
 
+@pytest.mark.asyncio
 async def test_circular_reference_extraction():
     repo = Repository(path="repo1")
 
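This file and the two test files below gain @pytest.mark.asyncio. Without the marker, pytest-asyncio (in its default strict mode) does not drive a bare `async def` test; pytest typically skips it with an "async def functions are not natively supported" warning, so these tests were collected but never actually awaited. A minimal sketch of the required shape:

import asyncio
import pytest

@pytest.mark.asyncio
async def test_loop_is_running():
    await asyncio.sleep(0)  # any await proves the plugin drives the coroutine
    assert asyncio.get_running_loop() is not None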

View file

@@ -1,7 +1,9 @@
+import pytest
+
 import asyncio
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
@@ -32,6 +34,7 @@ class Entity(DataPoint):
 DocumentChunk.model_rebuild()
 
 
+@pytest.mark.asyncio
 async def get_graph_from_model_test():
     document = Document(path="file_path")
 

View file

@@ -1,8 +1,9 @@
+import pytest
+
 import asyncio
-import random
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
@@ -33,6 +34,7 @@ class Entity(DataPoint):
 DocumentChunk.model_rebuild()
 
 
+@pytest.mark.asyncio
 async def get_graph_from_model_test():
     document = Document(path="file_path")
 

View file

@@ -1,5 +1,5 @@
 import pytest
-from typing import List, Optional
+from typing import List
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model

View file

@@ -2,7 +2,7 @@ import asyncio
 from queue import Queue
 
 import cognee
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.infrastructure.databases.relational import create_db_and_tables

View file

@@ -1,7 +1,7 @@
 import asyncio
 
 import cognee
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.infrastructure.databases.relational import create_db_and_tables

View file

@@ -3,7 +3,7 @@ import asyncio
 
 import cognee
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.infrastructure.databases.relational import create_db_and_tables
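All three scripts above pick up the same one-line change: run_tasks_base moved out of cognee.modules.pipelines.operations.run_tasks into its own run_tasks_base module. The shared import pair now reads (call signatures are unchanged by this commit):

from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
from cognee.modules.pipelines.tasks.task import Task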

View file

@@ -1,5 +1,5 @@
+import asyncio
 import os
-import asyncio
 
 import cognee
 from cognee.api.v1.prune import prune
@@ -45,10 +45,10 @@ async def main():
         worker_future = data_point_saving_worker.spawn()
         consumer_futures.append(worker_future)
 
-    # s3_bucket_name = "s3://s3-test-laszlo/Database for KG v1"
-    s3_bucket_name = "s3://s3-test-laszlo/Pdf"
+    s3_bucket_path = os.getenv("S3_BUCKET_PATH")
+    s3_data_path = "s3://" + s3_bucket_path
 
-    await cognee.add(s3_bucket_name, dataset_name="s3-files")
+    await cognee.add(s3_data_path, dataset_name="s3-files")
     await cognee.cognify(datasets=["s3-files"])
 
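The hard-coded bucket gives way to an environment variable. Note that os.getenv returns None when S3_BUCKET_PATH is unset, so the concatenation would raise TypeError; a defensive sketch (the guard and message are suggestions, not part of the commit):

import os

s3_bucket_path = os.getenv("S3_BUCKET_PATH")
if not s3_bucket_path:
    raise RuntimeError("S3_BUCKET_PATH must be set, e.g. 'my-bucket/my-prefix'")
s3_data_path = "s3://" + s3_bucket_path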

View file

@@ -1,22 +1,11 @@
-import json
 import pathlib
 from os import path
 from modal import Image
 from logging import getLogger
-from dotenv import dotenv_values
 
 logger = getLogger("modal_image_creation")
 
-local_env_vars = dict(dotenv_values(".env"))
-
-logger.debug("Modal deployment started with the following environmental variables:")
-logger.debug(json.dumps(local_env_vars, indent=4))
-
-image = (
-    Image.from_dockerfile(
-        path=pathlib.Path(path.join(path.dirname(__file__), "Dockerfile")).resolve(),
-        force_build=False,
-    )
-    .env(local_env_vars)
-    .add_local_python_source("cognee", "entrypoint")
-)
+image = Image.from_dockerfile(
+    path=pathlib.Path(path.join(path.dirname(__file__), "Dockerfile")).resolve(),
+    force_build=False,
+).add_local_python_source("cognee", "entrypoint")
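This stops baking local .env values into the Modal image (and stops logging them at build time, which also leaked secrets into debug output). The diff does not show where runtime configuration now comes from; if the variables are still needed, Modal's documented pattern is a Secret attached to the function rather than the image. A hedged sketch only (app and variable names are hypothetical):

import modal

app = modal.App("cognee-pipeline")  # hypothetical app name

@app.function(secrets=[modal.Secret.from_dotenv()])
def run():
    import os
    print(os.environ.get("LLM_API_KEY"))  # hypothetical variable name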

View file

@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_data_points(collection_name, data_points_batch):
+    from grpclib import GRPCError
     from ..queues import add_data_points_queue
 
     try:

View file

@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_edges(edge_batch):
+    from grpclib import GRPCError
     from ..queues import add_nodes_and_edges_queue
 
     try:

View file

@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_nodes(node_batch):
+    from grpclib import GRPCError
    from ..queues import add_nodes_and_edges_queue
 
     try:
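All three hunks apply the same pattern: moving the grpclib import inside the coroutine defers the dependency to call time, so these modules import cleanly in environments where grpclib is not installed. A sketch of the pattern in isolation (the function and its bodies are illustrative only):

def send_batch(batch):
    from grpclib import GRPCError  # resolved on first call; cached in sys.modules after

    try:
        ...  # push the batch over gRPC (elided)
    except GRPCError:
        ...  # retry or re-queue (elided)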