cognee/cognee/infrastructure/databases/vector/create_vector_engine.py
Andrew Carbonetto 7d2bf78c81
Add Neptune Analytics hybrid storage (#1156)
<!-- .github/pull_request_template.md -->

## Description
Adds a Neptune Analytics 'hybrid' integration layer to the Cognee.ai
memory storage layer. The following configuration will use Amazon
Neptune Analytics to store all nodes, edges, and vector embeddings for
the Cognee.ai memory conversation.

```
    cognee.config.set_graph_db_config(
        {
            "graph_database_provider": "neptune_analytics",  # Specify Neptune Analytics as provider
            "graph_database_url": graph_endpoint_url,  # Neptune Analytics endpoint with the format neptune-graph://<GRAPH_ID>
        }
    )
    cognee.config.set_vector_db_config(
        {
            "vector_db_provider": "neptune_analytics",  # Specify Neptune Analytics as provider
            "vector_db_url": graph_endpoint_url,  # Neptune Analytics endpoint with the format neptune-graph://<GRAPH_ID>
        }
    )
```

For example, see
[neptune_analytics_example.py](08a3a1d2a8/examples/database_examples/neptune_analytics_example.py)

Related: https://github.com/topoteretes/cognee-starter/pull/11

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
Signed-off-by: Andy Kwok <andy.kwok@improving.com>
Co-authored-by: Andy Kwok <andy.kwok@improving.com>
Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
2025-08-05 10:05:31 +02:00

146 lines
4.9 KiB
Python

from .supported_databases import supported_databases
from .embeddings import get_embedding_engine
from functools import lru_cache
@lru_cache
def create_vector_engine(
    vector_db_provider: str,
    vector_db_url: str,
    vector_db_port: str = "",
    vector_db_key: str = "",
):
    """
    Create a vector database engine based on the specified provider.

    This function initializes and returns a database adapter for vector storage,
    depending on the provided vector database provider. The function checks for
    required credentials for each provider, raising an EnvironmentError if any are
    missing, or ImportError if an optional provider package (e.g. ChromaDB,
    langchain_aws) is not installed.

    Supported providers include: Qdrant, pgvector, FalkorDB, ChromaDB,
    Neptune Analytics, LanceDB (the fallback default), plus any provider
    registered in ``supported_databases``.

    Results are memoized with ``lru_cache``, so repeated calls with the same
    arguments return the same adapter instance.

    Parameters:
    -----------
    - vector_db_provider (str): The name of the vector database provider to use
      (e.g., 'qdrant', 'pgvector', 'neptune_analytics').
    - vector_db_url (str): The URL for the vector database instance.
    - vector_db_port (str): The port for the vector database instance. Required for
      some providers (e.g. FalkorDB).
    - vector_db_key (str): The API key or access token for the vector database
      instance.

    Returns:
    --------
    An instance of the corresponding database adapter class for the specified
    provider.

    Raises:
    -------
    - EnvironmentError: If required credentials for the chosen provider are missing.
    - ImportError: If an optional provider package is not installed.
    - ValueError: If a Neptune Analytics endpoint does not match the expected
      'neptune-graph://<GRAPH_ID>' format.
    """
    embedding_engine = get_embedding_engine()

    # Externally registered adapters (plugins) take precedence over the
    # built-in providers below.
    if vector_db_provider in supported_databases:
        adapter = supported_databases[vector_db_provider]

        return adapter(
            url=vector_db_url,
            api_key=vector_db_key,
            embedding_engine=embedding_engine,
        )

    if vector_db_provider == "qdrant":
        if not (vector_db_url and vector_db_key):
            raise EnvironmentError("Missing required Qdrant credentials!")

        from .qdrant.QDrantAdapter import QDrantAdapter

        return QDrantAdapter(
            url=vector_db_url,
            api_key=vector_db_key,
            embedding_engine=embedding_engine,
        )

    elif vector_db_provider == "pgvector":
        from cognee.infrastructure.databases.relational import get_relational_config

        # pgvector reuses the relational (Postgres) connection settings rather
        # than the vector_db_* arguments.
        relational_config = get_relational_config()
        db_username = relational_config.db_username
        db_password = relational_config.db_password
        db_host = relational_config.db_host
        db_port = relational_config.db_port
        db_name = relational_config.db_name

        if not (db_host and db_port and db_name and db_username and db_password):
            raise EnvironmentError("Missing required pgvector credentials!")

        connection_string: str = (
            f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
        )

        from .pgvector.PGVectorAdapter import PGVectorAdapter

        return PGVectorAdapter(
            connection_string,
            vector_db_key,
            embedding_engine,
        )

    elif vector_db_provider == "falkordb":
        if not (vector_db_url and vector_db_port):
            raise EnvironmentError("Missing required FalkorDB credentials!")

        from ..hybrid.falkordb.FalkorDBAdapter import FalkorDBAdapter

        return FalkorDBAdapter(
            database_url=vector_db_url,
            database_port=vector_db_port,
            embedding_engine=embedding_engine,
        )

    elif vector_db_provider == "chromadb":
        # ChromaDB is an optional dependency; fail early with an actionable message.
        try:
            import chromadb
        except ImportError:
            raise ImportError(
                "ChromaDB is not installed. Please install it with 'pip install chromadb'"
            )

        from .chromadb.ChromaDBAdapter import ChromaDBAdapter

        return ChromaDBAdapter(
            url=vector_db_url,
            api_key=vector_db_key,
            embedding_engine=embedding_engine,
        )

    elif vector_db_provider == "neptune_analytics":
        # langchain_aws is an optional dependency required by the Neptune adapter.
        try:
            from langchain_aws import NeptuneAnalyticsGraph
        except ImportError:
            raise ImportError(
                "langchain_aws is not installed. Please install it with 'pip install langchain_aws'"
            )

        if not vector_db_url:
            raise EnvironmentError("Missing Neptune endpoint.")

        from cognee.infrastructure.databases.hybrid.neptune_analytics.NeptuneAnalyticsAdapter import (
            NeptuneAnalyticsAdapter,
            NEPTUNE_ANALYTICS_ENDPOINT_URL,
        )

        # The graph identifier is encoded in the URL as
        # 'neptune-graph://<GRAPH_ID>'; validate the scheme before stripping it.
        if not vector_db_url.startswith(NEPTUNE_ANALYTICS_ENDPOINT_URL):
            raise ValueError(
                f"Neptune endpoint must have the format '{NEPTUNE_ANALYTICS_ENDPOINT_URL}<GRAPH_ID>'"
            )

        graph_identifier = vector_db_url.replace(NEPTUNE_ANALYTICS_ENDPOINT_URL, "")

        return NeptuneAnalyticsAdapter(
            graph_id=graph_identifier,
            embedding_engine=embedding_engine,
        )

    else:
        # LanceDB is the default (embedded) vector store when no known provider
        # name is given.
        from .lancedb.LanceDBAdapter import LanceDBAdapter

        return LanceDBAdapter(
            url=vector_db_url,
            api_key=vector_db_key,
            embedding_engine=embedding_engine,
        )