feature: solve edge embedding duplicates in edge collection + retriever optimization (#1151)

<!-- .github/pull_request_template.md -->

## Description
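Feature: resolve duplicate edge embeddings in the edge collection and
optimize the retriever. Edge types are now indexed with a deterministic ID
derived from the relationship name (`generate_edge_id`), and top-triplet
selection uses `heapq.nsmallest`.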

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
This commit is contained in:
hajdul88 2025-07-29 12:35:38 +02:00 committed by GitHub
parent 4ea4b100ab
commit f78af0cec3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 18 additions and 19 deletions

View file

@ -0,0 +1,5 @@
from uuid import NAMESPACE_OID, UUID, uuid5
def generate_edge_id(edge_id: str) -> UUID:
    """Return a deterministic UUID for an edge identifier string.

    The text is lower-cased, spaces are replaced with underscores and
    apostrophes are removed before hashing, so inputs that differ only in
    those respects map to the same UUID.

    Args:
        edge_id: Text identifying the edge (for example a relationship name).

    Returns:
        The version-5 UUID of the normalized text in the ``NAMESPACE_OID``
        namespace. Note this is a ``uuid.UUID`` object, not a ``str``.
    """
    normalized = edge_id.lower().replace(" ", "_").replace("'", "")
    return uuid5(NAMESPACE_OID, normalized)

View file

@ -170,28 +170,19 @@ class CogneeGraph(CogneeAbstractGraph):
for edge in self.edges: for edge in self.edges:
relationship_type = edge.attributes.get("relationship_type") relationship_type = edge.attributes.get("relationship_type")
if relationship_type and relationship_type in embedding_map: distance = embedding_map.get(relationship_type, None)
edge.attributes["vector_distance"] = embedding_map[relationship_type] if distance is not None:
edge.attributes["vector_distance"] = distance
except Exception as ex: except Exception as ex:
logger.error(f"Error mapping vector distances to edges: {str(ex)}") logger.error(f"Error mapping vector distances to edges: {str(ex)}")
raise ex raise ex
async def calculate_top_triplet_importances(self, k: int) -> List: async def calculate_top_triplet_importances(self, k: int) -> List:
min_heap = [] def score(edge):
n1 = edge.node1.attributes.get("vector_distance", 1)
n2 = edge.node2.attributes.get("vector_distance", 1)
e = edge.attributes.get("vector_distance", 1)
return n1 + n2 + e
for i, edge in enumerate(self.edges): return heapq.nsmallest(k, self.edges, key=score)
source_node = self.get_node(edge.node1.id)
target_node = self.get_node(edge.node2.id)
source_distance = source_node.attributes.get("vector_distance", 1) if source_node else 1
target_distance = target_node.attributes.get("vector_distance", 1) if target_node else 1
edge_distance = edge.attributes.get("vector_distance", 1)
total_distance = source_distance + target_distance + edge_distance
heapq.heappush(min_heap, (-total_distance, i, edge))
if len(min_heap) > k:
heapq.heappop(min_heap)
return [edge for _, _, edge in sorted(min_heap)]

View file

@ -1,3 +1,4 @@
from cognee.modules.engine.utils.generate_edge_id import generate_edge_id
from cognee.shared.logging_utils import get_logger, ERROR from cognee.shared.logging_utils import get_logger, ERROR
from collections import Counter from collections import Counter
@ -49,7 +50,9 @@ async def index_graph_edges(batch_size: int = 1024):
) )
for text, count in edge_types.items(): for text, count in edge_types.items():
edge = EdgeType(relationship_name=text, number_of_edges=count) edge = EdgeType(
id=generate_edge_id(edge_id=text), relationship_name=text, number_of_edges=count
)
data_point_type = type(edge) data_point_type = type(edge)
for field_name in edge.metadata["index_fields"]: for field_name in edge.metadata["index_fields"]: