Updated evals, added falkordb
This commit is contained in:
parent
d099cae128
commit
8ef23731a3
8 changed files with 247 additions and 8 deletions
191
cognee/infrastructure/databases/graph/falkordb/adapter.py
Normal file
191
cognee/infrastructure/databases/graph/falkordb/adapter.py
Normal file
|
|
@ -0,0 +1,191 @@
|
|||
""" FalcorDB Adapter for Graph Database"""
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional, Any, List, Dict
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
|
||||
from falkordb.asyncio import FalkorDB
|
||||
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
|
||||
|
||||
logger = logging.getLogger("FalcorDBAdapter")
|
||||
|
||||
class FalcorDBAdapter(GraphDBInterface):
|
||||
def __init__(
|
||||
self,
|
||||
graph_database_url: str,
|
||||
graph_database_username: str,
|
||||
graph_database_password: str,
|
||||
graph_database_port: int,
|
||||
driver: Optional[Any] = None,
|
||||
graph_name: str = "DefaultGraph",
|
||||
):
|
||||
self.driver = FalkorDB(
|
||||
host = graph_database_url,
|
||||
port = graph_database_port)
|
||||
self.graph_name = graph_name
|
||||
|
||||
|
||||
|
||||
async def query(
|
||||
self,
|
||||
query: str,
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
try:
|
||||
selected_graph = self.driver.select_graph(self.graph_name)
|
||||
|
||||
result = await selected_graph.query(query)
|
||||
return result.result_set
|
||||
|
||||
except Exception as error:
|
||||
logger.error("Falkor query error: %s", error, exc_info = True)
|
||||
raise error
|
||||
|
||||
async def graph(self):
|
||||
return self.driver
|
||||
|
||||
async def add_node(self, node_id: str, node_properties: Dict[str, Any] = None):
|
||||
node_id = node_id.replace(":", "_")
|
||||
|
||||
serialized_properties = self.serialize_properties(node_properties)
|
||||
|
||||
if "name" not in serialized_properties:
|
||||
serialized_properties["name"] = node_id
|
||||
|
||||
# serialized_properties["created_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
# serialized_properties["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# properties = ", ".join(f"{property_name}: ${property_name}" for property_name in serialized_properties.keys())
|
||||
|
||||
query = f"""MERGE (node:`{node_id}` {{id: $node_id}})
|
||||
ON CREATE SET node += $properties
|
||||
RETURN ID(node) AS internal_id, node.id AS nodeId"""
|
||||
|
||||
params = {
|
||||
"node_id": node_id,
|
||||
"properties": serialized_properties,
|
||||
}
|
||||
|
||||
return await self.query(query, params)
|
||||
|
||||
async def add_nodes(self, nodes: list[tuple[str, dict[str, Any]]]) -> None:
|
||||
# nodes_data = []
|
||||
|
||||
for node in nodes:
|
||||
node_id, node_properties = node
|
||||
node_id = node_id.replace(":", "_")
|
||||
|
||||
await self.add_node(
|
||||
node_id = node_id,
|
||||
node_properties = node_properties,
|
||||
)
|
||||
|
||||
|
||||
|
||||
async def extract_node_description(self, node_id: str):
|
||||
query = """MATCH (n)-[r]->(m)
|
||||
WHERE n.id = $node_id
|
||||
AND NOT m.id CONTAINS 'DefaultGraphModel'
|
||||
RETURN m
|
||||
"""
|
||||
|
||||
result = await self.query(query, dict(node_id = node_id))
|
||||
|
||||
descriptions = []
|
||||
|
||||
for node in result:
|
||||
# Assuming 'm' is a consistent key in your data structure
|
||||
attributes = node.get("m", {})
|
||||
|
||||
# Ensure all required attributes are present
|
||||
if all(key in attributes for key in ["id", "layer_id", "description"]):
|
||||
descriptions.append({
|
||||
"id": attributes["id"],
|
||||
"layer_id": attributes["layer_id"],
|
||||
"description": attributes["description"],
|
||||
})
|
||||
|
||||
return descriptions
|
||||
|
||||
async def get_layer_nodes(self):
|
||||
query = """MATCH (node) WHERE node.layer_id IS NOT NULL
|
||||
RETURN node"""
|
||||
|
||||
return [result['node'] for result in (await self.query(query))]
|
||||
|
||||
async def extract_node(self, node_id: str):
|
||||
query= """
|
||||
MATCH(node {id: $node_id})
|
||||
RETURN node
|
||||
"""
|
||||
|
||||
results = [node['node'] for node in (await self.query(query, dict(node_id = node_id)))]
|
||||
|
||||
return results[0] if len(results) > 0 else None
|
||||
|
||||
async def delete_node(self, node_id: str):
|
||||
node_id = id.replace(":", "_")
|
||||
|
||||
query = f"MATCH (node:`{node_id}` {{id: $node_id}}) DETACH DELETE n"
|
||||
params = { "node_id": node_id }
|
||||
|
||||
return await self.query(query, params)
|
||||
|
||||
async def add_edge(self, from_node: str, to_node: str, relationship_name: str, edge_properties: Optional[Dict[str, Any]] = {}):
|
||||
serialized_properties = self.serialize_properties(edge_properties)
|
||||
from_node = from_node.replace(":", "_")
|
||||
to_node = to_node.replace(":", "_")
|
||||
|
||||
query = f"""MATCH (from_node:`{from_node}` {{id: $from_node}}), (to_node:`{to_node}` {{id: $to_node}})
|
||||
MERGE (from_node)-[r:`{relationship_name}`]->(to_node)
|
||||
SET r += $properties
|
||||
RETURN r"""
|
||||
|
||||
params = {
|
||||
"from_node": from_node,
|
||||
"to_node": to_node,
|
||||
"properties": serialized_properties
|
||||
}
|
||||
|
||||
return await self.query(query, params)
|
||||
|
||||
|
||||
async def add_edges(self, edges: list[tuple[str, str, str, dict[str, Any]]]) -> None:
|
||||
# edges_data = []
|
||||
|
||||
for edge in edges:
|
||||
from_node, to_node, relationship_name, edge_properties = edge
|
||||
from_node = from_node.replace(":", "_")
|
||||
to_node = to_node.replace(":", "_")
|
||||
|
||||
await self.add_edge(
|
||||
from_node = from_node,
|
||||
to_node = to_node,
|
||||
relationship_name = relationship_name,
|
||||
edge_properties = edge_properties
|
||||
)
|
||||
|
||||
|
||||
|
||||
async def filter_nodes(self, search_criteria):
|
||||
query = f"""MATCH (node)
|
||||
WHERE node.id CONTAINS '{search_criteria}'
|
||||
RETURN node"""
|
||||
|
||||
|
||||
return await self.query(query)
|
||||
|
||||
|
||||
async def delete_graph(self):
|
||||
query = """MATCH (node)
|
||||
DETACH DELETE node;"""
|
||||
|
||||
return await self.query(query)
|
||||
|
||||
def serialize_properties(self, properties = dict()):
|
||||
return {
|
||||
property_key: json.dumps(property_value)
|
||||
if isinstance(property_value, (dict, list))
|
||||
else property_value for property_key, property_value in properties.items()
|
||||
}
|
||||
|
|
@ -67,6 +67,14 @@ services:
|
|||
- ./litellm-config.yaml:/app/config.yaml # Mount the local configuration file
|
||||
# You can change the port or number of workers as per your requirements or pass any new supported CLI augument. Make sure the port passed here matches with the container port defined above in `ports` value
|
||||
command: [ "--config", "/app/config.yaml", "--port", "4000", "--num_workers", "8" ]
|
||||
falkordb:
|
||||
image: falkordb/falkordb:edge
|
||||
container_name: falkordb
|
||||
ports:
|
||||
- "6379:6379"
|
||||
- "3001:3000"
|
||||
networks:
|
||||
- cognee_backend
|
||||
|
||||
networks:
|
||||
cognee_backend:
|
||||
|
|
|
|||
|
|
@ -67,12 +67,34 @@ def get_answer(content: str,context, model: Type[BaseModel]= AnswerModel):
|
|||
logger.error("Error extracting cognitive layers from content: %s", error, exc_info = True)
|
||||
raise error
|
||||
|
||||
def run_cognify_base_rag_and_search():
|
||||
async def run_cognify_base_rag():
|
||||
from cognee.api.v1.add import add
|
||||
from cognee.api.v1.prune import prune
|
||||
from cognee.api.v1.cognify.cognify import cognify
|
||||
|
||||
await prune.prune_system()
|
||||
|
||||
await add("data://test_datasets", "initial_test")
|
||||
|
||||
graph = await cognify("initial_test")
|
||||
|
||||
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def run_cognify_and_search():
|
||||
pass
|
||||
async def cognify_search_base_rag(content:str, context:str):
|
||||
vector_client = infrastructure_config.get_config("vector_engine")
|
||||
|
||||
return_ = await vector_client.search(collection_name="basic_rag", query_text="show_all_processes", limit=10)
|
||||
|
||||
print("results", return_)
|
||||
return return_
|
||||
|
||||
async def cognify_search_graph(content:str, context:str):
|
||||
from cognee.api.v1.search.search import search
|
||||
return_ = await search(content)
|
||||
return return_
|
||||
|
||||
|
||||
|
||||
|
|
@ -90,12 +112,29 @@ def convert_goldens_to_test_cases(test_cases_raw: List[LLMTestCase]) -> List[LLM
|
|||
test_cases.append(test_case)
|
||||
return test_cases
|
||||
|
||||
# Data preprocessing before setting the dataset test cases
|
||||
dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
|
||||
# # Data preprocessing before setting the dataset test cases
|
||||
# dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
|
||||
#
|
||||
#
|
||||
# from deepeval.metrics import HallucinationMetric
|
||||
#
|
||||
#
|
||||
# metric = HallucinationMetric()
|
||||
# dataset.evaluate([metric])
|
||||
|
||||
|
||||
from deepeval.metrics import HallucinationMetric
|
||||
if __name__ == "__main__":
|
||||
|
||||
import asyncio
|
||||
|
||||
metric = HallucinationMetric()
|
||||
dataset.evaluate([metric])
|
||||
async def main():
|
||||
await run_cognify_base_rag_and_search()
|
||||
|
||||
asyncio.run(main())
|
||||
# run_cognify_base_rag_and_search()
|
||||
# # Data preprocessing before setting the dataset test cases
|
||||
# dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
|
||||
# from deepeval.metrics import HallucinationMetric
|
||||
# metric = HallucinationMetric()
|
||||
# dataset.evaluate([metric])
|
||||
pass
|
||||
|
|
@ -71,6 +71,7 @@ protobuf = "<5.0.0"
|
|||
langchain-community = "0.0.38"
|
||||
langchain ="0.1.10"
|
||||
deepeval = "^0.21.42"
|
||||
falkordb = "^1.0.4"
|
||||
|
||||
|
||||
[tool.poetry.extras]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue