From 6d7a68dbbad3a8500f82273bdcf1b0f84db19d0e Mon Sep 17 00:00:00 2001
From: alekszievr <44192193+alekszievr@users.noreply.github.com>
Date: Mon, 3 Mar 2025 19:09:35 +0100
Subject: [PATCH] Feat: Store descriptive metrics identified by pipeline run id [cog-1260] (#582)

## Description

## DCO Affirmation

I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

## Summary by CodeRabbit

- **New Features**
  - Introduced `get_pipeline_run_metrics`, which calculates and stores descriptive graph metrics per pipeline run when the step is enabled (see the usage sketch below).
  - Added an optional `graph_metrics` step to the example execution flow for triggering the calculation.
- **Chores**
  - Removed the previous `store_descriptive_metrics` mechanism in favor of per-run metrics storage.
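For orientation, here is a minimal sketch of the intended call flow. Only the `get_pipeline_run_metrics(pipeline_runs, include_optional)` signature and its export on the `cognee` package come from this patch; the `cognee.add(...)` setup line and the printed attribute names (taken from the `GraphMetrics` columns populated below) are illustrative assumptions:

```python
import asyncio

import cognee


async def main():
    # Assumed setup: ingest a document and build the knowledge graph.
    await cognee.add("Alice has ten years of experience with design tools.")
    pipeline_runs = await cognee.cognify()

    # New in this patch: compute descriptive graph metrics for each pipeline
    # run, or return the stored row when one already exists for that
    # pipeline_run_id. include_optional is forwarded to the graph engine's
    # get_graph_metrics() to include the optional metrics as well.
    metrics = await cognee.get_pipeline_run_metrics(pipeline_runs, include_optional=True)

    for run_metrics in metrics:
        print(run_metrics.num_nodes, run_metrics.num_edges, run_metrics.mean_degree)


if __name__ == "__main__":
    asyncio.run(main())
```

Because each metrics row is keyed by `pipeline_run_id`, calling the function a second time for the same run reads the stored metrics back instead of recomputing them.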
---------

Co-authored-by: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Co-authored-by: Boris
---
 cognee/__init__.py                             |  1 +
 cognee/modules/data/methods/__init__.py        |  2 -
 .../data/methods/store_descriptive_metrics.py  | 50 ----------------
 .../operations/get_pipeline_run_metrics.py     | 60 +++++++++++++++++++
 examples/python/dynamic_steps_example.py       | 10 +++-
 5 files changed, 69 insertions(+), 54 deletions(-)
 delete mode 100644 cognee/modules/data/methods/store_descriptive_metrics.py
 create mode 100644 cognee/modules/data/operations/get_pipeline_run_metrics.py

diff --git a/cognee/__init__.py b/cognee/__init__.py
index f534d3cf2..4e4930775 100644
--- a/cognee/__init__.py
+++ b/cognee/__init__.py
@@ -8,6 +8,7 @@ from .api.v1.visualize import visualize_graph, start_visualization_server
 from cognee.modules.visualization.cognee_network_visualization import (
     cognee_network_visualization,
 )
+from .modules.data.operations.get_pipeline_run_metrics import get_pipeline_run_metrics

 # Pipelines
 from .modules import pipelines
diff --git a/cognee/modules/data/methods/__init__.py b/cognee/modules/data/methods/__init__.py
index 57ac00c1a..c32db1d2f 100644
--- a/cognee/modules/data/methods/__init__.py
+++ b/cognee/modules/data/methods/__init__.py
@@ -11,5 +11,3 @@ from .get_data import get_data
 # Delete
 from .delete_dataset import delete_dataset
 from .delete_data import delete_data
-
-from .store_descriptive_metrics import store_descriptive_metrics
diff --git a/cognee/modules/data/methods/store_descriptive_metrics.py b/cognee/modules/data/methods/store_descriptive_metrics.py
deleted file mode 100644
index c39571d0a..000000000
--- a/cognee/modules/data/methods/store_descriptive_metrics.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from cognee.infrastructure.engine import DataPoint
-from cognee.infrastructure.databases.relational import get_relational_engine
-from sqlalchemy import select
-from sqlalchemy.sql import func
-from cognee.modules.data.models import Data
-from cognee.modules.data.models import GraphMetrics
-import uuid
-from cognee.infrastructure.databases.graph import get_graph_engine
-
-
-async def fetch_token_count(db_engine) -> int:
-    """
-    Fetches and sums token counts from the database.
-
-    Returns:
-        int: The total number of tokens across all documents.
-    """
-
-    async with db_engine.get_async_session() as session:
-        token_count_sum = await session.execute(select(func.sum(Data.token_count)))
-        token_count_sum = token_count_sum.scalar()
-
-    return token_count_sum
-
-
-async def store_descriptive_metrics(data_points: list[DataPoint], include_optional: bool):
-    db_engine = get_relational_engine()
-    graph_engine = await get_graph_engine()
-    graph_metrics = await graph_engine.get_graph_metrics(include_optional)
-
-    async with db_engine.get_async_session() as session:
-        metrics = GraphMetrics(
-            id=uuid.uuid4(),
-            num_tokens=await fetch_token_count(db_engine),
-            num_nodes=graph_metrics["num_nodes"],
-            num_edges=graph_metrics["num_edges"],
-            mean_degree=graph_metrics["mean_degree"],
-            edge_density=graph_metrics["edge_density"],
-            num_connected_components=graph_metrics["num_connected_components"],
-            sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
-            num_selfloops=graph_metrics["num_selfloops"],
-            diameter=graph_metrics["diameter"],
-            avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
-            avg_clustering=graph_metrics["avg_clustering"],
-        )
-
-        session.add(metrics)
-        await session.commit()
-
-    return data_points
diff --git a/cognee/modules/data/operations/get_pipeline_run_metrics.py b/cognee/modules/data/operations/get_pipeline_run_metrics.py
new file mode 100644
index 000000000..19bf4e36a
--- /dev/null
+++ b/cognee/modules/data/operations/get_pipeline_run_metrics.py
@@ -0,0 +1,60 @@
+from cognee.infrastructure.databases.relational import get_relational_engine
+from sqlalchemy import select
+from sqlalchemy.sql import func
+from cognee.modules.data.models import Data
+from cognee.modules.data.models import GraphMetrics
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.modules.pipelines.models import PipelineRun
+
+
+async def fetch_token_count(db_engine) -> int:
+    """
+    Fetches and sums token counts from the database.
+
+    Returns:
+        int: The total number of tokens across all documents.
+    """
+
+    async with db_engine.get_async_session() as session:
+        token_count_sum = await session.execute(select(func.sum(Data.token_count)))
+        token_count_sum = token_count_sum.scalar()
+
+    return token_count_sum
+
+
+async def get_pipeline_run_metrics(pipeline_runs: list[PipelineRun], include_optional: bool):
+    db_engine = get_relational_engine()
+    graph_engine = await get_graph_engine()
+
+    metrics_for_pipeline_runs = []
+
+    async with db_engine.get_async_session() as session:
+        for pipeline_run in pipeline_runs:
+            existing_metrics = await session.execute(
+                select(GraphMetrics).where(GraphMetrics.id == pipeline_run.pipeline_run_id)
+            )
+            existing_metrics = existing_metrics.scalars().first()
+
+            if existing_metrics:
+                metrics_for_pipeline_runs.append(existing_metrics)
+            else:
+                graph_metrics = await graph_engine.get_graph_metrics(include_optional)
+                metrics = GraphMetrics(
+                    id=pipeline_run.pipeline_run_id,
+                    num_tokens=await fetch_token_count(db_engine),
+                    num_nodes=graph_metrics["num_nodes"],
+                    num_edges=graph_metrics["num_edges"],
+                    mean_degree=graph_metrics["mean_degree"],
+                    edge_density=graph_metrics["edge_density"],
+                    num_connected_components=graph_metrics["num_connected_components"],
+                    sizes_of_connected_components=graph_metrics["sizes_of_connected_components"],
+                    num_selfloops=graph_metrics["num_selfloops"],
+                    diameter=graph_metrics["diameter"],
+                    avg_shortest_path_length=graph_metrics["avg_shortest_path_length"],
+                    avg_clustering=graph_metrics["avg_clustering"],
+                )
+                metrics_for_pipeline_runs.append(metrics)
+                session.add(metrics)
+                await session.commit()
+
+    return metrics_for_pipeline_runs
diff --git a/examples/python/dynamic_steps_example.py b/examples/python/dynamic_steps_example.py
index 5117dda92..b858eb95f 100644
--- a/examples/python/dynamic_steps_example.py
+++ b/examples/python/dynamic_steps_example.py
@@ -180,10 +180,15 @@ async def main(enable_steps):

     # Step 3: Create knowledge graph
     if enable_steps.get("cognify"):
-        await cognee.cognify()
+        pipeline_run = await cognee.cognify()
         print("Knowledge graph created.")

-    # Step 4: Query insights
+    # Step 4: Calculate descriptive metrics
+    if enable_steps.get("graph_metrics"):
+        await cognee.get_pipeline_run_metrics(pipeline_run, include_optional=True)
+        print("Descriptive graph metrics saved to database.")
+
+    # Step 5: Query insights
     if enable_steps.get("retriever"):
         search_results = await cognee.search(
             query_type=SearchType.GRAPH_COMPLETION, query_text="Who has experience in design tools?"
@@ -201,6 +206,7 @@ if __name__ == "__main__":
         "prune_system": rebuild_kg,
         "add_text": rebuild_kg,
         "cognify": rebuild_kg,
+        "graph_metrics": rebuild_kg,
         "retriever": retrieve,
     }
