diff --git a/working_dir_error_replication/reader.py b/cognee/tests/subprocesses/reader.py similarity index 99% rename from working_dir_error_replication/reader.py rename to cognee/tests/subprocesses/reader.py index 4ef78ea06..df54a63e4 100644 --- a/working_dir_error_replication/reader.py +++ b/cognee/tests/subprocesses/reader.py @@ -8,7 +8,6 @@ from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter async def main(): adapter = KuzuAdapter("test.db") result = await adapter.query("MATCH (n:Node) RETURN COUNT(n)") - print(f"Reader: Found {result[0][0]} nodes") result = await adapter.query("MATCH (n:Node) RETURN COUNT(n)") print(f"Reader: Found {result[0][0]} nodes") diff --git a/working_dir_error_replication/writer.py b/cognee/tests/subprocesses/writer.py similarity index 71% rename from working_dir_error_replication/writer.py rename to cognee/tests/subprocesses/writer.py index 07f2cd276..27d00caba 100644 --- a/working_dir_error_replication/writer.py +++ b/cognee/tests/subprocesses/writer.py @@ -1,10 +1,8 @@ import asyncio -from uuid import uuid4 -from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter import time import uuid -from cognee.modules.chunking.models.DocumentChunk import DocumentChunk from cognee.modules.data.processing.document_types import PdfDocument +from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter def create_node(name): @@ -12,22 +10,22 @@ def create_node(name): id=uuid.uuid4(), name=name, raw_data_location=name, - external_metadata="", - mime_type="", + external_metadata="test_external_metadata", + mime_type="test_mime", ) return document async def main(): adapter = KuzuAdapter("test.db") - nodes = [create_node(f"Node{i}") for i in range(2)] + nodes = [create_node(f"Node{i}") for i in range(5)] print("Writer: Starting...") await adapter.add_nodes(nodes) print("writer finished...") - time.sleep(5) + time.sleep(10) if __name__ == "__main__": diff --git a/cognee/tests/test_concurrent_subprocess_access.py b/cognee/tests/test_concurrent_subprocess_access.py index a80d44799..5aafbe177 100644 --- a/cognee/tests/test_concurrent_subprocess_access.py +++ b/cognee/tests/test_concurrent_subprocess_access.py @@ -2,14 +2,17 @@ import os import asyncio import cognee import pathlib +import subprocess from cognee.infrastructure.databases.graph import get_graph_engine from collections import Counter from cognee.modules.users.methods import get_default_user from cognee.shared.logging_utils import get_logger +from multiprocessing import Process + logger = get_logger() - +run async def test_concurrent_subprocess_access(): data_directory_path = str( @@ -29,15 +32,13 @@ async def test_concurrent_subprocess_access(): await cognee.prune.prune_data() await cognee.prune.prune_system(metadata=True) - text1 = "Dave watches Dexter Resurrection" - text2 = "Ana likes apples" - text3 = "Bob prefers Cognee over other solutions" + writer_process = subprocess.Popen([os.sys.executable, "writer.py"]) - await cognee.add([text1, text2, text3], dataset_name="edge_ingestion_test") + reader_process = subprocess.Popen([os.sys.executable, "reader.py"]) - user = await get_default_user() - - await cognee.cognify(["edge_ingestion_test"], user=user) + # Wait for both processes to complete + writer_process.wait() + reader_process.wait() if __name__ == "__main__":