Feat: Store descriptive metrics identified by pipeline run id [cog-1260] (#582)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new analytic capability that calculates descriptive graph
metrics for pipeline runs when enabled.
- Updated the execution flow to include an option for activating the
graph metrics step.

- **Chores**
- Removed the previous mechanism for storing descriptive metrics to
streamline the system.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Co-authored-by: Boris <boris@topoteretes.com>
This commit is contained in:
alekszievr 2025-03-03 19:09:35 +01:00 committed by GitHub
parent 10e4bfb6ab
commit 6d7a68dbba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 69 additions and 54 deletions

View file

@ -8,6 +8,7 @@ from .api.v1.visualize import visualize_graph, start_visualization_server
from cognee.modules.visualization.cognee_network_visualization import (
cognee_network_visualization,
)
from .modules.data.operations.get_pipeline_run_metrics import get_pipeline_run_metrics
# Pipelines
from .modules import pipelines

View file

@ -11,5 +11,3 @@ from .get_data import get_data
# Delete
from .delete_dataset import delete_dataset
from .delete_data import delete_data
from .store_descriptive_metrics import store_descriptive_metrics

View file

@ -1,50 +0,0 @@
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.databases.relational import get_relational_engine
from sqlalchemy import select
from sqlalchemy.sql import func
from cognee.modules.data.models import Data
from cognee.modules.data.models import GraphMetrics
import uuid
from cognee.infrastructure.databases.graph import get_graph_engine
async def fetch_token_count(db_engine) -> int:
"""
Fetches and sums token counts from the database.
Returns:
int: The total number of tokens across all documents.
"""
async with db_engine.get_async_session() as session:
token_count_sum = await session.execute(select(func.sum(Data.token_count)))
token_count_sum = token_count_sum.scalar()
return token_count_sum
async def store_descriptive_metrics(data_points: list[DataPoint], include_optional: bool):
db_engine = get_relational_engine()
graph_engine = await get_graph_engine()
graph_metrics = await graph_engine.get_graph_metrics(include_optional)
async with db_engine.get_async_session() as session:
metrics = GraphMetrics(
id=uuid.uuid4(),
num_tokens=await fetch_token_count(db_engine),
num_nodes=graph_metrics["num_nodes"],
num_edges=graph_metrics["num_edges"],
mean_degree=graph_metrics["mean_degree"],
edge_density=graph_metrics["edge_density"],
num_connected_components=graph_metrics["num_connected_components"],
sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
num_selfloops=graph_metrics["num_selfloops"],
diameter=graph_metrics["diameter"],
avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
avg_clustering=graph_metrics["avg_clustering"],
)
session.add(metrics)
await session.commit()
return data_points

View file

@ -0,0 +1,60 @@
from cognee.infrastructure.databases.relational import get_relational_engine
from sqlalchemy import select
from sqlalchemy.sql import func
from cognee.modules.data.models import Data
from cognee.modules.data.models import GraphMetrics
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.pipelines.models import PipelineRun
async def fetch_token_count(db_engine) -> int:
"""
Fetches and sums token counts from the database.
Returns:
int: The total number of tokens across all documents.
"""
async with db_engine.get_async_session() as session:
token_count_sum = await session.execute(select(func.sum(Data.token_count)))
token_count_sum = token_count_sum.scalar()
return token_count_sum
async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
db_engine = get_relational_engine()
graph_engine = await get_graph_engine()
metrics_for_pipeline_runs = []
async with db_engine.get_async_session() as session:
for pipeline_run in pipeline_runs:
existing_metrics = await session.execute(
select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
)
existing_metrics = existing_metrics.scalars().first()
if existing_metrics:
metrics_for_pipeline_runs.append(existing_metrics)
else:
graph_metrics = await graph_engine.get_graph_metrics(include_optional)
metrics = GraphMetrics(
id=pipeline_run.pipeline_run_id,
num_tokens=await fetch_token_count(db_engine),
num_nodes=graph_metrics["num_nodes"],
num_edges=graph_metrics["num_edges"],
mean_degree=graph_metrics["mean_degree"],
edge_density=graph_metrics["edge_density"],
num_connected_components=graph_metrics["num_connected_components"],
sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
num_selfloops=graph_metrics["num_selfloops"],
diameter=graph_metrics["diameter"],
avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
avg_clustering=graph_metrics["avg_clustering"],
)
metrics_for_pipeline_runs.append(metrics)
session.add(metrics)
await session.commit()
return metrics_for_pipeline_runs

View file

@ -180,10 +180,15 @@ async def main(enable_steps):
# Step 3: Create knowledge graph
if enable_steps.get("cognify"):
await cognee.cognify()
pipeline_run = await cognee.cognify()
print("Knowledge graph created.")
# Step 4: Query insights
# Step 4: Calculate descriptive metrics
if enable_steps.get("graph_metrics"):
await cognee.get_pipeline_run_metrics(pipeline_run, include_optional=True)
print("Descriptive graph metrics saved to database.")
# Step 5: Query insights
if enable_steps.get("retriever"):
search_results = await cognee.search(
query_type=SearchType.GRAPH_COMPLETION, query_text="Who has experience in design tools?"
@ -201,6 +206,7 @@ if __name__ == "__main__":
"prune_system": rebuild_kg,
"add_text": rebuild_kg,
"cognify": rebuild_kg,
"graph_metrics": rebuild_kg,
"retriever": retrieve,
}