feat: Redis lock integration and Kuzu agentic access fix (#1504)

<!-- .github/pull_request_template.md -->

## Description
This PR introduces a shared lock mechanism in KuzuAdapter to avoid the
case where multiple subprocesses from different environments try to use
the same Kuzu database.

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [x] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
None

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
hajdul88 2025-10-16 15:48:20 +02:00 committed by GitHub
parent f0c332928d
commit 9821a01a47
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 1671 additions and 371 deletions

View file

@ -41,4 +41,4 @@ runs:
EXTRA_ARGS="$EXTRA_ARGS --extra $extra"
done
fi
uv sync --extra api --extra docs --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j $EXTRA_ARGS
uv sync --extra api --extra docs --extra evals --extra codegraph --extra ollama --extra dev --extra neo4j --extra redis $EXTRA_ARGS

View file

@ -1,4 +1,6 @@
name: Reusable Integration Tests
permissions:
contents: read
on:
workflow_call:
@ -264,3 +266,68 @@ jobs:
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
run: uv run python ./cognee/tests/test_edge_ingestion.py
run_concurrent_subprocess_access_test:
name: Concurrent Subprocess access test
runs-on: ubuntu-latest
defaults:
run:
shell: bash
services:
postgres:
image: pgvector/pgvector:pg17
env:
POSTGRES_USER: cognee
POSTGRES_PASSWORD: cognee
POSTGRES_DB: cognee_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
redis:
image: redis:7
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 5s
--health-timeout 3s
--health-retries 5
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: '3.11.x'
extra-dependencies: "postgres redis"
- name: Run Concurrent subprocess access test (Kuzu/Lancedb/Postgres)
env:
ENV: dev
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
GRAPH_DATABASE_PROVIDER: 'kuzu'
CACHING: true
SHARED_KUZU_LOCK: true
DB_PROVIDER: 'postgres'
DB_NAME: 'cognee_db'
DB_HOST: '127.0.0.1'
DB_PORT: 5432
DB_USERNAME: cognee
DB_PASSWORD: cognee
run: uv run python ./cognee/tests/test_concurrent_subprocess_access.py

View file

@ -0,0 +1,2 @@
from .get_cache_engine import get_cache_engine
from .config import get_cache_config

View file

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
class CacheDBInterface(ABC):
    """
    Abstract base class for distributed cache coordination systems (e.g., Redis, Memcached).

    Provides a common interface for lock acquisition, release, and context-managed locking.
    """

    def __init__(self, host: str, port: int, lock_key: str):
        # Connection coordinates of the cache backend.
        self.host = host
        self.port = port
        # Identifier of the resource guarded by the lock.
        self.lock_key = lock_key
        # Backend-specific lock handle; set by acquire_lock(), cleared on release.
        self.lock = None

    @abstractmethod
    def acquire_lock(self):
        """
        Acquire a lock on the given key.
        Must be implemented by subclasses.
        """
        pass

    @abstractmethod
    def release_lock(self):
        """
        Release the lock if it is held.
        Must be implemented by subclasses.
        """
        pass

    @contextmanager
    def hold_lock(self):
        """
        Context manager for safely acquiring and releasing the lock.
        """
        # Bug fix: this previously called the non-existent self.acquire() /
        # self.release(); the interface defines acquire_lock()/release_lock(),
        # so using the context manager raised AttributeError.
        self.acquire_lock()
        try:
            yield
        finally:
            self.release_lock()

View file

@ -0,0 +1,39 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class CacheConfig(BaseSettings):
    """
    Settings for the distributed cache layer (e.g., Redis) used for locking or coordination.

    Attributes:
    - caching: Master switch for cache-backed coordination.
    - shared_kuzu_lock: Shared kuzu lock logic on/off.
    - cache_host: Hostname of the cache service.
    - cache_port: Port number for the cache service.
    - agentic_lock_expire: Automatic lock expiration time (in seconds).
    - agentic_lock_timeout: Maximum time (in seconds) to wait for the lock release.
    """

    caching: bool = False
    shared_kuzu_lock: bool = False
    cache_host: str = "localhost"
    cache_port: int = 6379
    agentic_lock_expire: int = 240
    agentic_lock_timeout: int = 300

    # Values may come from a .env file; unknown keys are tolerated.
    model_config = SettingsConfigDict(env_file=".env", extra="allow")

    def to_dict(self) -> dict:
        """Return the declared configuration fields as a plain dictionary."""
        field_names = (
            "caching",
            "shared_kuzu_lock",
            "cache_host",
            "cache_port",
            "agentic_lock_expire",
            "agentic_lock_timeout",
        )
        return {name: getattr(self, name) for name in field_names}
@lru_cache
def get_cache_config():
    """Return the process-wide CacheConfig instance (memoized via lru_cache)."""
    return CacheConfig()

View file

@ -0,0 +1,59 @@
"""Factory to get the appropriate cache coordination engine (e.g., Redis)."""
from functools import lru_cache
from cognee.infrastructure.databases.cache.config import get_cache_config
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
config = get_cache_config()
@lru_cache
def create_cache_engine(
    cache_host: str,
    cache_port: int,
    lock_key: str,
    agentic_lock_expire: int = 240,
    agentic_lock_timeout: int = 300,
) -> "CacheDBInterface | None":
    """
    Factory function to instantiate a cache coordination backend (currently Redis).

    Results are memoized per argument tuple via lru_cache, so repeated calls with
    the same parameters share one adapter instance within the process.

    Parameters:
    -----------
    - cache_host: Hostname or IP of the cache server.
    - cache_port: Port number to connect to.
    - lock_key: Identifier used for the locking resource.
    - agentic_lock_expire: Duration to hold the lock after acquisition.
    - agentic_lock_timeout: Max time to wait for the lock before failing.

    Returns:
    --------
    - CacheDBInterface | None: An instance of the appropriate cache adapter, or None
      when caching is disabled in the module-level config. :TODO: Now we support only Redis. later if we add more here we can split the logic
    """
    # NOTE(review): config is captured at module import time; changes to env
    # after import are not reflected here -- confirm this is intended.
    if config.caching:
        # Imported lazily so the redis package is only required when caching is on.
        from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter

        return RedisAdapter(
            host=cache_host,
            port=cache_port,
            lock_name=lock_key,
            timeout=agentic_lock_expire,
            blocking_timeout=agentic_lock_timeout,
        )
    else:
        return None
def get_cache_engine(lock_key: str) -> "CacheDBInterface | None":
    """
    Returns a cache adapter instance using current context configuration.

    Returns None when caching is disabled (see create_cache_engine).
    """
    return create_cache_engine(
        cache_host=config.cache_host,
        cache_port=config.cache_port,
        lock_key=lock_key,
        agentic_lock_expire=config.agentic_lock_expire,
        agentic_lock_timeout=config.agentic_lock_timeout,
    )

View file

@ -0,0 +1,49 @@
import redis
from contextlib import contextmanager
from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface
class RedisAdapter(CacheDBInterface):
    """Redis-backed implementation of CacheDBInterface using redis-py's Lock."""

    def __init__(self, host, port, lock_name, timeout=240, blocking_timeout=300):
        """
        Parameters:
        -----------
        - host: Redis server hostname or IP.
        - port: Redis server port.
        - lock_name: Key name used for the distributed lock.
        - timeout: Seconds after which an acquired lock auto-expires.
        - blocking_timeout: Max seconds to block while waiting to acquire.
        """
        super().__init__(host, port, lock_name)
        self.redis = redis.Redis(host=host, port=port)
        self.timeout = timeout
        self.blocking_timeout = blocking_timeout

    def acquire_lock(self):
        """
        Acquire the Redis lock manually. Raises if acquisition fails.
        """
        self.lock = self.redis.lock(
            name=self.lock_key,
            timeout=self.timeout,
            blocking_timeout=self.blocking_timeout,
        )
        acquired = self.lock.acquire()
        if not acquired:
            raise RuntimeError(f"Could not acquire Redis lock: {self.lock_key}")
        return self.lock

    def release_lock(self):
        """
        Release the Redis lock manually, if held.
        """
        if self.lock:
            try:
                self.lock.release()
            except redis.exceptions.LockError:
                # Lock already expired or is owned by another client;
                # treat it as released.
                pass
            finally:
                # Bug fix: previously the handle was only cleared on a clean
                # release, leaving a stale lock object behind after LockError.
                self.lock = None

    @contextmanager
    def hold_lock(self):
        """
        Context manager for acquiring and releasing the Redis lock automatically.
        """
        # Bug fix: previously called the non-existent self.acquire()/self.release();
        # the correct method names are acquire_lock()/release_lock().
        self.acquire_lock()
        try:
            yield
        finally:
            self.release_lock()

View file

@ -4,7 +4,7 @@ import os
import json
import asyncio
import tempfile
from uuid import UUID
from uuid import UUID, uuid5, NAMESPACE_OID
from kuzu import Connection
from kuzu.database import Database
from datetime import datetime, timezone
@ -23,9 +23,14 @@ from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import JSONEncoder
from cognee.modules.engine.utils.generate_timestamp_datapoint import date_to_int
from cognee.tasks.temporal_graph.models import Timestamp
from cognee.infrastructure.databases.cache.config import get_cache_config
logger = get_logger()
cache_config = get_cache_config()
if cache_config.shared_kuzu_lock:
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
class KuzuAdapter(GraphDBInterface):
"""
@ -39,12 +44,20 @@ class KuzuAdapter(GraphDBInterface):
def __init__(self, db_path: str):
"""Initialize Kuzu database connection and schema."""
self.open_connections = 0
self._is_closed = False
self.db_path = db_path # Path for the database directory
self.db: Optional[Database] = None
self.connection: Optional[Connection] = None
self.executor = ThreadPoolExecutor()
self._initialize_connection()
if cache_config.shared_kuzu_lock:
self.redis_lock = get_cache_engine(
lock_key="kuzu-lock-" + str(uuid5(NAMESPACE_OID, db_path))
)
else:
self.executor = ThreadPoolExecutor()
self._initialize_connection()
self.KUZU_ASYNC_LOCK = asyncio.Lock()
self._connection_change_lock = asyncio.Lock()
def _initialize_connection(self) -> None:
"""Initialize the Kuzu database connection and schema."""
@ -209,9 +222,13 @@ class KuzuAdapter(GraphDBInterface):
params = params or {}
def blocking_query():
lock_acquired = False
try:
if cache_config.shared_kuzu_lock:
self.redis_lock.acquire_lock()
lock_acquired = True
if not self.connection:
logger.debug("Reconnecting to Kuzu database...")
logger.info("Reconnecting to Kuzu database...")
self._initialize_connection()
result = self.connection.execute(query, params)
@ -225,12 +242,47 @@ class KuzuAdapter(GraphDBInterface):
val = val.as_py()
processed_rows.append(val)
rows.append(tuple(processed_rows))
return rows
except Exception as e:
logger.error(f"Query execution failed: {str(e)}")
raise
finally:
if cache_config.shared_kuzu_lock and lock_acquired:
try:
self.close()
finally:
self.redis_lock.release_lock()
return await loop.run_in_executor(self.executor, blocking_query)
if cache_config.shared_kuzu_lock:
async with self._connection_change_lock:
self.open_connections += 1
logger.info(f"Open connections after open: {self.open_connections}")
try:
result = blocking_query()
finally:
self.open_connections -= 1
logger.info(f"Open connections after close: {self.open_connections}")
return result
else:
result = await loop.run_in_executor(self.executor, blocking_query)
return result
def close(self):
    """Drop the Kuzu connection and database handles, if any, and mark the adapter closed.

    Releasing the references presumably lets Kuzu free its native resources so
    other processes can open the same database directory -- confirm with Kuzu docs.
    """
    for handle_name in ("connection", "db"):
        if getattr(self, handle_name):
            # Drop our reference so the native handle can be freed promptly.
            setattr(self, handle_name, None)
    self._is_closed = True
    logger.info("Kuzu database closed successfully")
def reopen(self):
    """Re-initialize the database connection if this adapter was previously closed."""
    if not self._is_closed:
        # Already open: nothing to do.
        return
    self._is_closed = False
    self._initialize_connection()
    logger.info("Kuzu database re-opened successfully")
@asynccontextmanager
async def get_session(self):
@ -1557,44 +1609,6 @@ class KuzuAdapter(GraphDBInterface):
logger.error(f"Failed to delete graph data: {e}")
raise
async def clear_database(self) -> None:
    """
    Clear all data from the database by deleting the database files and reinitializing.

    This method removes all files associated with the database and reinitializes the Kuzu
    database structure, ensuring a completely empty state. It handles exceptions that might
    occur during file deletions or initializations carefully.

    Raises:
        Exception: re-raised after logging if any step of the clearing fails.
    """
    try:
        # Drop live handles before touching the files on disk.
        if self.connection:
            self.connection = None
        if self.db:
            self.db.close()
            self.db = None

        db_dir = os.path.dirname(self.db_path)
        db_name = os.path.basename(self.db_path)
        file_storage = get_file_storage(db_dir)

        if await file_storage.file_exists(db_name):
            # NOTE(review): remove_all() looks like it wipes the whole storage
            # directory, not just db_name -- confirm this is intended.
            await file_storage.remove_all()
            logger.info(f"Deleted Kuzu database files at {self.db_path}")

        # Reinitialize the database
        self._initialize_connection()

        # Verify the database is empty
        result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)")
        count = result.get_next()[0] if result.has_next() else 0
        if count > 0:
            # Fallback: file deletion did not fully clear the graph, so
            # delete the remaining nodes (and their edges) explicitly.
            logger.warning(
                f"Database still contains {count} nodes after clearing, forcing deletion"
            )
            self.connection.execute("MATCH (n:Node) DETACH DELETE n")
        logger.info("Database cleared successfully")
    except Exception as e:
        logger.error(f"Error during database clearing: {e}")
        raise
async def get_document_subgraph(self, data_id: str):
"""
Get all nodes that should be deleted when removing a document.

View file

@ -0,0 +1,25 @@
import asyncio
import time
from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter
# This will create the test.db if it doesn't exist
async def main():
    """Repeatedly count Node rows to exercise concurrent read access to test.db."""
    adapter = KuzuAdapter("test.db")

    # Issue six sequential count queries; each one goes through the adapter's
    # (possibly lock-guarded) query path.
    for _ in range(6):
        result = await adapter.query("MATCH (n:Node) RETURN COUNT(n)")
        # Bug fix: one of the six copy-pasted blocks printed the raw result
        # object instead of the scalar count; all iterations now print the
        # count consistently.
        print(f"Reader: Found {result[0][0]} nodes")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,31 @@
import asyncio
import cognee
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.api.v1.search import SearchType
async def main():
    """Cognify the first dataset and run a graph-completion search over it."""
    await cognee.cognify(datasets=["first_cognify_dataset"])

    # The 'FIRST_COGNIFY' marker lets the parent test tell this subprocess's
    # output apart from the sibling cognify subprocess.
    query_text = (
        "Tell me what is in the context. Additionally write out 'FIRST_COGNIFY' before your answer"
    )
    search_results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text=query_text,
        datasets=["first_cognify_dataset"],
    )

    print("Search results:")
    for result_text in search_results:
        print(result_text)
if __name__ == "__main__":
    logger = setup_logging(log_level=INFO)

    # Manage the event loop explicitly so async generators are shut down
    # before the process exits.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        # Bug fix: the loop was never closed, leaking its resources on exit.
        loop.close()

View file

@ -0,0 +1,31 @@
import asyncio
import cognee
from cognee.shared.logging_utils import setup_logging, INFO
from cognee.api.v1.search import SearchType
async def main():
    """Cognify the second dataset and run a graph-completion search over it."""
    await cognee.cognify(datasets=["second_cognify_dataset"])

    # The 'SECOND_COGNIFY' marker lets the parent test tell this subprocess's
    # output apart from the sibling cognify subprocess.
    query_text = (
        "Tell me what is in the context. Additionally write out 'SECOND_COGNIFY' before your answer"
    )
    search_results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION,
        query_text=query_text,
        datasets=["second_cognify_dataset"],
    )

    print("Search results:")
    for result_text in search_results:
        print(result_text)
if __name__ == "__main__":
    logger = setup_logging(log_level=INFO)

    # Manage the event loop explicitly so async generators are shut down
    # before the process exits.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        # Bug fix: the loop was never closed, leaking its resources on exit.
        loop.close()

View file

@ -0,0 +1,32 @@
import asyncio
import time
import uuid
from cognee.modules.data.processing.document_types import PdfDocument
from cognee.infrastructure.databases.graph.kuzu.adapter import KuzuAdapter
def create_node(name):
    """Build a minimal PdfDocument datapoint whose name fields derive from *name*."""
    return PdfDocument(
        id=uuid.uuid4(),
        name=name,
        raw_data_location=name,
        external_metadata="test_external_metadata",
        mime_type="test_mime",
    )
async def main():
    """Insert a batch of nodes, then linger so the reader overlaps with this process."""
    adapter = KuzuAdapter("test.db")
    nodes = [create_node(f"Node{i}") for i in range(5)]

    print("Writer: Starting...")
    await adapter.add_nodes(nodes)
    print("writer finished...")
    # Bug fix: time.sleep() blocks the event loop; the async-friendly sleep
    # keeps the process alive (so reader/writer overlap) without stalling
    # any pending callbacks.
    await asyncio.sleep(10)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,84 @@
import os
import asyncio
import cognee
import pathlib
import subprocess
from cognee.shared.logging_utils import get_logger
logger = get_logger()
"""
Test: Redis-based Kùzu Locking Across Subprocesses
This test ensures the Redis shared lock correctly serializes access to the Kùzu
database when multiple subprocesses (writer/reader and cognify tasks) run in parallel.
If this test fails, it indicates the locking mechanism is not properly handling
concurrent subprocess access.
"""
async def concurrent_subprocess_access():
    """Run writer/reader and two cognify scripts as subprocesses against one Kuzu DB.

    Exercises the Redis-backed shared lock: if it fails to serialize access,
    a subprocess errors out or reads corrupted state.
    """
    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/concurrent_tasks")
        ).resolve()
    )
    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/concurrent_tasks")
        ).resolve()
    )
    subprocess_directory_path = str(
        pathlib.Path(os.path.join(pathlib.Path(__file__).parent, "subprocesses/")).resolve()
    )
    # Idiom fix: os.path.join instead of "/"-string concatenation keeps the
    # paths portable.
    writer_path = os.path.join(subprocess_directory_path, "writer.py")
    reader_path = os.path.join(subprocess_directory_path, "reader.py")

    cognee.config.data_root_directory(data_directory_path)
    cognee.config.system_root_directory(cognee_directory_path)

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    writer_process = subprocess.Popen([os.sys.executable, str(writer_path)])
    reader_process = subprocess.Popen([os.sys.executable, str(reader_path)])

    # Wait for both processes to complete
    writer_process.wait()
    reader_process.wait()
    logger.info("Basic write read subprocess example finished")

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    text = """
    This is the text of the first cognify subprocess
    """
    await cognee.add(text, dataset_name="first_cognify_dataset")

    text = """
    This is the text of the second cognify subprocess
    """
    await cognee.add(text, dataset_name="second_cognify_dataset")

    first_cognify_path = os.path.join(subprocess_directory_path, "simple_cognify_1.py")
    second_cognify_path = os.path.join(subprocess_directory_path, "simple_cognify_2.py")

    first_cognify_process = subprocess.Popen([os.sys.executable, str(first_cognify_path)])
    second_cognify_process = subprocess.Popen([os.sys.executable, str(second_cognify_path)])

    # Wait for both processes to complete
    first_cognify_process.wait()
    second_cognify_process.wait()
    logger.info("Database concurrent subprocess example finished")
if __name__ == "__main__":
asyncio.run(concurrent_subprocess_access())

View file

@ -0,0 +1,87 @@
"""Tests for cache configuration."""
import pytest
from cognee.infrastructure.databases.cache.config import CacheConfig, get_cache_config
def test_cache_config_defaults():
    """Test that CacheConfig has the correct default values."""
    config = CacheConfig()

    # Caching and the shared Kuzu lock are opt-in features.
    assert config.caching is False
    assert config.shared_kuzu_lock is False
    # Defaults target a local Redis instance on its standard port.
    assert config.cache_host == "localhost"
    assert config.cache_port == 6379
    # Lock lifetime defaults, in seconds.
    assert config.agentic_lock_expire == 240
    assert config.agentic_lock_timeout == 300
def test_cache_config_custom_values():
    """Test that CacheConfig accepts custom values."""
    config = CacheConfig(
        caching=True,
        shared_kuzu_lock=True,
        cache_host="redis.example.com",
        cache_port=6380,
        agentic_lock_expire=120,
        agentic_lock_timeout=180,
    )

    # Every constructor argument should round-trip unchanged.
    assert config.caching is True
    assert config.shared_kuzu_lock is True
    assert config.cache_host == "redis.example.com"
    assert config.cache_port == 6380
    assert config.agentic_lock_expire == 120
    assert config.agentic_lock_timeout == 180
def test_cache_config_to_dict():
    """Test the to_dict method returns all configuration values."""
    config = CacheConfig(
        caching=True,
        shared_kuzu_lock=True,
        cache_host="test-host",
        cache_port=7000,
        agentic_lock_expire=100,
        agentic_lock_timeout=200,
    )

    config_dict = config.to_dict()

    # The dict must contain exactly the declared fields, with the given values.
    assert config_dict == {
        "caching": True,
        "shared_kuzu_lock": True,
        "cache_host": "test-host",
        "cache_port": 7000,
        "agentic_lock_expire": 100,
        "agentic_lock_timeout": 200,
    }
def test_get_cache_config_singleton():
    """Test that get_cache_config returns the same instance.

    The singleton behavior comes from the @lru_cache on get_cache_config.
    """
    config1 = get_cache_config()
    config2 = get_cache_config()

    assert config1 is config2
def test_cache_config_extra_fields_allowed():
    """Test that CacheConfig allows extra fields due to extra='allow'."""
    # Unknown keyword arguments must be stored as attributes, not rejected.
    config = CacheConfig(extra_field="extra_value", another_field=123)

    assert hasattr(config, "extra_field")
    assert config.extra_field == "extra_value"
    assert hasattr(config, "another_field")
    assert config.another_field == 123
def test_cache_config_boolean_type_validation():
    """Test that boolean fields accept various truthy/falsy values.

    Relies on pydantic's bool coercion of strings like "true"/"yes"/"false"/"no".
    """
    config1 = CacheConfig(caching="true", shared_kuzu_lock="yes")
    assert config1.caching is True
    assert config1.shared_kuzu_lock is True

    config2 = CacheConfig(caching="false", shared_kuzu_lock="no")
    assert config2.caching is False
    assert config2.shared_kuzu_lock is False

View file

@ -129,6 +129,30 @@ services:
networks:
- cognee-network
redis:
image: redis:7-alpine
container_name: redis
profiles:
- redis
ports:
- "6379:6379"
networks:
- cognee-network
volumes:
- redis_data:/data
command: [ "redis-server", "--appendonly", "yes" ]
redisinsight:
image: redislabs/redisinsight:latest
container_name: redisinsight
restart: always
ports:
- "5540:5540"
networks:
- cognee-network
networks:
cognee-network:
name: cognee-network
@ -136,3 +160,4 @@ networks:
volumes:
chromadb_data:
postgres_data:
redis_data:

1316
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -140,6 +140,7 @@ dev = [
"mkdocstrings[python]>=0.26.2,<0.27",
]
debug = ["debugpy>=1.8.9,<2.0.0"]
redis = ["redis>=5.0.3,<6.0.0"]
monitoring = ["sentry-sdk[fastapi]>=2.9.0,<3", "langfuse>=2.32.0,<3"]

19
uv.lock generated
View file

@ -1024,6 +1024,9 @@ postgres-binary = [
posthog = [
{ name = "posthog" },
]
redis = [
{ name = "redis" },
]
scraping = [
{ name = "apscheduler" },
{ name = "beautifulsoup4" },
@ -1114,6 +1117,7 @@ requires-dist = [
{ name = "python-magic-bin", marker = "sys_platform == 'win32'", specifier = "<0.5" },
{ name = "python-multipart", specifier = ">=0.0.20,<1.0.0" },
{ name = "rdflib", specifier = ">=7.1.4,<7.2.0" },
{ name = "redis", marker = "extra == 'redis'", specifier = ">=5.0.3,<6.0.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = ">=0.9.2,<=0.13.1" },
{ name = "s3fs", extras = ["boto3"], marker = "extra == 'aws'", specifier = "==2025.3.2" },
{ name = "scikit-learn", marker = "extra == 'evals'", specifier = ">=1.6.1,<2" },
@ -1134,7 +1138,7 @@ requires-dist = [
{ name = "uvicorn", specifier = ">=0.34.0,<1.0.0" },
{ name = "websockets", specifier = ">=15.0.1,<16.0.0" },
]
provides-extras = ["api", "distributed", "scraping", "neo4j", "neptune", "postgres", "postgres-binary", "notebook", "langchain", "llama-index", "huggingface", "ollama", "mistral", "anthropic", "deepeval", "posthog", "groq", "chromadb", "docs", "codegraph", "evals", "graphiti", "aws", "dlt", "baml", "dev", "debug", "monitoring", "docling"]
provides-extras = ["api", "distributed", "scraping", "neo4j", "neptune", "postgres", "postgres-binary", "notebook", "langchain", "llama-index", "huggingface", "ollama", "mistral", "anthropic", "deepeval", "posthog", "groq", "chromadb", "docs", "codegraph", "evals", "graphiti", "aws", "dlt", "baml", "dev", "debug", "redis", "monitoring", "docling"]
[[package]]
name = "colorama"
@ -7389,6 +7393,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f4/31/e9b6f04288dcd3fa60cb3179260d6dad81b92aef3063d679ac7d80a827ea/rdflib-7.1.4-py3-none-any.whl", hash = "sha256:72f4adb1990fa5241abd22ddaf36d7cafa5d91d9ff2ba13f3086d339b213d997", size = 565051, upload-time = "2025-03-29T02:22:44.987Z" },
]
[[package]]
name = "redis"
version = "5.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "async-timeout", marker = "python_full_version < '3.11.3'" },
{ name = "pyjwt" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6a/cf/128b1b6d7086200c9f387bd4be9b2572a30b90745ef078bd8b235042dc9f/redis-5.3.1.tar.gz", hash = "sha256:ca49577a531ea64039b5a36db3d6cd1a0c7a60c34124d46924a45b956e8cf14c", size = 4626200, upload-time = "2025-07-25T08:06:27.778Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7f/26/5c5fa0e83c3621db835cfc1f1d789b37e7fa99ed54423b5f519beb931aa7/redis-5.3.1-py3-none-any.whl", hash = "sha256:dc1909bd24669cc31b5f67a039700b16ec30571096c5f1f0d9d2324bff31af97", size = 272833, upload-time = "2025-07-25T08:06:26.317Z" },
]
[[package]]
name = "referencing"
version = "0.36.2"

View file

@ -0,0 +1,31 @@
"""
Run writer and reader in separate subprocesses to test Kuzu locks.
"""
import subprocess
import time
import os
def main():
    """Spawn writer.py and reader.py concurrently and report the total runtime."""
    print("=== Kuzu Subprocess Lock Test ===")
    print("Starting writer and reader in separate subprocesses...")
    print("Writer will hold the database lock, reader should block or fail\n")

    start_time = time.time()

    # Bug fix: the scripts were spawned by bare filename, which only works when
    # the current working directory is this script's directory. Resolve them
    # relative to this file instead.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    writer_process = subprocess.Popen(
        [os.sys.executable, os.path.join(script_dir, "writer.py")]
    )
    reader_process = subprocess.Popen(
        [os.sys.executable, os.path.join(script_dir, "reader.py")]
    )

    # Wait for both processes to complete
    writer_process.wait()
    reader_process.wait()

    total_time = time.time() - start_time
    print(f"\nTotal execution time: {total_time:.2f}s")
print(f"\nTotal execution time: {total_time:.2f}s")
if __name__ == "__main__":
main()