feat: first version of async multiple db support

This commit is contained in:
Igor Ilic 2025-05-15 17:05:11 +02:00
parent ddfa506cf8
commit 70e307a905
5 changed files with 75 additions and 11 deletions

View file

@ -18,6 +18,12 @@ from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.pipelines import cognee_pipeline
from cognee.infrastructure.databases.graph.get_graph_engine import (
graph_db_config as context_graph_db_config,
)
from cognee.infrastructure.databases.vector.get_vector_engine import (
vector_db_config as context_vector_db_config,
)
logger = get_logger("cognify")
@ -31,11 +37,18 @@ async def cognify(
chunker=TextChunker,
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
):
context_graph_db_config.set(graph_db_config)
context_vector_db_config.set(vector_db_config)
tasks = await get_default_tasks(user, graph_model, chunker, chunk_size, ontology_file_path)
return await cognee_pipeline(
tasks=tasks, datasets=datasets, user=user, pipeline_name="cognify_pipeline"
tasks=tasks,
datasets=datasets,
user=user,
pipeline_name="cognify_pipeline",
)

View file

@ -5,17 +5,24 @@ from functools import lru_cache
from .config import get_graph_config
from .graph_db_interface import GraphDBInterface
from contextvars import ContextVar
graph_db_config = ContextVar("graph_db_config", default=None)
async def get_graph_engine() -> GraphDBInterface:
    """Factory function to get the appropriate graph client based on the graph type.

    Prefers the task-local ``graph_db_config`` ContextVar override when it is
    set (used to route concurrent pipelines to different graph databases);
    otherwise falls back to the global graph configuration.
    """
    # Bind the ContextVar value once instead of reading it twice
    # (`.get()` inside the condition and again inside the branch).
    config = graph_db_config.get()
    if not config:
        # No per-task override: use the global config, normalized to the
        # hashable dict form that the lru_cache'd factory expects.
        config = get_graph_config().to_hashable_dict()

    graph_client = create_graph_engine(**config)

    # Async functions can't be cached. After creating and caching the graph engine,
    # handle all necessary async operations for different graph types below.

    # Handle loading of graph for NetworkX
    if config["graph_database_provider"].lower() == "networkx" and graph_client.graph is None:
        await graph_client.load_graph_from_file()

    return graph_client
@ -24,11 +31,11 @@ async def get_graph_engine() -> GraphDBInterface:
@lru_cache
def create_graph_engine(
graph_database_provider,
graph_database_url,
graph_database_username,
graph_database_password,
graph_database_port,
graph_file_path,
graph_database_url="",
graph_database_username="",
graph_database_password="",
graph_database_port="",
):
"""Factory function to create the appropriate graph client based on the graph type."""

View file

@ -5,10 +5,10 @@ from functools import lru_cache
@lru_cache
def create_vector_engine(
vector_db_url: str,
vector_db_port: str,
vector_db_key: str,
vector_db_provider: str,
vector_db_url: str,
vector_db_port: str = "",
vector_db_key: str = "",
):
embedding_engine = get_embedding_engine()

View file

@ -1,6 +1,13 @@
from .config import get_vectordb_config
from .create_vector_engine import create_vector_engine
from contextvars import ContextVar
# Task-local override for the vector DB config: behaves like a "global",
# but each asyncio task can carry its own value.
vector_db_config = ContextVar("vector_db_config", default=None)
def get_vector_engine():
    """Return a vector engine for the current task.

    If the task-local ``vector_db_config`` ContextVar holds an override
    (set per concurrent pipeline run), build the engine from it; otherwise
    build from the global vector DB configuration.
    """
    # Bind once so the ContextVar is not read twice (truthiness check + unpack).
    config = vector_db_config.get()
    if config:
        return create_vector_engine(**config)
    return create_vector_engine(**get_vectordb_config().to_dict())

View file

@ -40,9 +40,46 @@ async def main():
"""
await cognee.add([text], dataset_name)
await cognee.add([text], "test1")
await cognee.add([text], "test2")
await cognee.cognify([dataset_name])
task_1_config = {
"vector_db_url": "/Users/igorilic/Desktop/cognee1.test",
"vector_db_key": "",
"vector_db_provider": "lancedb",
}
task_2_config = {
"vector_db_url": "/Users/igorilic/Desktop/cognee2.test",
"vector_db_key": "",
"vector_db_provider": "lancedb",
}
task_1_graph_config = {
"graph_database_provider": "kuzu",
"graph_file_path": "/Users/igorilic/Desktop/kuzu1.db",
}
task_2_graph_config = {
"graph_database_provider": "kuzu",
"graph_file_path": "/Users/igorilic/Desktop/kuzu2.db",
}
# schedule both cognify calls concurrently
task1 = asyncio.create_task(
cognee.cognify(
["test1"], vector_db_config=task_1_config, graph_db_config=task_1_graph_config
)
)
task2 = asyncio.create_task(
cognee.cognify(
["test2"], vector_db_config=task_2_config, graph_db_config=task_2_graph_config
)
)
# wait until both are done (raises first error if any)
await asyncio.gather(task1, task2)
from cognee.infrastructure.databases.vector import get_vector_engine
vector_engine = get_vector_engine()