From 66427e725c3e8a7cb9a94b60261a6193e8773370 Mon Sep 17 00:00:00 2001
From: Boris Arzentar
Date: Tue, 8 Jul 2025 22:47:09 +0200
Subject: [PATCH] fix: remove obsolete files and fix unit tests

---
 .../files/utils/get_file_metadata.py          | 10 ++++---
 cognee/infrastructure/utils/run_sync.py       | 25 -----------------
 .../utils/expand_with_nodes_and_edges.py      | 28 ++++++-------------
 .../ingestion/get_text_content_hash.py        |  6 ----
 .../operations/run_tasks_distributed.py       | 23 ---------------
 .../graph/get_graph_from_huge_model_test.py   |  7 +++--
 .../graph/get_graph_from_model_flat_test.py   |  3 ++
 .../graph/get_graph_from_model_test.py        |  4 ++-
 .../graph/get_graph_from_model_unit_test.py   |  2 +-
 .../pipelines/run_task_from_queue_test.py     |  2 +-
 .../unit/modules/pipelines/run_tasks_test.py  |  2 +-
 .../pipelines/run_tasks_with_context_test.py  |  2 +-
 distributed/entrypoint.py                     |  8 +++---
 distributed/modal_image.py                    | 19 +++----------
 distributed/tasks/queued_add_data_points.py   |  4 +--
 distributed/tasks/queued_add_edges.py         |  4 +--
 distributed/tasks/queued_add_nodes.py         |  4 +--
 17 files changed, 40 insertions(+), 113 deletions(-)
 delete mode 100644 cognee/infrastructure/utils/run_sync.py
 delete mode 100644 cognee/modules/ingestion/get_text_content_hash.py

diff --git a/cognee/infrastructure/files/utils/get_file_metadata.py b/cognee/infrastructure/files/utils/get_file_metadata.py
index 2742e3454..ead226650 100644
--- a/cognee/infrastructure/files/utils/get_file_metadata.py
+++ b/cognee/infrastructure/files/utils/get_file_metadata.py
@@ -1,7 +1,11 @@
 import io
 from typing import BinaryIO, TypedDict
-from .guess_file_type import guess_file_type
+
+from cognee.shared.logging_utils import get_logger
 from cognee.shared.utils import get_file_content_hash
+from .guess_file_type import guess_file_type
+
+logger = get_logger("FileMetadata")
 
 
 class FileMetadata(TypedDict):
@@ -44,9 +48,7 @@ def get_file_metadata(file: BinaryIO) -> FileMetadata:
         content_hash = get_file_content_hash(file)
         file.seek(0)
     except io.UnsupportedOperation as error:
-        raise Exception(
-            f"Error retrieving metadata from file: {file.name} \n{str(error)}\n\n"
-        ) from error
+        logger.error(f"Error retrieving content hash for file: {file.name} \n{str(error)}\n\n")
 
     file_type = guess_file_type(file)
 
diff --git a/cognee/infrastructure/utils/run_sync.py b/cognee/infrastructure/utils/run_sync.py
deleted file mode 100644
index 4945b0b98..000000000
--- a/cognee/infrastructure/utils/run_sync.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import asyncio
-import threading
-
-
-def run_sync(coro, timeout=None):
-    result = None
-    exception = None
-
-    def runner():
-        nonlocal result, exception
-        try:
-            result = asyncio.run(coro)
-        except Exception as e:
-            exception = e
-
-    thread = threading.Thread(target=runner)
-    thread.start()
-    thread.join(timeout)
-
-    if thread.is_alive():
-        raise asyncio.TimeoutError("Coroutine execution timed out.")
-    if exception:
-        raise exception
-
-    return result
diff --git a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py
index b72a2d340..f56e6d0ca 100644
--- a/cognee/modules/graph/utils/expand_with_nodes_and_edges.py
+++ b/cognee/modules/graph/utils/expand_with_nodes_and_edges.py
@@ -47,18 +47,11 @@ def expand_with_nodes_and_edges(
         type_node_key = f"{type_node_id}_type"
 
         if type_node_key not in added_nodes_map and type_node_key not in key_mapping:
-            if ontology_resolver:
-                (
-                    ontology_entity_type_nodes,
-                    ontology_entity_type_edges,
-                    ontology_closest_class_node,
-                ) = ontology_resolver.get_subgraph(
-                    node_name=type_node_name, node_type="classes"
-                )
-            else:
-                ontology_entity_type_nodes = []
-                ontology_entity_type_edges = []
-                ontology_closest_class_node = None
+            (
+                ontology_entity_type_nodes,
+                ontology_entity_type_edges,
+                ontology_closest_class_node,
+            ) = ontology_resolver.get_subgraph(node_name=type_node_name, node_type="classes")
 
             if ontology_closest_class_node:
                 name_mapping[type_node_name] = ontology_closest_class_node.name
@@ -135,14 +128,9 @@ def expand_with_nodes_and_edges(
         entity_node_key = f"{node_id}_entity"
 
         if entity_node_key not in added_nodes_map and entity_node_key not in key_mapping:
-            if ontology_resolver:
-                ontology_entity_nodes, ontology_entity_edges, start_ent_ont = (
-                    ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals")
-                )
-            else:
-                ontology_entity_nodes = []
-                ontology_entity_edges = []
-                start_ent_ont = None
+            ontology_entity_nodes, ontology_entity_edges, start_ent_ont = (
+                ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals")
+            )
 
             if start_ent_ont:
                 name_mapping[node_name] = start_ent_ont.name
diff --git a/cognee/modules/ingestion/get_text_content_hash.py b/cognee/modules/ingestion/get_text_content_hash.py
deleted file mode 100644
index ee59a0c0f..000000000
--- a/cognee/modules/ingestion/get_text_content_hash.py
+++ /dev/null
@@ -1,6 +0,0 @@
-import hashlib
-
-
-def get_text_content_hash(text: str) -> str:
-    encoded_text = text.encode("utf-8")
-    return hashlib.md5(encoded_text).hexdigest()
diff --git a/cognee/modules/pipelines/operations/run_tasks_distributed.py b/cognee/modules/pipelines/operations/run_tasks_distributed.py
index 69d9b40e9..30c4fb073 100644
--- a/cognee/modules/pipelines/operations/run_tasks_distributed.py
+++ b/cognee/modules/pipelines/operations/run_tasks_distributed.py
@@ -85,29 +85,6 @@ async def run_tasks_distributed(tasks, dataset_id, data, user, pipeline_name, co
             payload=result,
         )
 
-    # producer_futures = []
-
-    # for data_item in data[:5]:
-    #     producer_future = run_tasks_distributed(
-    #         run_tasks_with_telemetry, tasks, [data_item], user, pipeline_name, context
-    #     )
-    #     producer_futures.append(producer_future)
-
-    # batch_results = []
-    # for producer_future in producer_futures:
-    #     try:
-    #         result = producer_future.get()
-    #     except Exception as e:
-    #         result = e
-    #     batch_results.append(result)
-
-    # yield PipelineRunYield(
-    #     pipeline_run_id=pipeline_run_id,
-    #     dataset_id=dataset.id,
-    #     dataset_name=dataset.name,
-    #     payload=result,
-    # )
-
     await log_pipeline_run_complete(pipeline_run_id, pipeline_id, pipeline_name, dataset_id, data)
 
     yield PipelineRunCompleted(
diff --git a/cognee/tests/unit/interfaces/graph/get_graph_from_huge_model_test.py b/cognee/tests/unit/interfaces/graph/get_graph_from_huge_model_test.py
index 29ab5ded9..c9bcf99dc 100644
--- a/cognee/tests/unit/interfaces/graph/get_graph_from_huge_model_test.py
+++ b/cognee/tests/unit/interfaces/graph/get_graph_from_huge_model_test.py
@@ -1,9 +1,11 @@
-import asyncio
-import random
 import time
+import random
+import pytest
+import asyncio
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
+
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
 
@@ -55,6 +57,7 @@ def nanoseconds_to_largest_unit(nanoseconds):
     return nanoseconds, "nanoseconds"
 
 
+@pytest.mark.asyncio
 async def test_circular_reference_extraction():
     repo = Repository(path="repo1")
 
diff --git a/cognee/tests/unit/interfaces/graph/get_graph_from_model_flat_test.py b/cognee/tests/unit/interfaces/graph/get_graph_from_model_flat_test.py
index 8a1f7b5b8..e115b8c9b 100644
--- a/cognee/tests/unit/interfaces/graph/get_graph_from_model_flat_test.py
+++ b/cognee/tests/unit/interfaces/graph/get_graph_from_model_flat_test.py
@@ -1,7 +1,9 @@
+import pytest
 import asyncio
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
+
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
 
@@ -32,6 +34,7 @@ class Entity(DataPoint):
 DocumentChunk.model_rebuild()
 
 
+@pytest.mark.asyncio
 async def get_graph_from_model_test():
     document = Document(path="file_path")
 
diff --git a/cognee/tests/unit/interfaces/graph/get_graph_from_model_test.py b/cognee/tests/unit/interfaces/graph/get_graph_from_model_test.py
index 135849c9b..6b6837eaf 100644
--- a/cognee/tests/unit/interfaces/graph/get_graph_from_model_test.py
+++ b/cognee/tests/unit/interfaces/graph/get_graph_from_model_test.py
@@ -1,8 +1,9 @@
+import pytest
 import asyncio
-import random
 from typing import List
 from uuid import NAMESPACE_OID, uuid5
+
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
 
 
@@ -33,6 +34,7 @@ class Entity(DataPoint):
 DocumentChunk.model_rebuild()
 
 
+@pytest.mark.asyncio
 async def get_graph_from_model_test():
     document = Document(path="file_path")
 
diff --git a/cognee/tests/unit/interfaces/graph/get_graph_from_model_unit_test.py b/cognee/tests/unit/interfaces/graph/get_graph_from_model_unit_test.py
index d45819d39..99bb66ccf 100644
--- a/cognee/tests/unit/interfaces/graph/get_graph_from_model_unit_test.py
+++ b/cognee/tests/unit/interfaces/graph/get_graph_from_model_unit_test.py
@@ -1,5 +1,5 @@
 import pytest
-from typing import List, Optional
+from typing import List
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.modules.graph.utils import get_graph_from_model
diff --git a/cognee/tests/unit/modules/pipelines/run_task_from_queue_test.py b/cognee/tests/unit/modules/pipelines/run_task_from_queue_test.py
index 4aabb4241..a40f6ad00 100644
--- a/cognee/tests/unit/modules/pipelines/run_task_from_queue_test.py
+++ b/cognee/tests/unit/modules/pipelines/run_task_from_queue_test.py
@@ -2,7 +2,7 @@ import asyncio
 from queue import Queue
 
 import cognee
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.infrastructure.databases.relational import create_db_and_tables
diff --git a/cognee/tests/unit/modules/pipelines/run_tasks_test.py b/cognee/tests/unit/modules/pipelines/run_tasks_test.py
index 72dd5440e..511c8effe 100644
--- a/cognee/tests/unit/modules/pipelines/run_tasks_test.py
+++ b/cognee/tests/unit/modules/pipelines/run_tasks_test.py
@@ -1,7 +1,7 @@
 import asyncio
 
 import cognee
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
 from cognee.infrastructure.databases.relational import create_db_and_tables
diff --git a/cognee/tests/unit/modules/pipelines/run_tasks_with_context_test.py b/cognee/tests/unit/modules/pipelines/run_tasks_with_context_test.py
index 32c9f32a7..3da14c67b 100644
--- a/cognee/tests/unit/modules/pipelines/run_tasks_with_context_test.py
+++ b/cognee/tests/unit/modules/pipelines/run_tasks_with_context_test.py
@@ -3,7 +3,7 @@ import asyncio
 import cognee
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.modules.users.methods import get_default_user
-from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
+from cognee.modules.pipelines.operations.run_tasks_base import run_tasks_base
 from cognee.infrastructure.databases.relational import create_db_and_tables
 
 
diff --git a/distributed/entrypoint.py b/distributed/entrypoint.py
index 18d068a65..977c92c95 100644
--- a/distributed/entrypoint.py
+++ b/distributed/entrypoint.py
@@ -1,5 +1,5 @@
-import asyncio
 import os
+import asyncio
 
 import cognee
 from cognee.api.v1.prune import prune
@@ -45,10 +45,10 @@ async def main():
         worker_future = data_point_saving_worker.spawn()
         consumer_futures.append(worker_future)
 
-    # s3_bucket_name = "s3://s3-test-laszlo/Database for KG v1"
-    s3_bucket_name = "s3://s3-test-laszlo/Pdf"
+    s3_bucket_path = os.getenv("S3_BUCKET_PATH")
+    s3_data_path = "s3://" + s3_bucket_path
 
-    await cognee.add(s3_bucket_name, dataset_name="s3-files")
+    await cognee.add(s3_data_path, dataset_name="s3-files")
 
     await cognee.cognify(datasets=["s3-files"])
 
diff --git a/distributed/modal_image.py b/distributed/modal_image.py
index 1854e290c..ccca67810 100644
--- a/distributed/modal_image.py
+++ b/distributed/modal_image.py
@@ -1,22 +1,11 @@
-import json
 import pathlib
 from os import path
 from modal import Image
 from logging import getLogger
-from dotenv import dotenv_values
 
 logger = getLogger("modal_image_creation")
 
-local_env_vars = dict(dotenv_values(".env"))
-
-logger.debug("Modal deployment started with the following environmental variables:")
-logger.debug(json.dumps(local_env_vars, indent=4))
-
-image = (
-    Image.from_dockerfile(
-        path=pathlib.Path(path.join(path.dirname(__file__), "Dockerfile")).resolve(),
-        force_build=False,
-    )
-    .env(local_env_vars)
-    .add_local_python_source("cognee", "entrypoint")
-)
+image = Image.from_dockerfile(
+    path=pathlib.Path(path.join(path.dirname(__file__), "Dockerfile")).resolve(),
+    force_build=False,
+).add_local_python_source("cognee", "entrypoint")
diff --git a/distributed/tasks/queued_add_data_points.py b/distributed/tasks/queued_add_data_points.py
index 8b4e21c56..0cc34e59e 100644
--- a/distributed/tasks/queued_add_data_points.py
+++ b/distributed/tasks/queued_add_data_points.py
@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_data_points(collection_name, data_points_batch):
+    from grpclib import GRPCError
     from ..queues import add_data_points_queue
 
     try:
diff --git a/distributed/tasks/queued_add_edges.py b/distributed/tasks/queued_add_edges.py
index 71723d7c6..e528dd44c 100644
--- a/distributed/tasks/queued_add_edges.py
+++ b/distributed/tasks/queued_add_edges.py
@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_edges(edge_batch):
+    from grpclib import GRPCError
     from ..queues import add_nodes_and_edges_queue
 
     try:
diff --git a/distributed/tasks/queued_add_nodes.py b/distributed/tasks/queued_add_nodes.py
index 48fbee7a1..adf6e3c8a 100644
--- a/distributed/tasks/queued_add_nodes.py
+++ b/distributed/tasks/queued_add_nodes.py
@@ -1,7 +1,5 @@
-from grpclib import GRPCError
-
-
 async def queued_add_nodes(node_batch):
+    from grpclib import GRPCError
     from ..queues import add_nodes_and_edges_queue
 
     try: