small cleanup
This commit is contained in:
parent
602aecba43
commit
51274972e8
3 changed files with 359 additions and 267 deletions
|
|
@ -10,7 +10,13 @@ from typing import List, Dict, Any, Optional, Tuple, Type, Union
|
|||
from falkordb import FalkorDB
|
||||
|
||||
from cognee.exceptions import InvalidValueError
|
||||
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface, record_graph_changes, NodeData, EdgeData, Node
|
||||
from cognee.infrastructure.databases.graph.graph_db_interface import (
|
||||
GraphDBInterface,
|
||||
record_graph_changes,
|
||||
NodeData,
|
||||
EdgeData,
|
||||
Node,
|
||||
)
|
||||
from cognee.infrastructure.databases.vector.embeddings import EmbeddingEngine
|
||||
from cognee.infrastructure.databases.vector.vector_db_interface import VectorDBInterface
|
||||
from cognee.infrastructure.engine import DataPoint
|
||||
|
|
@ -176,7 +182,14 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
return f"'{json.dumps(value).replace(chr(39), chr(34))}'"
|
||||
if type(value) is str:
|
||||
# Escape single quotes and handle special characters
|
||||
escaped_value = str(value).replace("'", "\\'").replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
|
||||
escaped_value = (
|
||||
str(value)
|
||||
.replace("'", "\\'")
|
||||
.replace('"', '\\"')
|
||||
.replace("\n", "\\n")
|
||||
.replace("\r", "\\r")
|
||||
.replace("\t", "\\t")
|
||||
)
|
||||
return f"'{escaped_value}'"
|
||||
return f"'{str(value)}'"
|
||||
|
||||
|
|
@ -215,7 +228,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
}
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
# Clean the properties - remove None values and handle special types
|
||||
clean_properties = {}
|
||||
for key, value in properties.items():
|
||||
|
|
@ -236,37 +249,35 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
SET node += $properties, node.updated_at = timestamp()
|
||||
"""
|
||||
).strip()
|
||||
|
||||
params = {
|
||||
"node_id": str(data_point.id),
|
||||
"properties": clean_properties
|
||||
}
|
||||
|
||||
params = {"node_id": str(data_point.id), "properties": clean_properties}
|
||||
|
||||
return query, params
|
||||
|
||||
def sanitize_relationship_name(self, relationship_name: str) -> str:
    """
    Sanitize relationship name to be valid for Cypher queries.

    Every run of characters that are not word characters (hyphens, spaces,
    punctuation, ...) together with any adjacent underscores is collapsed into a
    single underscore. Leading and trailing underscores are then dropped, and a
    result that does not start with a letter is prefixed with an underscore.

    Parameters:
    -----------

        - relationship_name (str): The original relationship name

    Returns:
    --------

        - str: A sanitized relationship name valid for Cypher, or the literal
          "RELATIONSHIP" when nothing is left after sanitizing.
    """
    import re

    # `[\W_]+` replaces a whole run of non-word characters and underscores at once,
    # which gives the same result as substituting each non-word character and then
    # collapsing consecutive underscores in a second pass.
    sanitized = re.sub(r"[\W_]+", "_", relationship_name).strip("_")

    if not sanitized:
        return "RELATIONSHIP"

    # After stripping, the first character can no longer be an underscore, so the
    # only remaining question is whether it is a letter (digits get a "_" prefix).
    if not sanitized[0].isalpha():
        sanitized = "_" + sanitized

    return sanitized
|
||||
|
||||
async def create_edge_query(self, edge: tuple[str, str, str, dict]) -> str:
|
||||
"""
|
||||
|
|
@ -285,7 +296,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"""
|
||||
# Sanitize the relationship name for Cypher compatibility
|
||||
sanitized_relationship = self.sanitize_relationship_name(edge[2])
|
||||
|
||||
|
||||
# Add the original relationship name to properties
|
||||
edge_properties = {**edge[3], "relationship_name": edge[2]}
|
||||
properties = await self.stringify_properties(edge_properties)
|
||||
|
|
@ -368,7 +379,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
else None
|
||||
for property_name in DataPoint.get_embeddable_property_names(data_point)
|
||||
]
|
||||
|
||||
|
||||
query, params = await self.create_data_point_query(data_point, vectorized_data)
|
||||
self.query(query, params)
|
||||
|
||||
|
|
@ -460,13 +471,10 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
clean_properties[key] = f"vecf32({value})"
|
||||
else:
|
||||
clean_properties[key] = value
|
||||
|
||||
|
||||
query = "MERGE (node {id: $node_id}) SET node += $properties, node.updated_at = timestamp()"
|
||||
params = {
|
||||
"node_id": node_id,
|
||||
"properties": clean_properties
|
||||
}
|
||||
|
||||
params = {"node_id": node_id, "properties": clean_properties}
|
||||
|
||||
self.query(query, params)
|
||||
|
||||
# Keep the original create_data_points method for VectorDBInterface compatibility
|
||||
|
|
@ -508,7 +516,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
else None
|
||||
for property_name in DataPoint.get_embeddable_property_names(data_point)
|
||||
]
|
||||
|
||||
|
||||
query, params = await self.create_data_point_query(data_point, vectorized_data)
|
||||
self.query(query, params)
|
||||
|
||||
|
|
@ -550,11 +558,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
# Node is in (node_id, properties) format
|
||||
node_id, properties = node
|
||||
await self.add_node(node_id, properties)
|
||||
elif hasattr(node, 'id') and hasattr(node, 'model_dump'):
|
||||
elif hasattr(node, "id") and hasattr(node, "model_dump"):
|
||||
# Node is a DataPoint object
|
||||
await self.add_node(str(node.id), node.model_dump())
|
||||
else:
|
||||
raise ValueError(f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object.")
|
||||
raise ValueError(
|
||||
f"Invalid node format: {node}. Expected tuple (node_id, properties) or DataPoint object."
|
||||
)
|
||||
|
||||
async def add_edge(
|
||||
self,
|
||||
|
|
@ -578,7 +588,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"""
|
||||
if properties is None:
|
||||
properties = {}
|
||||
|
||||
|
||||
edge_tuple = (source_id, target_id, relationship_name, properties)
|
||||
query = await self.create_edge_query(edge_tuple)
|
||||
self.query(query)
|
||||
|
|
@ -599,7 +609,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
source_id, target_id, relationship_name, properties = edge
|
||||
await self.add_edge(source_id, target_id, relationship_name, properties)
|
||||
else:
|
||||
raise ValueError(f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties).")
|
||||
raise ValueError(
|
||||
f"Invalid edge format: {edge}. Expected tuple (source_id, target_id, relationship_name, properties)."
|
||||
)
|
||||
|
||||
async def has_edges(self, edges):
|
||||
"""
|
||||
|
|
@ -782,7 +794,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
RETURN n, 1.0 as score
|
||||
LIMIT $limit
|
||||
"""
|
||||
|
||||
|
||||
params = {"query_text": query_text, "limit": limit}
|
||||
result = self.query(query, params)
|
||||
return result.result_set
|
||||
|
|
@ -945,7 +957,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"MATCH (node) WHERE node.id = $node_id RETURN node",
|
||||
{"node_id": node_id},
|
||||
)
|
||||
|
||||
|
||||
if result.result_set and len(result.result_set) > 0:
|
||||
# FalkorDB returns node objects as first element in the result list
|
||||
return result.result_set[0][0].properties
|
||||
|
|
@ -964,7 +976,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"MATCH (node) WHERE node.id IN $node_ids RETURN node",
|
||||
{"node_ids": node_ids},
|
||||
)
|
||||
|
||||
|
||||
nodes = []
|
||||
if result.result_set:
|
||||
for record in result.result_set:
|
||||
|
|
@ -985,7 +997,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"MATCH (node)-[]-(neighbor) WHERE node.id = $node_id RETURN DISTINCT neighbor",
|
||||
{"node_id": node_id},
|
||||
)
|
||||
|
||||
|
||||
neighbors = []
|
||||
if result.result_set:
|
||||
for record in result.result_set:
|
||||
|
|
@ -1010,17 +1022,19 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
""",
|
||||
{"node_id": node_id},
|
||||
)
|
||||
|
||||
|
||||
edges = []
|
||||
if result.result_set:
|
||||
for record in result.result_set:
|
||||
# FalkorDB returns values by index: source_id, target_id, relationship_name, properties
|
||||
edges.append((
|
||||
record[0], # source_id
|
||||
record[1], # target_id
|
||||
record[2], # relationship_name
|
||||
record[3] # properties
|
||||
))
|
||||
edges.append(
|
||||
(
|
||||
record[0], # source_id
|
||||
record[1], # target_id
|
||||
record[2], # relationship_name
|
||||
record[3], # properties
|
||||
)
|
||||
)
|
||||
return edges
|
||||
|
||||
async def has_edge(self, source_id: str, target_id: str, relationship_name: str) -> bool:
|
||||
|
|
@ -1036,7 +1050,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"""
|
||||
# Check both the sanitized relationship type and the original name in properties
|
||||
sanitized_relationship = self.sanitize_relationship_name(relationship_name)
|
||||
|
||||
|
||||
result = self.query(
|
||||
f"""
|
||||
MATCH (source)-[r:{sanitized_relationship}]->(target)
|
||||
|
|
@ -1044,9 +1058,13 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
AND (r.relationship_name = $relationship_name OR NOT EXISTS(r.relationship_name))
|
||||
RETURN COUNT(r) > 0 AS edge_exists
|
||||
""",
|
||||
{"source_id": source_id, "target_id": target_id, "relationship_name": relationship_name},
|
||||
{
|
||||
"source_id": source_id,
|
||||
"target_id": target_id,
|
||||
"relationship_name": relationship_name,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
if result.result_set and len(result.result_set) > 0:
|
||||
# FalkorDB returns scalar results as a list, access by index instead of key
|
||||
return result.result_set[0][0]
|
||||
|
|
@ -1065,11 +1083,11 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
# Get basic node and edge counts
|
||||
node_result = self.query("MATCH (n) RETURN count(n) AS node_count")
|
||||
edge_result = self.query("MATCH ()-[r]->() RETURN count(r) AS edge_count")
|
||||
|
||||
|
||||
# FalkorDB returns scalar results as a list, access by index instead of key
|
||||
num_nodes = node_result.result_set[0][0] if node_result.result_set else 0
|
||||
num_edges = edge_result.result_set[0][0] if edge_result.result_set else 0
|
||||
|
||||
|
||||
metrics = {
|
||||
"num_nodes": num_nodes,
|
||||
"num_edges": num_edges,
|
||||
|
|
@ -1078,23 +1096,27 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
"num_connected_components": 1, # Simplified for now
|
||||
"sizes_of_connected_components": [num_nodes] if num_nodes > 0 else [],
|
||||
}
|
||||
|
||||
|
||||
if include_optional:
|
||||
# Add optional metrics - simplified implementation
|
||||
metrics.update({
|
||||
"num_selfloops": 0, # Simplified
|
||||
"diameter": -1, # Not implemented
|
||||
"avg_shortest_path_length": -1, # Not implemented
|
||||
"avg_clustering": -1, # Not implemented
|
||||
})
|
||||
metrics.update(
|
||||
{
|
||||
"num_selfloops": 0, # Simplified
|
||||
"diameter": -1, # Not implemented
|
||||
"avg_shortest_path_length": -1, # Not implemented
|
||||
"avg_clustering": -1, # Not implemented
|
||||
}
|
||||
)
|
||||
else:
|
||||
metrics.update({
|
||||
"num_selfloops": -1,
|
||||
"diameter": -1,
|
||||
"avg_shortest_path_length": -1,
|
||||
"avg_clustering": -1,
|
||||
})
|
||||
|
||||
metrics.update(
|
||||
{
|
||||
"num_selfloops": -1,
|
||||
"diameter": -1,
|
||||
"avg_shortest_path_length": -1,
|
||||
"avg_clustering": -1,
|
||||
}
|
||||
)
|
||||
|
||||
return metrics
|
||||
|
||||
async def get_document_subgraph(self, content_hash: str):
|
||||
|
|
@ -1117,9 +1139,9 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
COLLECT(DISTINCT c) AS made_from_nodes,
|
||||
COLLECT(DISTINCT et) AS orphan_types
|
||||
"""
|
||||
|
||||
|
||||
result = self.query(query, {"content_hash": f"text_{content_hash}"})
|
||||
|
||||
|
||||
if not result.result_set or not result.result_set[0]:
|
||||
return None
|
||||
|
||||
|
|
@ -1154,7 +1176,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
RETURN n
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
# FalkorDB returns node objects as first element in each record
|
||||
return [record[0] for record in result.result_set] if result.result_set else []
|
||||
|
||||
|
|
@ -1171,7 +1193,7 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
- node_name (List[str]): A list of names of the nodes to include in the subgraph.
|
||||
"""
|
||||
label = node_type.__name__
|
||||
|
||||
|
||||
# Find primary nodes of the specified type and names
|
||||
primary_query = f"""
|
||||
UNWIND $names AS wantedName
|
||||
|
|
@ -1179,60 +1201,66 @@ class FalkorDBAdapter(VectorDBInterface, GraphDBInterface):
|
|||
WHERE n.name = wantedName
|
||||
RETURN DISTINCT n.id, properties(n) AS properties
|
||||
"""
|
||||
|
||||
|
||||
primary_result = self.query(primary_query, {"names": node_name})
|
||||
if not primary_result.result_set:
|
||||
return [], []
|
||||
|
||||
# FalkorDB returns values by index: id, properties
|
||||
|
||||
# FalkorDB returns values by index: id, properties
|
||||
primary_ids = [record[0] for record in primary_result.result_set]
|
||||
|
||||
|
||||
# Find neighbors of primary nodes
|
||||
neighbor_query = """
|
||||
MATCH (n)-[]-(neighbor)
|
||||
WHERE n.id IN $ids
|
||||
RETURN DISTINCT neighbor.id, properties(neighbor) AS properties
|
||||
"""
|
||||
|
||||
|
||||
neighbor_result = self.query(neighbor_query, {"ids": primary_ids})
|
||||
# FalkorDB returns values by index: id, properties
|
||||
neighbor_ids = [record[0] for record in neighbor_result.result_set] if neighbor_result.result_set else []
|
||||
|
||||
neighbor_ids = (
|
||||
[record[0] for record in neighbor_result.result_set]
|
||||
if neighbor_result.result_set
|
||||
else []
|
||||
)
|
||||
|
||||
all_ids = list(set(primary_ids + neighbor_ids))
|
||||
|
||||
|
||||
# Get all nodes in the subgraph
|
||||
nodes_query = """
|
||||
MATCH (n)
|
||||
WHERE n.id IN $ids
|
||||
RETURN n.id, properties(n) AS properties
|
||||
"""
|
||||
|
||||
|
||||
nodes_result = self.query(nodes_query, {"ids": all_ids})
|
||||
nodes = []
|
||||
if nodes_result.result_set:
|
||||
for record in nodes_result.result_set:
|
||||
# FalkorDB returns values by index: id, properties
|
||||
nodes.append((record[0], record[1]))
|
||||
|
||||
|
||||
# Get edges between these nodes
|
||||
edges_query = """
|
||||
MATCH (a)-[r]->(b)
|
||||
WHERE a.id IN $ids AND b.id IN $ids
|
||||
RETURN a.id AS source_id, b.id AS target_id, type(r) AS relationship_name, properties(r) AS properties
|
||||
"""
|
||||
|
||||
|
||||
edges_result = self.query(edges_query, {"ids": all_ids})
|
||||
edges = []
|
||||
if edges_result.result_set:
|
||||
for record in edges_result.result_set:
|
||||
# FalkorDB returns values by index: source_id, target_id, relationship_name, properties
|
||||
edges.append((
|
||||
record[0], # source_id
|
||||
record[1], # target_id
|
||||
record[2], # relationship_name
|
||||
record[3] # properties
|
||||
))
|
||||
|
||||
edges.append(
|
||||
(
|
||||
record[0], # source_id
|
||||
record[1], # target_id
|
||||
record[2], # relationship_name
|
||||
record[3], # properties
|
||||
)
|
||||
)
|
||||
|
||||
return nodes, edges
|
||||
|
||||
async def prune(self):
|
||||
|
|
|
|||
|
|
@ -10,6 +10,202 @@ from cognee.modules.engine.utils import (
|
|||
from cognee.shared.data_models import KnowledgeGraph
|
||||
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
|
||||
|
||||
# Constants for node key suffixes
|
||||
TYPE_SUFFIX = "_type"
|
||||
ENTITY_SUFFIX = "_entity"
|
||||
|
||||
|
||||
def _create_ontology_node(ont_node, category: str, data_chunk: DocumentChunk):
    """Build the map key and node object for a single ontology node.

    Args:
        ont_node: Ontology node; only its ``name`` attribute is read here.
        category: "classes" yields an EntityType, "individuals" yields an Entity.
        data_chunk: Chunk whose ``belongs_to_set`` is copied onto created entities.

    Returns:
        Tuple of (key, node). The key is the generated node id followed by the type
        or entity suffix. ``(None, None)`` is returned for any other category.
    """
    generated_id = generate_node_id(ont_node.name)
    generated_name = generate_node_name(ont_node.name)

    if category == "classes":
        class_key = f"{generated_id}{TYPE_SUFFIX}"
        class_node = EntityType(
            id=generated_id,
            name=generated_name,
            description=generated_name,
            ontology_valid=True,
        )
        return class_key, class_node

    if category == "individuals":
        individual_key = f"{generated_id}{ENTITY_SUFFIX}"
        individual_node = Entity(
            id=generated_id,
            name=generated_name,
            description=generated_name,
            ontology_valid=True,
            belongs_to_set=data_chunk.belongs_to_set,
        )
        return individual_key, individual_node

    # Unknown category: signal "nothing to add" to the caller.
    return None, None
|
||||
|
||||
|
||||
def _process_ontology_nodes(
    ontology_nodes, data_chunk: DocumentChunk, added_nodes_map: dict, added_ontology_nodes_map: dict
):
    """Register ontology nodes that are not yet known in either node map.

    Args:
        ontology_nodes: Iterable of ontology nodes; each must expose ``category``.
        data_chunk: Chunk forwarded to the node factory.
        added_nodes_map: Nodes already created from the graph (read only here).
        added_ontology_nodes_map: Ontology-derived nodes; new entries are written here.
    """
    for candidate in ontology_nodes:
        key, created = _create_ontology_node(candidate, candidate.category, data_chunk)

        # The factory returns (None, None) for categories it does not handle.
        if not key or not created:
            continue

        # Never overwrite a node that either map already holds under this key.
        if key in added_nodes_map or key in added_ontology_nodes_map:
            continue

        added_ontology_nodes_map[key] = created
|
||||
|
||||
|
||||
def _process_ontology_edges(
    ontology_edges,
    existing_edges_map: dict,
    ontology_relationships: list,
    ontology_valid: bool = True,
):
    """Append ontology edges that have not been recorded yet.

    Args:
        ontology_edges: Iterable of (source, relation, target) triples.
        existing_edges_map: Map of edge keys already emitted; updated in place.
        ontology_relationships: Output list; receives
            (source_id, target_id, relationship_name, properties) tuples.
        ontology_valid: Value stored under "ontology_valid" in the edge properties.
    """
    for source, relation, target in ontology_edges:
        from_id = generate_node_id(source)
        to_id = generate_node_id(target)
        edge_name = generate_edge_name(relation)

        dedup_key = f"{from_id}_{to_id}_{edge_name}"
        if dedup_key in existing_edges_map:
            continue

        edge_properties = {
            "relationship_name": edge_name,
            "source_node_id": from_id,
            "target_node_id": to_id,
            "ontology_valid": ontology_valid,
        }
        ontology_relationships.append((from_id, to_id, edge_name, edge_properties))
        existing_edges_map[dedup_key] = True
|
||||
|
||||
|
||||
def _resolve_ontology_mapping(node_name: str, node_type: str, ontology_resolver: OntologyResolver):
    """Look a node up in the ontology and report whether a match was found.

    Args:
        node_name: Name passed to the resolver's ``get_subgraph``.
        node_type: Node type passed to the resolver's ``get_subgraph``.
        ontology_resolver: Resolver whose ``get_subgraph`` is queried.

    Returns:
        Tuple of (ontology_nodes, ontology_edges, ontology_validated, mapped_name).
        ``mapped_name`` is the matched node's name when the resolver returned a
        truthy closest node, otherwise the unchanged ``node_name``.
    """
    subgraph_nodes, subgraph_edges, best_match = ontology_resolver.get_subgraph(
        node_name=node_name, node_type=node_type
    )

    if best_match:
        return subgraph_nodes, subgraph_edges, True, best_match.name

    return subgraph_nodes, subgraph_edges, False, node_name
|
||||
|
||||
|
||||
def _get_or_create_type_node(
    node,
    data_chunk: DocumentChunk,
    ontology_resolver: OntologyResolver,
    added_nodes_map: dict,
    added_ontology_nodes_map: dict,
    name_mapping: dict,
    key_mapping: dict,
    existing_edges_map: dict,
    ontology_relationships: list,
):
    """Get or create a type node with ontology validation.

    Args:
        node: Graph node; only its ``type`` attribute is read here.
        data_chunk: Chunk forwarded when ontology nodes are registered.
        ontology_resolver: Resolver queried with node type "classes".
        added_nodes_map: Nodes created so far, keyed by node key; updated in place.
        added_ontology_nodes_map: Ontology-derived nodes; updated in place.
        name_mapping: Original type name -> ontology name; updated on a match.
        key_mapping: Original type key -> ontology-based key; updated on a match.
        existing_edges_map: Edge keys already emitted; updated in place.
        ontology_relationships: Output list for ontology edges; appended in place.

    Returns:
        The type node stored for this type: the node found through
        ``added_nodes_map`` / ``key_mapping`` when the key is already known,
        otherwise a newly created EntityType.
    """
    type_node_id = generate_node_id(node.type)
    type_node_name = generate_node_name(node.type)
    type_node_key = f"{type_node_id}{TYPE_SUFFIX}"

    # Check if node already exists, either under its own key or under the
    # ontology-based key it was remapped to earlier.
    if type_node_key in added_nodes_map or type_node_key in key_mapping:
        return added_nodes_map.get(type_node_key) or added_nodes_map.get(
            key_mapping.get(type_node_key)
        )

    # Resolve ontology for type
    ontology_nodes, ontology_edges, ontology_validated, mapped_type_name = (
        _resolve_ontology_mapping(type_node_name, "classes", ontology_resolver)
    )

    # Update mappings if ontology validation succeeded: the node is stored under
    # the ontology name, and the original key is remembered so later lookups of
    # the original type still find it.
    if ontology_validated:
        name_mapping[type_node_name] = mapped_type_name
        old_key = type_node_key
        type_node_id = generate_node_id(mapped_type_name)
        type_node_key = f"{type_node_id}{TYPE_SUFFIX}"
        type_node_name = generate_node_name(mapped_type_name)
        key_mapping[old_key] = type_node_key

    # Create type node
    type_node = EntityType(
        id=type_node_id,
        name=type_node_name,
        type=type_node_name,
        description=type_node_name,
        ontology_valid=ontology_validated,
    )
    added_nodes_map[type_node_key] = type_node

    # Process ontology nodes and edges
    _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
    _process_ontology_edges(ontology_edges, existing_edges_map, ontology_relationships)

    return type_node
|
||||
|
||||
|
||||
def _get_or_create_entity_node(
    node,
    type_node,
    data_chunk: DocumentChunk,
    ontology_resolver: OntologyResolver,
    added_nodes_map: dict,
    added_ontology_nodes_map: dict,
    name_mapping: dict,
    key_mapping: dict,
    existing_edges_map: dict,
    ontology_relationships: list,
):
    """Return the entity node for ``node``, creating and registering it if needed.

    Args:
        node: Graph node; its ``id``, ``name`` and ``description`` are read.
        type_node: Type node assigned to the new entity's ``is_a``.
        data_chunk: Chunk whose ``belongs_to_set`` is copied onto the entity.
        ontology_resolver: Resolver queried with node type "individuals".
        added_nodes_map: Nodes created so far, keyed by node key; updated in place.
        added_ontology_nodes_map: Ontology-derived nodes; updated in place.
        name_mapping: Original entity name -> ontology name; updated on a match.
        key_mapping: Original entity key -> ontology-based key; updated on a match.
        existing_edges_map: Edge keys already emitted; updated in place.
        ontology_relationships: Output list for ontology edges; appended in place.

    Returns:
        The entity node found through ``added_nodes_map`` / ``key_mapping`` when
        the key is already known, otherwise a newly created Entity.
    """
    entity_id = generate_node_id(node.id)
    entity_name = generate_node_name(node.name)
    entity_key = f"{entity_id}{ENTITY_SUFFIX}"

    # Known already, directly or through a key remapped by an ontology match.
    already_known = entity_key in added_nodes_map or entity_key in key_mapping
    if already_known:
        direct_hit = added_nodes_map.get(entity_key)
        return direct_hit or added_nodes_map.get(key_mapping.get(entity_key))

    resolved = _resolve_ontology_mapping(entity_name, "individuals", ontology_resolver)
    ontology_nodes, ontology_edges, is_validated, ontology_name = resolved

    if is_validated:
        # Store the entity under its ontology name and remember the original key.
        name_mapping[entity_name] = ontology_name
        original_key = entity_key
        entity_id = generate_node_id(ontology_name)
        entity_key = f"{entity_id}{ENTITY_SUFFIX}"
        entity_name = generate_node_name(ontology_name)
        key_mapping[original_key] = entity_key

    created_entity = Entity(
        id=entity_id,
        name=entity_name,
        is_a=type_node,
        description=node.description,
        ontology_valid=is_validated,
        belongs_to_set=data_chunk.belongs_to_set,
    )
    added_nodes_map[entity_key] = created_entity

    # Pull in the surrounding ontology nodes and edges.
    _process_ontology_nodes(ontology_nodes, data_chunk, added_nodes_map, added_ontology_nodes_map)
    _process_ontology_edges(
        ontology_edges, existing_edges_map, ontology_relationships, ontology_valid=True
    )

    return created_entity
|
||||
|
||||
|
||||
def expand_with_nodes_and_edges(
|
||||
data_chunks: list[DocumentChunk],
|
||||
|
|
@ -17,17 +213,25 @@ def expand_with_nodes_and_edges(
|
|||
ontology_resolver: OntologyResolver = None,
|
||||
existing_edges_map: Optional[dict[str, bool]] = None,
|
||||
):
|
||||
if existing_edges_map is None:
|
||||
existing_edges_map = {}
|
||||
"""
|
||||
Expand chunk graphs with nodes and edges, applying ontology validation.
|
||||
|
||||
if ontology_resolver is None:
|
||||
ontology_resolver = OntologyResolver()
|
||||
Args:
|
||||
data_chunks: List of document chunks
|
||||
chunk_graphs: List of knowledge graphs corresponding to chunks
|
||||
ontology_resolver: Optional ontology resolver for validation
|
||||
existing_edges_map: Optional map of existing edges to avoid duplicates
|
||||
|
||||
Returns:
|
||||
Tuple of (graph_nodes, graph_edges)
|
||||
"""
|
||||
existing_edges_map = existing_edges_map or {}
|
||||
ontology_resolver = ontology_resolver or OntologyResolver()
|
||||
|
||||
added_nodes_map = {}
|
||||
added_ontology_nodes_map = {}
|
||||
relationships = []
|
||||
ontology_relationships = []
|
||||
|
||||
name_mapping = {}
|
||||
key_mapping = {}
|
||||
|
||||
|
|
@ -35,184 +239,41 @@ def expand_with_nodes_and_edges(
|
|||
if not graph:
|
||||
continue
|
||||
|
||||
# Process nodes
|
||||
for node in graph.nodes:
|
||||
node_id = generate_node_id(node.id)
|
||||
node_name = generate_node_name(node.name)
|
||||
type_node_id = generate_node_id(node.type)
|
||||
type_node_name = generate_node_name(node.type)
|
||||
# Get or create type node
|
||||
type_node = _get_or_create_type_node(
|
||||
node,
|
||||
data_chunk,
|
||||
ontology_resolver,
|
||||
added_nodes_map,
|
||||
added_ontology_nodes_map,
|
||||
name_mapping,
|
||||
key_mapping,
|
||||
existing_edges_map,
|
||||
ontology_relationships,
|
||||
)
|
||||
|
||||
ontology_validated_source_type = False
|
||||
ontology_validated_source_ent = False
|
||||
|
||||
type_node_key = f"{type_node_id}_type"
|
||||
|
||||
if type_node_key not in added_nodes_map and type_node_key not in key_mapping:
|
||||
(
|
||||
ontology_entity_type_nodes,
|
||||
ontology_entity_type_edges,
|
||||
ontology_closest_class_node,
|
||||
) = ontology_resolver.get_subgraph(node_name=type_node_name, node_type="classes")
|
||||
|
||||
if ontology_closest_class_node:
|
||||
name_mapping[type_node_name] = ontology_closest_class_node.name
|
||||
ontology_validated_source_type = True
|
||||
old_key = type_node_key
|
||||
type_node_id = generate_node_id(ontology_closest_class_node.name)
|
||||
type_node_key = f"{type_node_id}_type"
|
||||
type_node_name = generate_node_name(ontology_closest_class_node.name)
|
||||
key_mapping[old_key] = type_node_key
|
||||
|
||||
type_node = EntityType(
|
||||
id=type_node_id,
|
||||
name=type_node_name,
|
||||
type=type_node_name,
|
||||
description=type_node_name,
|
||||
ontology_valid=ontology_validated_source_type,
|
||||
)
|
||||
added_nodes_map[type_node_key] = type_node
|
||||
|
||||
for ontology_node_to_store in ontology_entity_type_nodes:
|
||||
ont_node_id = generate_node_id(ontology_node_to_store.name)
|
||||
ont_node_name = generate_node_name(ontology_node_to_store.name)
|
||||
|
||||
if ontology_node_to_store.category == "classes":
|
||||
ont_node_key = f"{ont_node_id}_type"
|
||||
if (ont_node_key not in added_nodes_map) and (
|
||||
ont_node_key not in added_ontology_nodes_map
|
||||
):
|
||||
added_ontology_nodes_map[ont_node_key] = EntityType(
|
||||
id=ont_node_id,
|
||||
name=ont_node_name,
|
||||
description=ont_node_name,
|
||||
ontology_valid=True,
|
||||
)
|
||||
|
||||
elif ontology_node_to_store.category == "individuals":
|
||||
ont_node_key = f"{ont_node_id}_entity"
|
||||
if (ont_node_key not in added_nodes_map) and (
|
||||
ont_node_key not in added_ontology_nodes_map
|
||||
):
|
||||
added_ontology_nodes_map[ont_node_key] = Entity(
|
||||
id=ont_node_id,
|
||||
name=ont_node_name,
|
||||
description=ont_node_name,
|
||||
ontology_valid=True,
|
||||
belongs_to_set=data_chunk.belongs_to_set,
|
||||
)
|
||||
|
||||
for source, relation, target in ontology_entity_type_edges:
|
||||
source_node_id = generate_node_id(source)
|
||||
target_node_id = generate_node_id(target)
|
||||
relationship_name = generate_edge_name(relation)
|
||||
edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}"
|
||||
|
||||
if edge_key not in existing_edges_map:
|
||||
ontology_relationships.append(
|
||||
(
|
||||
source_node_id,
|
||||
target_node_id,
|
||||
relationship_name,
|
||||
dict(
|
||||
relationship_name=relationship_name,
|
||||
source_node_id=source_node_id,
|
||||
target_node_id=target_node_id,
|
||||
),
|
||||
)
|
||||
)
|
||||
existing_edges_map[edge_key] = True
|
||||
else:
|
||||
type_node = added_nodes_map.get(type_node_key) or added_nodes_map.get(
|
||||
key_mapping.get(type_node_key)
|
||||
)
|
||||
|
||||
entity_node_key = f"{node_id}_entity"
|
||||
|
||||
if entity_node_key not in added_nodes_map and entity_node_key not in key_mapping:
|
||||
ontology_entity_nodes, ontology_entity_edges, start_ent_ont = (
|
||||
ontology_resolver.get_subgraph(node_name=node_name, node_type="individuals")
|
||||
)
|
||||
|
||||
if start_ent_ont:
|
||||
name_mapping[node_name] = start_ent_ont.name
|
||||
ontology_validated_source_ent = True
|
||||
old_key = entity_node_key
|
||||
node_id = generate_node_id(start_ent_ont.name)
|
||||
entity_node_key = f"{node_id}_entity"
|
||||
node_name = generate_node_name(start_ent_ont.name)
|
||||
key_mapping[old_key] = entity_node_key
|
||||
|
||||
entity_node = Entity(
|
||||
id=node_id,
|
||||
name=node_name,
|
||||
is_a=type_node,
|
||||
description=node.description,
|
||||
ontology_valid=ontology_validated_source_ent,
|
||||
belongs_to_set=data_chunk.belongs_to_set,
|
||||
)
|
||||
|
||||
added_nodes_map[entity_node_key] = entity_node
|
||||
|
||||
for ontology_node_to_store in ontology_entity_nodes:
|
||||
ont_node_id = generate_node_id(ontology_node_to_store.name)
|
||||
ont_node_name = generate_node_name(ontology_node_to_store.name)
|
||||
|
||||
if ontology_node_to_store.category == "classes":
|
||||
ont_node_key = f"{ont_node_id}_type"
|
||||
if (ont_node_key not in added_nodes_map) and (
|
||||
ont_node_key not in added_ontology_nodes_map
|
||||
):
|
||||
added_ontology_nodes_map[ont_node_key] = EntityType(
|
||||
id=ont_node_id,
|
||||
name=ont_node_name,
|
||||
description=ont_node_name,
|
||||
ontology_valid=True,
|
||||
)
|
||||
|
||||
elif ontology_node_to_store.category == "individuals":
|
||||
ont_node_key = f"{ont_node_id}_entity"
|
||||
if (ont_node_key not in added_nodes_map) and (
|
||||
ont_node_key not in added_ontology_nodes_map
|
||||
):
|
||||
added_ontology_nodes_map[ont_node_key] = Entity(
|
||||
id=ont_node_id,
|
||||
name=ont_node_name,
|
||||
description=ont_node_name,
|
||||
ontology_valid=True,
|
||||
belongs_to_set=data_chunk.belongs_to_set,
|
||||
)
|
||||
|
||||
for source, relation, target in ontology_entity_edges:
|
||||
source_node_id = generate_node_id(source)
|
||||
target_node_id = generate_node_id(target)
|
||||
relationship_name = generate_edge_name(relation)
|
||||
edge_key = f"{source_node_id}_{target_node_id}_{relationship_name}"
|
||||
|
||||
if edge_key not in existing_edges_map:
|
||||
ontology_relationships.append(
|
||||
(
|
||||
source_node_id,
|
||||
target_node_id,
|
||||
relationship_name,
|
||||
dict(
|
||||
relationship_name=relationship_name,
|
||||
source_node_id=source_node_id,
|
||||
target_node_id=target_node_id,
|
||||
ontology_valid=True,
|
||||
),
|
||||
)
|
||||
)
|
||||
existing_edges_map[edge_key] = True
|
||||
|
||||
else:
|
||||
entity_node = added_nodes_map.get(entity_node_key) or added_nodes_map.get(
|
||||
key_mapping.get(entity_node_key)
|
||||
)
|
||||
# Get or create entity node
|
||||
entity_node = _get_or_create_entity_node(
|
||||
node,
|
||||
type_node,
|
||||
data_chunk,
|
||||
ontology_resolver,
|
||||
added_nodes_map,
|
||||
added_ontology_nodes_map,
|
||||
name_mapping,
|
||||
key_mapping,
|
||||
existing_edges_map,
|
||||
ontology_relationships,
|
||||
)
|
||||
|
||||
# Add entity to chunk
|
||||
if data_chunk.contains is None:
|
||||
data_chunk.contains = []
|
||||
|
||||
data_chunk.contains.append(entity_node)
|
||||
|
||||
# Process edges
|
||||
for edge in graph.edges:
|
||||
source_node_id = generate_node_id(
|
||||
name_mapping.get(edge.source_node_id, edge.source_node_id)
|
||||
|
|
@ -229,12 +290,12 @@ def expand_with_nodes_and_edges(
|
|||
source_node_id,
|
||||
target_node_id,
|
||||
relationship_name,
|
||||
dict(
|
||||
relationship_name=relationship_name,
|
||||
source_node_id=source_node_id,
|
||||
target_node_id=target_node_id,
|
||||
ontology_valid=False,
|
||||
),
|
||||
{
|
||||
"relationship_name": relationship_name,
|
||||
"source_node_id": source_node_id,
|
||||
"target_node_id": target_node_id,
|
||||
"ontology_valid": False,
|
||||
},
|
||||
)
|
||||
)
|
||||
existing_edges_map[edge_key] = True
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ async def check_falkordb_connection():
|
|||
"""Check if FalkorDB is available at localhost:6379"""
|
||||
try:
|
||||
from falkordb import FalkorDB
|
||||
|
||||
client = FalkorDB(host="localhost", port=6379)
|
||||
# Try to list graphs to check connection
|
||||
client.list_graphs()
|
||||
|
|
@ -130,7 +131,7 @@ async def main():
|
|||
# For FalkorDB vector engine, check if collections are empty
|
||||
# Since FalkorDB is a hybrid adapter, we can check if the graph is empty
|
||||
# as the vector data is stored in the same graph
|
||||
if hasattr(vector_engine, 'driver'):
|
||||
if hasattr(vector_engine, "driver"):
|
||||
# This is FalkorDB - check if graphs exist
|
||||
collections = vector_engine.driver.list_graphs()
|
||||
# The graph should be deleted, so either no graphs or empty graph
|
||||
|
|
@ -138,7 +139,9 @@ async def main():
|
|||
# Graph exists but should be empty
|
||||
vector_graph_data = await vector_engine.get_graph_data()
|
||||
vector_nodes, vector_edges = vector_graph_data
|
||||
assert len(vector_nodes) == 0 and len(vector_edges) == 0, "FalkorDB vector database is not empty"
|
||||
assert len(vector_nodes) == 0 and len(vector_edges) == 0, (
|
||||
"FalkorDB vector database is not empty"
|
||||
)
|
||||
else:
|
||||
# Fallback for other vector engines like LanceDB
|
||||
connection = await vector_engine.get_connection()
|
||||
|
|
@ -153,7 +156,7 @@ async def main():
|
|||
|
||||
# For FalkorDB, check if the graph database is empty
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
|
||||
|
||||
graph_engine = get_graph_engine()
|
||||
graph_data = await graph_engine.get_graph_data()
|
||||
nodes, edges = graph_data
|
||||
|
|
@ -161,7 +164,7 @@ async def main():
|
|||
|
||||
print("🎉 FalkorDB test completed successfully!")
|
||||
print(" ✓ Data ingestion worked")
|
||||
print(" ✓ Cognify processing worked")
|
||||
print(" ✓ Cognify processing worked")
|
||||
print(" ✓ Search operations worked")
|
||||
print(" ✓ Cleanup worked")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue