feat: Calculate graph metrics for networkx graph [COG-1082] (#484)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enabled an option to retrieve more detailed metrics, providing
comprehensive analytics for graph and descriptive data.

- **Refactor**
- Standardized the way metrics are obtained across components for
consistent behavior and improved data accuracy.
  
- **Chore**
- Made internal enhancements to support optional detailed metric
calculations, streamlining system performance and ensuring future
scalability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Igor Ilic <30923996+dexters1@users.noreply.github.com>
This commit is contained in:
alekszievr 2025-02-03 18:05:53 +01:00 committed by GitHub
parent 5119992fd8
commit 2858a674f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 66 additions and 17 deletions

View file

@@ -165,7 +165,7 @@ async def get_default_tasks(
task_config={"batch_size": 10}, task_config={"batch_size": 10},
), ),
Task(add_data_points, task_config={"batch_size": 10}), Task(add_data_points, task_config={"batch_size": 10}),
Task(store_descriptive_metrics), Task(store_descriptive_metrics, include_optional=True),
] ]
except Exception as error: except Exception as error:
send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id) send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)

View file

@@ -56,5 +56,5 @@ class GraphDBInterface(Protocol):
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
async def get_graph_metrics(self): async def get_graph_metrics(self, include_optional):
raise NotImplementedError raise NotImplementedError

View file

@@ -531,7 +531,7 @@ class Neo4jAdapter(GraphDBInterface):
return (nodes, edges) return (nodes, edges)
async def get_graph_metrics(self): async def get_graph_metrics(self, include_optional=False):
return { return {
"num_nodes": -1, "num_nodes": -1,
"num_edges": -1, "num_edges": -1,

View file

@@ -14,6 +14,7 @@ import networkx as nx
from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface from cognee.infrastructure.databases.graph.graph_db_interface import GraphDBInterface
from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine import DataPoint
from cognee.modules.storage.utils import JSONEncoder from cognee.modules.storage.utils import JSONEncoder
import numpy as np
logger = logging.getLogger("NetworkXAdapter") logger = logging.getLogger("NetworkXAdapter")
@@ -386,16 +387,63 @@ class NetworkXAdapter(GraphDBInterface):
return filtered_nodes, filtered_edges return filtered_nodes, filtered_edges
async def get_graph_metrics(self): async def get_graph_metrics(self, include_optional=False):
return { graph = self.graph
"num_nodes": -1,
"num_edges": -1, def _get_mean_degree(graph):
"mean_degree": -1, degrees = [d for _, d in graph.degree()]
"edge_density": -1, return np.mean(degrees) if degrees else 0
"num_connected_components": -1,
"sizes_of_connected_components": -1, def _get_edge_density(graph):
"num_selfloops": -1, num_nodes = graph.number_of_nodes()
"diameter": -1, num_edges = graph.number_of_edges()
"avg_shortest_path_length": -1, num_possible_edges = num_nodes * (num_nodes - 1)
"avg_clustering": -1, edge_density = num_edges / num_possible_edges if num_possible_edges > 0 else 0
return edge_density
def _get_diameter(graph):
if nx.is_strongly_connected(graph):
return nx.diameter(graph.to_undirected())
else:
return None
def _get_avg_shortest_path_length(graph):
if nx.is_strongly_connected(graph):
return nx.average_shortest_path_length(graph)
else:
return None
def _get_avg_clustering(graph):
try:
return nx.average_clustering(nx.DiGraph(graph))
except Exception as e:
logger.warning("Failed to calculate clustering coefficient: %s", e)
return None
mandatory_metrics = {
"num_nodes": graph.number_of_nodes(),
"num_edges": graph.number_of_edges(),
"mean_degree": _get_mean_degree(graph),
"edge_density": _get_edge_density(graph),
"num_connected_components": nx.number_weakly_connected_components(graph),
"sizes_of_connected_components": [
len(c) for c in nx.weakly_connected_components(graph)
],
} }
if include_optional:
optional_metrics = {
"num_selfloops": sum(1 for u, v in graph.edges() if u == v),
"diameter": _get_diameter(graph),
"avg_shortest_path_length": _get_avg_shortest_path_length(graph),
"avg_clustering": _get_avg_clustering(graph),
}
else:
optional_metrics = {
"num_selfloops": -1,
"diameter": -1,
"avg_shortest_path_length": -1,
"avg_clustering": -1,
}
return mandatory_metrics | optional_metrics

View file

@@ -23,10 +23,10 @@ async def fetch_token_count(db_engine) -> int:
return token_count_sum return token_count_sum
async def store_descriptive_metrics(data_points: list[DataPoint]): async def store_descriptive_metrics(data_points: list[DataPoint], include_optional: bool):
db_engine = get_relational_engine() db_engine = get_relational_engine()
graph_engine = await get_graph_engine() graph_engine = await get_graph_engine()
graph_metrics = await graph_engine.get_graph_metrics() graph_metrics = await graph_engine.get_graph_metrics(include_optional)
async with db_engine.get_async_session() as session: async with db_engine.get_async_session() as session:
metrics = GraphMetrics( metrics = GraphMetrics(

View file

@@ -1,4 +1,5 @@
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy.sql import func
from sqlalchemy import Column, DateTime, Float, Integer, JSON, UUID from sqlalchemy import Column, DateTime, Float, Integer, JSON, UUID