fix: Remove milvus from core (#1096)

<!-- .github/pull_request_template.md -->

## Description
This PR removes the Milvus vector database integration from the cognee core package. It deletes the `MilvusAdapter` and its `milvus` package `__init__`, drops the `milvus` branch from `create_vector_engine`, removes the Milvus jobs from the CI workflows and the `test_milvus.yml` entry from the workflow runner script, deletes `cognee/tests/test_milvus.py` and `examples/database_examples/milvus_example.py`, strips `milvus` from the Dockerfile's Poetry extras, and removes the `milvus` extra (`pymilvus`) from `pyproject.toml`.
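Anyone currently pointing Cognee at Milvus will need to select one of the remaining providers. Below is a minimal sketch of the switch, reusing the config keys from the removed Milvus example; the `lancedb` provider string is illustrative and not prescribed by this PR.

```python
import cognee

# Milvus is no longer wired into core; choose a provider that create_vector_engine
# still supports (Weaviate, Qdrant, pgvector, FalkorDB, ChromaDB, LanceDB).
cognee.config.set_vector_db_config(
    {
        "vector_db_url": "",              # endpoint, if the chosen provider needs one
        "vector_db_key": "",              # API key or token, if the chosen provider needs one
        "vector_db_provider": "lancedb",  # illustrative choice of a remaining provider
    }
)
```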

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Vasilije 2025-07-16 15:56:34 +02:00 committed by GitHub
parent a06b3fc7e4
commit 67c006bd2f
10 changed files with 2 additions and 784 deletions


@@ -116,38 +116,6 @@ jobs:
        run: |
          poetry run python examples/database_examples/kuzu_example.py

  run-db-example-milvus:
    name: "Milvus DB Example Test"
    runs-on: ubuntu-22.04
    if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'milvus') }}
    steps:
      - name: Check out
        uses: actions/checkout@v4

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: ${{ inputs.python-version }}

      - name: Install Milvus extra
        run: |
          poetry install -E milvus

      - name: Run Milvus Example
        env:
          ENV: dev
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: |
          poetry run python examples/database_examples/milvus_example.py

  run-db-example-weaviate:
    name: "Weaviate DB Example Test"
    runs-on: ubuntu-22.04


@@ -10,7 +10,6 @@ WORKFLOWS=(
  "test_kuzu.yml"
  "test_multimetric_qa_eval_run.yaml"
  "test_graphrag_vs_rag_notebook.yml"
  "test_milvus.yml"
  "test_gemini.yml"
  "test_multimedia_example.yaml"
  "test_deduplication.yml"


@@ -95,38 +95,6 @@ jobs:
          VECTOR_DB_KEY: ${{ secrets.WEAVIATE_API_KEY }}
        run: poetry run python ./cognee/tests/test_weaviate.py

  run-milvus-tests:
    name: Milvus Tests
    runs-on: ubuntu-22.04
    if: ${{ inputs.databases == 'all' || contains(inputs.databases, 'milvus') }}
    steps:
      - name: Check out
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: ${{ inputs.python-version }}

      - name: Install specific db dependency
        run: |
          poetry install -E milvus

      - name: Run Milvus Tests
        env:
          ENV: 'dev'
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        run: poetry run python ./cognee/tests/test_milvus.py

  run_qdrant_integration_test:
    name: Qdrant Tests
    runs-on: ubuntu-latest


@@ -19,7 +19,7 @@ def create_vector_engine(
    for each provider, raising an EnvironmentError if any are missing, or ImportError if the
    ChromaDB package is not installed.

    Supported providers include: Weaviate, Qdrant, Milvus, pgvector, FalkorDB, ChromaDB, and
    Supported providers include: Weaviate, Qdrant, pgvector, FalkorDB, ChromaDB, and
    LanceDB.

    Parameters:
@@ -68,18 +68,6 @@ def create_vector_engine(
            embedding_engine=embedding_engine,
        )
    elif vector_db_provider == "milvus":
        from .milvus.MilvusAdapter import MilvusAdapter

        if not vector_db_url:
            raise EnvironmentError("Missing required Milvus credentials!")

        return MilvusAdapter(
            url=vector_db_url,
            api_key=vector_db_key,
            embedding_engine=embedding_engine,
        )
    elif vector_db_provider == "pgvector":
        from cognee.infrastructure.databases.relational import get_relational_config

@@ -1,511 +0,0 @@
from __future__ import annotations

import asyncio
import os
from uuid import UUID
from typing import List, Optional

from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.engine.utils import parse_id
from cognee.infrastructure.databases.vector.exceptions import CollectionNotFoundError
from cognee.infrastructure.files.storage import get_file_storage

from ..embeddings.EmbeddingEngine import EmbeddingEngine
from ..models.ScoredResult import ScoredResult
from ..vector_db_interface import VectorDBInterface

logger = get_logger("MilvusAdapter")


class IndexSchema(DataPoint):
    """
    Represent a schema for an index that includes text data and associated metadata.

    This class inherits from DataPoint and includes attributes for text and metadata. It
    defines the structure of the data points used in the index, holding the text as a string
    and metadata as a dictionary with predefined index fields.
    """

    text: str

    metadata: dict = {"index_fields": ["text"]}

class MilvusAdapter(VectorDBInterface):
    """
    Interface for interacting with a Milvus vector database.

    Public methods:

    - __init__
    - get_milvus_client
    - embed_data
    - has_collection
    - create_collection
    - create_data_points
    - create_vector_index
    - index_data_points
    - retrieve
    - search
    - batch_search
    - delete_data_points
    - prune
    """

    name = "Milvus"
    url: str
    api_key: Optional[str]
    embedding_engine: EmbeddingEngine = None

    def __init__(self, url: str, api_key: Optional[str], embedding_engine: EmbeddingEngine):
        self.url = url
        self.api_key = api_key
        self.embedding_engine = embedding_engine

    def get_milvus_client(self):
        """
        Retrieve a Milvus client instance.

        Returns a MilvusClient object configured with the provided URL and optional API key.

        Returns:
        --------

            A MilvusClient instance.
        """
        from pymilvus import MilvusClient

        # Ensure the parent directory exists for local file-based Milvus databases
        if self.url and not self.url.startswith(("http://", "https://", "grpc://")):
            # This is likely a local file path, ensure the directory exists
            db_dir = os.path.dirname(self.url)
            if db_dir and not os.path.exists(db_dir):
                try:
                    file_storage = get_file_storage(db_dir)
                    if hasattr(file_storage, "ensure_directory_exists"):
                        if asyncio.iscoroutinefunction(file_storage.ensure_directory_exists):
                            # Run async function synchronously in this sync method
                            loop = asyncio.get_event_loop()
                            if loop.is_running():
                                # If we're already in an async context, we can't use run_sync easily
                                # Create the directory directly as a fallback
                                os.makedirs(db_dir, exist_ok=True)
                            else:
                                loop.run_until_complete(file_storage.ensure_directory_exists())
                        else:
                            file_storage.ensure_directory_exists()
                    else:
                        # Fallback to os.makedirs if file_storage doesn't have ensure_directory_exists
                        os.makedirs(db_dir, exist_ok=True)
                except Exception as e:
                    logger.warning(
                        f"Could not create directory {db_dir} using file_storage, falling back to os.makedirs: {e}"
                    )
                    os.makedirs(db_dir, exist_ok=True)

        if self.api_key:
            client = MilvusClient(uri=self.url, token=self.api_key)
        else:
            client = MilvusClient(uri=self.url)

        return client
    async def embed_data(self, data: List[str]) -> list[list[float]]:
        """
        Embed a list of text data into vectors asynchronously.

        Accepts a list of strings and utilizes the embedding engine to convert them into
        vectors.

        Parameters:
        -----------

            - data (List[str]): A list of textual data to be embedded into vectors.

        Returns:
        --------

            - list[list[float]]: A list of lists containing embedded vectors.
        """
        return await self.embedding_engine.embed_text(data)

    async def has_collection(self, collection_name: str) -> bool:
        """
        Check if a collection exists in the database asynchronously.

        Returns a boolean indicating whether the specified collection is present.

        Parameters:
        -----------

            - collection_name (str): The name of the collection to check for its existence.

        Returns:
        --------

            - bool: True if the collection exists, False otherwise.
        """
        future = asyncio.Future()
        client = self.get_milvus_client()
        future.set_result(client.has_collection(collection_name=collection_name))
        return await future

    async def create_collection(
        self,
        collection_name: str,
        payload_schema=None,
    ):
        """
        Create a new collection in the vector database asynchronously.

        Raises a MilvusException if there are issues creating the collection, such as already
        existing collection.

        Parameters:
        -----------

            - collection_name (str): The name of the collection to be created.
            - payload_schema: Optional schema for the collection, defaults to None if not
              provided. (default None)

        Returns:
        --------

            True if the collection is created successfully, otherwise returns None.
        """
        from pymilvus import DataType, MilvusException

        client = self.get_milvus_client()

        if client.has_collection(collection_name=collection_name):
            logger.info(f"Collection '{collection_name}' already exists.")
            return True

        try:
            dimension = self.embedding_engine.get_vector_size()
            assert dimension > 0, "Embedding dimension must be greater than 0."

            schema = client.create_schema(
                auto_id=False,
                enable_dynamic_field=False,
            )
            schema.add_field(
                field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=36
            )
            schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=dimension)
            schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=60535)

            index_params = client.prepare_index_params()
            index_params.add_index(field_name="vector", metric_type="COSINE")

            client.create_collection(
                collection_name=collection_name, schema=schema, index_params=index_params
            )
            client.load_collection(collection_name)

            logger.info(f"Collection '{collection_name}' created successfully.")
            return True
        except MilvusException as e:
            logger.error(f"Error creating collection '{collection_name}': {str(e)}")
            raise e
    async def create_data_points(self, collection_name: str, data_points: List[DataPoint]):
        """
        Insert multiple data points into a specified collection asynchronously.

        Raises CollectionNotFoundError if the specified collection does not exist.

        Parameters:
        -----------

            - collection_name (str): The name of the collection where data points will be
              inserted.
            - data_points (List[DataPoint]): A list of DataPoint objects to be inserted into the
              collection.

        Returns:
        --------

            The result of the insert operation, includes count of inserted data points.
        """
        from pymilvus import MilvusException, exceptions

        client = self.get_milvus_client()

        data_vectors = await self.embed_data(
            [data_point.get_embeddable_data(data_point) for data_point in data_points]
        )

        insert_data = [
            {
                "id": str(data_point.id),
                "vector": data_vectors[index],
                "text": data_point.text,
            }
            for index, data_point in enumerate(data_points)
        ]

        try:
            result = client.insert(collection_name=collection_name, data=insert_data)
            logger.info(
                f"Inserted {result.get('insert_count', 0)} data points into collection '{collection_name}'."
            )
            return result
        except exceptions.CollectionNotExistException as error:
            raise CollectionNotFoundError(
                f"Collection '{collection_name}' does not exist!"
            ) from error
        except MilvusException as e:
            logger.error(
                f"Error inserting data points into collection '{collection_name}': {str(e)}"
            )
            raise e

    async def create_vector_index(self, index_name: str, index_property_name: str):
        """
        Create a vector index for a given collection asynchronously.

        Parameters:
        -----------

            - index_name (str): The name of the vector index being created.
            - index_property_name (str): The property name associated with the index.
        """
        await self.create_collection(f"{index_name}_{index_property_name}")

    async def index_data_points(
        self, index_name: str, index_property_name: str, data_points: List[DataPoint]
    ):
        """
        Index the provided data points into the collection based on index names asynchronously.

        Parameters:
        -----------

            - index_name (str): The name of the index where data points will be indexed.
            - index_property_name (str): The property name associated with the index.
            - data_points (List[DataPoint]): A list of DataPoint objects to be indexed.
        """
        formatted_data_points = [
            IndexSchema(
                id=data_point.id,
                text=getattr(data_point, data_point.metadata["index_fields"][0]),
            )
            for data_point in data_points
        ]

        collection_name = f"{index_name}_{index_property_name}"

        await self.create_data_points(collection_name, formatted_data_points)
    async def retrieve(self, collection_name: str, data_point_ids: list[UUID]):
        """
        Retrieve data points from a collection based on their IDs asynchronously.

        Raises CollectionNotFoundError if the specified collection does not exist.

        Parameters:
        -----------

            - collection_name (str): The name of the collection from which data points will be
              retrieved.
            - data_point_ids (list[UUID]): A list of UUIDs representing the IDs of the data
              points to be retrieved.

        Returns:
        --------

            The results of the query, including the requested data points.
        """
        from pymilvus import MilvusException, exceptions

        client = self.get_milvus_client()

        try:
            filter_expression = f"""id in [{", ".join(f'"{id}"' for id in data_point_ids)}]"""

            results = client.query(
                collection_name=collection_name,
                expr=filter_expression,
                output_fields=["*"],
            )
            return results
        except exceptions.CollectionNotExistException as error:
            raise CollectionNotFoundError(
                f"Collection '{collection_name}' does not exist!"
            ) from error
        except MilvusException as e:
            logger.error(
                f"Error retrieving data points from collection '{collection_name}': {str(e)}"
            )
            raise e

    async def search(
        self,
        collection_name: str,
        query_text: Optional[str] = None,
        query_vector: Optional[List[float]] = None,
        limit: int = 15,
        with_vector: bool = False,
    ):
        """
        Search for data points in a collection based on a text query or vector asynchronously.

        Raises ValueError if neither query_text nor query_vector is provided. Raises
        MilvusException for errors during the search process.

        Parameters:
        -----------

            - collection_name (str): The name of the collection to search within.
            - query_text (Optional[str]): Optional text query used for searching, defaults to
              None. (default None)
            - query_vector (Optional[List[float]]): Optional vector query used for searching,
              defaults to None. (default None)
            - limit (int): Maximum number of results to return, defaults to 15. (default 15)
            - with_vector (bool): Flag to indicate if the vector should be included in the
              results, defaults to False. (default False)

        Returns:
        --------

            A list of scored results that match the query; may include vector data if requested.
        """
        from pymilvus import MilvusException, exceptions

        client = self.get_milvus_client()

        if query_text is None and query_vector is None:
            raise ValueError("One of query_text or query_vector must be provided!")

        if not client.has_collection(collection_name=collection_name):
            logger.warning(
                f"Collection '{collection_name}' not found in MilvusAdapter.search; returning []."
            )
            return []

        try:
            query_vector = query_vector or (await self.embed_data([query_text]))[0]

            output_fields = ["id", "text"]
            if with_vector:
                output_fields.append("vector")

            results = client.search(
                collection_name=collection_name,
                data=[query_vector],
                anns_field="vector",
                limit=limit if limit > 0 else None,
                output_fields=output_fields,
                search_params={
                    "metric_type": "COSINE",
                },
            )

            return [
                ScoredResult(
                    id=parse_id(result["id"]),
                    score=result["distance"],
                    payload=result.get("entity", {}),
                )
                for result in results[0]
            ]
        except exceptions.CollectionNotExistException:
            logger.warning(
                f"Collection '{collection_name}' not found (exception) in MilvusAdapter.search; returning []."
            )
            return []
        except MilvusException as e:
            # Catch other Milvus errors that are "collection not found" (paranoid safety)
            if "collection not found" in str(e).lower() or "schema" in str(e).lower():
                logger.warning(
                    f"Collection '{collection_name}' not found (MilvusException) in MilvusAdapter.search; returning []."
                )
                return []
            logger.error(f"Error searching Milvus collection '{collection_name}': {e}")
            raise e
    async def batch_search(
        self, collection_name: str, query_texts: List[str], limit: int, with_vectors: bool = False
    ):
        """
        Perform a batch search in a collection for multiple textual queries asynchronously.

        Utilizes embed_data to convert texts into vectors and returns the search results for
        each query.

        Parameters:
        -----------

            - collection_name (str): The name of the collection where the search will be
              performed.
            - query_texts (List[str]): A list of texts to search for in the collection.
            - limit (int): Maximum number of results to return per query.
            - with_vectors (bool): Specifies if the vectors should be included in the search
              results, defaults to False. (default False)

        Returns:
        --------

            A list of search result sets, one for each query input.
        """
        query_vectors = await self.embed_data(query_texts)

        return await asyncio.gather(
            *[
                self.search(
                    collection_name=collection_name,
                    query_vector=query_vector,
                    limit=limit,
                    with_vector=with_vectors,
                )
                for query_vector in query_vectors
            ]
        )

    async def delete_data_points(self, collection_name: str, data_point_ids: list[UUID]):
        """
        Delete specific data points from a collection based on their IDs asynchronously.

        Raises MilvusException for errors during the deletion process.

        Parameters:
        -----------

            - collection_name (str): The name of the collection from which data points will be
              deleted.
            - data_point_ids (list[UUID]): A list of UUIDs representing the IDs of the data
              points to be deleted.

        Returns:
        --------

            The result of the delete operation, indicating success or failure.
        """
        from pymilvus import MilvusException

        client = self.get_milvus_client()

        try:
            filter_expression = f"""id in [{", ".join(f'"{id}"' for id in data_point_ids)}]"""

            delete_result = client.delete(collection_name=collection_name, filter=filter_expression)

            logger.info(
                f"Deleted data points with IDs {data_point_ids} from collection '{collection_name}'."
            )
            return delete_result
        except MilvusException as e:
            logger.error(
                f"Error deleting data points from collection '{collection_name}': {str(e)}"
            )
            raise e

    async def prune(self):
        """
        Remove all collections from the connected Milvus client asynchronously.
        """
        client = self.get_milvus_client()

        if client:
            collections = client.list_collections()
            for collection_name in collections:
                client.drop_collection(collection_name=collection_name)

            client.close()
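For anyone vendoring this adapter outside of core, here is a minimal, hedged sketch of how the class above is driven end to end. The embedding engine below is a stand-in, since its real constructor is not part of this diff, and the snippet assumes the removed module has been saved locally as `MilvusAdapter.py` with `pymilvus` installed.

```python
import asyncio

from MilvusAdapter import MilvusAdapter  # assumes the module above was vendored as MilvusAdapter.py


class StubEmbeddingEngine:
    """Hypothetical stand-in for cognee's EmbeddingEngine, used only for illustration."""

    def get_vector_size(self) -> int:
        return 3

    async def embed_text(self, data):
        # One fixed-size vector per input string.
        return [[0.1, 0.2, 0.3] for _ in data]


async def demo():
    # A local *.db path makes pymilvus run in its embedded (Milvus Lite) mode.
    adapter = MilvusAdapter(url="./milvus_demo.db", api_key=None, embedding_engine=StubEmbeddingEngine())

    await adapter.create_collection("demo_collection")
    results = await adapter.search("demo_collection", query_text="hello", limit=5)
    print(results)  # empty until data points are inserted

    await adapter.prune()


if __name__ == "__main__":
    asyncio.run(demo())
```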


@@ -1 +0,0 @@
from .MilvusAdapter import MilvusAdapter


@@ -1,103 +0,0 @@
import os
import pathlib

import cognee
from cognee.infrastructure.files.storage import get_storage_config
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType

logger = get_logger()


async def main():
    cognee.config.set_vector_db_provider("milvus")

    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_milvus")
        ).resolve()
    )
    cognee.config.data_root_directory(data_directory_path)

    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_milvus")
        ).resolve()
    )
    cognee.config.system_root_directory(cognee_directory_path)

    cognee.config.set_vector_db_config(
        {
            "vector_db_url": os.path.join(cognee_directory_path, "databases/milvus.db"),
            "vector_db_key": "",
            "vector_db_provider": "milvus",
        }
    )

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    dataset_name = "cs_explanations"

    explanation_file_path = os.path.join(
        pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
    )
    await cognee.add([explanation_file_path], dataset_name)

    text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena.
At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states.
Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible.
The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly.
Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate.
In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited.
"""
    await cognee.add([text], dataset_name)

    await cognee.cognify([dataset_name])

    from cognee.infrastructure.databases.vector import get_vector_engine

    vector_engine = get_vector_engine()
    random_node = (await vector_engine.search("Entity_name", "Quantum computer"))[0]
    random_node_name = random_node.payload["text"]

    search_results = await cognee.search(
        query_type=SearchType.INSIGHTS, query_text=random_node_name
    )
    assert len(search_results) != 0, "The search results list is empty."
    print("\n\nExtracted INSIGHTS are:\n")
    for result in search_results:
        print(f"{result}\n")

    search_results = await cognee.search(query_type=SearchType.CHUNKS, query_text=random_node_name)
    assert len(search_results) != 0, "The search results list is empty."
    print("\n\nExtracted CHUNKS are:\n")
    for result in search_results:
        print(f"{result}\n")

    search_results = await cognee.search(
        query_type=SearchType.SUMMARIES, query_text=random_node_name
    )
    assert len(search_results) != 0, "The search results list is empty."
    print("\nExtracted SUMMARIES are:\n")
    for result in search_results:
        print(f"{result}\n")

    user = await get_default_user()
    history = await get_history(user.id)
    assert len(history) == 6, "Search history is not correct."

    await cognee.prune.prune_data()
    data_root_directory = get_storage_config()["data_root_directory"]
    assert not os.path.isdir(data_root_directory), "Local data files are not deleted"

    await cognee.prune.prune_system(metadata=True)

    milvus_client = get_vector_engine().get_milvus_client()
    collections = milvus_client.list_collections()
    assert len(collections) == 0, "Milvus vector database is not empty"


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())


@@ -3,7 +3,7 @@ FROM python:3.11-slim

# Define Poetry extras to install
ARG POETRY_EXTRAS="\
    # Storage & Databases \
    postgres weaviate qdrant neo4j falkordb milvus kuzu \
    postgres weaviate qdrant neo4j falkordb kuzu \
    # Notebooks & Interactive Environments \
    notebook \
    # LLM & AI Frameworks \


@@ -1,89 +0,0 @@
import os
import pathlib
import asyncio

import cognee
from cognee.modules.search.types import SearchType


async def main():
    """
    Example script demonstrating how to use Cognee with Milvus

    This example:
    1. Configures Cognee to use Milvus as vector database
    2. Sets up data directories
    3. Adds sample data to Cognee
    4. Processes (cognifies) the data
    5. Performs different types of searches
    """
    # Set up data directories for storing documents and system files
    # You should adjust these paths to your needs
    current_dir = pathlib.Path(__file__).parent
    data_directory_path = str(current_dir / "data_storage")
    cognee.config.data_root_directory(data_directory_path)

    cognee_directory_path = str(current_dir / "cognee_system")
    cognee.config.system_root_directory(cognee_directory_path)

    local_milvus_db_path = os.path.join(cognee_directory_path, "databases", "milvus.db")

    # Configure Milvus as the vector database provider
    cognee.config.set_vector_db_config(
        {
            "vector_db_url": local_milvus_db_path,  # Enter Milvus Endpoint if exist
            "vector_db_key": "",  # Enter Token
            "vector_db_provider": "milvus",  # Specify Milvus as provider
        }
    )

    # Clean any existing data (optional)
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Create a dataset
    dataset_name = "milvus_example"

    # Add sample text to the dataset
    sample_text = """Milvus is an open-source vector database built to power AI applications.
It is designed for storing, indexing, and querying large-scale vector datasets.
Milvus implements efficient approximate nearest neighbor search algorithms.
It features advanced indexing techniques like HNSW, IVF, PQ, and more.
Milvus supports hybrid searches combining vector similarity with scalar filtering.
The system can be deployed standalone, in clusters, or through a cloud service."""
    # Add the sample text to the dataset
    await cognee.add([sample_text], dataset_name)

    # Process the added document to extract knowledge
    await cognee.cognify([dataset_name])

    # Now let's perform some searches
    # 1. Search for insights related to "Milvus"
    insights_results = await cognee.search(query_type=SearchType.INSIGHTS, query_text="Milvus")
    print("\nInsights about Milvus:")
    for result in insights_results:
        print(f"- {result}")

    # 2. Search for text chunks related to "vector similarity"
    chunks_results = await cognee.search(
        query_type=SearchType.CHUNKS, query_text="vector similarity", datasets=[dataset_name]
    )
    print("\nChunks about vector similarity:")
    for result in chunks_results:
        print(f"- {result}")

    # 3. Get graph completion related to databases
    graph_completion_results = await cognee.search(
        query_type=SearchType.GRAPH_COMPLETION, query_text="database"
    )
    print("\nGraph completion for databases:")
    for result in graph_completion_results:
        print(f"- {result}")

    # Clean up (optional)
    # await cognee.prune.prune_data()
    # await cognee.prune.prune_system(metadata=True)


if __name__ == "__main__":
    asyncio.run(main())


@@ -100,7 +100,6 @@ deepeval = ["deepeval>=2.0.1,<3"]
posthog = ["posthog>=3.5.0,<4"]
falkordb = ["falkordb>=1.0.9,<2.0.0"]
groq = ["groq>=0.8.0,<1.0.0"]
milvus = ["pymilvus>=2.5.0,<3"]
chromadb = [
    "chromadb>=0.3.0,<0.7",
    "pypika==0.48.8",