feat: add pydantic settings for cache configuration and add locks to the Kuzu adapter
This commit is contained in:
parent
c51787c987
commit
21c203d1eb
2 changed files with 47 additions and 4 deletions
40
cognee/infrastructure/databases/cache/config.py
vendored
Normal file
40
cognee/infrastructure/databases/cache/config.py
vendored
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
import os
|
||||||
|
import pydantic
|
||||||
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
|
|
||||||
|
class CacheConfig(BaseSettings):
    """
    Settings for a distributed cache backend (e.g., Redis) used for
    locking and coordination.

    Attributes:
    - caching: Whether caching logic is enabled.
    - cache_host: Hostname of the cache service.
    - cache_port: Port number of the cache service.
    - agentic_lock_expire: Seconds after which an acquired lock expires automatically.
    - agentic_lock_timeout: Maximum seconds to wait for a lock to be released.
    """

    caching: bool = False
    cache_host: str = "localhost"
    cache_port: int = 6379
    agentic_lock_expire: int = 240
    agentic_lock_timeout: int = 300

    # Values may be overridden via environment variables or a .env file;
    # extra="allow" tolerates unrelated keys in the shared .env.
    model_config = SettingsConfigDict(env_file=".env", extra="allow")

    def to_dict(self) -> dict:
        """Serialize the cache settings as a plain dictionary."""
        field_names = (
            "caching",
            "cache_host",
            "cache_port",
            "agentic_lock_expire",
            "agentic_lock_timeout",
        )
        return {name: getattr(self, name) for name in field_names}
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache
def get_cache_config() -> CacheConfig:
    """Return the process-wide cached :class:`CacheConfig` instance.

    Memoized so the environment/.env file is read only once per process;
    every caller shares the same settings object.
    """
    return CacheConfig()
|
||||||
|
|
||||||
|
|
@ -175,7 +175,9 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
|
|
||||||
if self.connection:
|
if self.connection:
|
||||||
async with self.KUZU_ASYNC_LOCK:
|
async with self.KUZU_ASYNC_LOCK:
|
||||||
self.connection.execute("CHECKPOINT;")
|
self.redis_lock.acquire()
|
||||||
|
await self.query("CHECKPOINT;")
|
||||||
|
self.redis_lock.release()
|
||||||
|
|
||||||
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
|
s3_file_storage.s3.put(self.temp_graph_file, self.db_path, recursive=True)
|
||||||
|
|
||||||
|
|
@ -213,9 +215,8 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
|
|
||||||
def blocking_query():
|
def blocking_query():
|
||||||
try:
|
try:
|
||||||
if not self.connection:
|
if self._is_closed:
|
||||||
logger.debug("Reconnecting to Kuzu database...")
|
self.reopen()
|
||||||
self._initialize_connection()
|
|
||||||
|
|
||||||
result = self.connection.execute(query, params)
|
result = self.connection.execute(query, params)
|
||||||
rows = []
|
rows = []
|
||||||
|
|
@ -1590,6 +1591,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
logger.info(f"Deleted Kuzu database files at {self.db_path}")
|
logger.info(f"Deleted Kuzu database files at {self.db_path}")
|
||||||
|
|
||||||
# Reinitialize the database
|
# Reinitialize the database
|
||||||
|
self.redis_lock.acquire()
|
||||||
self._initialize_connection()
|
self._initialize_connection()
|
||||||
# Verify the database is empty
|
# Verify the database is empty
|
||||||
result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)")
|
result = self.connection.execute("MATCH (n:Node) RETURN COUNT(n)")
|
||||||
|
|
@ -1600,6 +1602,7 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
)
|
)
|
||||||
self.connection.execute("MATCH (n:Node) DETACH DELETE n")
|
self.connection.execute("MATCH (n:Node) DETACH DELETE n")
|
||||||
logger.info("Database cleared successfully")
|
logger.info("Database cleared successfully")
|
||||||
|
self.redis_lock.release()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error during database clearing: {e}")
|
logger.error(f"Error during database clearing: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue