diff --git a/cognee/eval_framework/evaluation/layered_graph_eval_adapter.py b/cognee/eval_framework/evaluation/layered_graph_eval_adapter.py deleted file mode 100644 index 3169e902b..000000000 --- a/cognee/eval_framework/evaluation/layered_graph_eval_adapter.py +++ /dev/null @@ -1,374 +0,0 @@ -""" -Layered Graph Evaluation Adapter for Cognee. - -This module provides an adapter for evaluating layered knowledge graphs, -allowing evaluation of individual layers and cumulative graphs. -""" - -import logging -from typing import Dict, List, Any, Optional, Union, Tuple - -from cognee.shared.data_models import LayeredKnowledgeGraph, KnowledgeGraph - -# Import evaluation components if available, otherwise use mock classes -try: - # Try to import the actual evaluator classes - from cognee.eval_framework.base_evaluator import BaseEvaluator - from cognee.eval_framework.deepeval_adapter import DeepEval - from cognee.eval_framework.direct_llm_evaluator import DirectLLM - from cognee.eval_framework.retrievers.base_retriever import BaseRetriever - from cognee.eval_framework.retrievers.graph_completion_retriever import GraphCompletionRetriever -except ImportError: - # If not available, create placeholder classes for testing - class BaseEvaluator: - async def evaluate_answers(self, *args, **kwargs): - return {"0": {"correctness": 0.8, "relevance": 0.7}} - - class DeepEval(BaseEvaluator): - pass - - class DirectLLM(BaseEvaluator): - pass - - class BaseRetriever: - def __init__(self, knowledge_graph=None): - self.knowledge_graph = knowledge_graph - - async def retrieve(self, query): - return f"Answer for {query}" - - class GraphCompletionRetriever(BaseRetriever): - pass - -logger = logging.getLogger(__name__) - - -class LayeredGraphEvalAdapter: - """ - Adapter for evaluating layered knowledge graphs. - - This adapter supports evaluation of individual layers and cumulative - evaluation that includes parent layers. - """ - - def __init__( - self, - evaluator: Optional[Union[str, BaseEvaluator]] = "deepeval", - retriever_class: Optional[type] = None - ): - """ - Initialize the layered graph evaluation adapter. - - Args: - evaluator: Either "deepeval", "direct_llm", or a BaseEvaluator instance - retriever_class: Retriever class to use (defaults to GraphCompletionRetriever) - """ - # Initialize evaluator - if evaluator == "deepeval" or evaluator is None: - self.evaluator = DeepEval() - elif evaluator == "direct_llm": - self.evaluator = DirectLLM() - elif isinstance(evaluator, BaseEvaluator): - self.evaluator = evaluator - else: - raise ValueError(f"Unknown evaluator type: {evaluator}") - - # Initialize retriever class - self.retriever_class = retriever_class or GraphCompletionRetriever - - async def evaluate_answers( - self, - answers: List[str], - expected_answers: List[str], - questions: List[str], - eval_metrics: List[str] = None - ) -> Dict[str, Any]: - """ - Evaluate a list of answers using the selected evaluator. - - Args: - answers: List of generated answers - expected_answers: List of expected (ground truth) answers - questions: List of questions corresponding to the answers - eval_metrics: List of evaluation metrics to use - - Returns: - Dictionary of evaluation results - """ - try: - return await self.evaluator.evaluate_answers( - answers=answers, - expected_answers=expected_answers, - questions=questions, - eval_metrics=eval_metrics - ) - except Exception as e: - logger.warning(f"Error evaluating answers: {str(e)}") - # Return mock evaluation results for testing - return {str(i): {metric: 0.5 for metric in (eval_metrics or ["correctness", "relevance"])} - for i in range(len(questions))} - - async def evaluate_layered_graph( - self, - layered_graph: LayeredKnowledgeGraph, - questions: List[str], - expected_answers: List[str], - eval_metrics: List[str] = None, - layer_ids: List[str] = None, - include_per_question_scores: bool = False - ) -> Dict[str, Any]: - """ - Evaluate a layered knowledge graph by evaluating each layer individually - and cumulatively. - - Args: - layered_graph: The layered knowledge graph to evaluate - questions: List of evaluation questions - expected_answers: List of expected answers corresponding to questions - eval_metrics: List of evaluation metrics to use - layer_ids: Optional list of layer IDs to evaluate (None = all layers) - include_per_question_scores: Whether to include scores for each question - - Returns: - Dictionary containing evaluation results for each layer and - cumulative results - """ - if eval_metrics is None: - eval_metrics = ["faithfulness", "relevance", "correctness"] - - # If no specific layers, evaluate all layers - if layer_ids is None: - layer_ids = [layer.id for layer in layered_graph.layers] - - # Prepare results dictionary - results = { - "per_layer": {}, - "cumulative": {}, - "layer_improvements": {}, - "overall_metrics": {}, - "questions": questions, - "expected_answers": expected_answers - } - - # Evaluate each layer individually - for layer_id in layer_ids: - logger.info(f"Evaluating layer {layer_id} individually") - - # Get the individual layer graph - layer_graph = layered_graph.get_layer_graph(layer_id) - - # Skip empty layers - if len(layer_graph.nodes) == 0: - logger.warning(f"Skipping empty layer: {layer_id}") - results["per_layer"][layer_id] = { - "is_empty": True, - "metrics": {metric: 0.0 for metric in eval_metrics} - } - continue - - # Generate answers for this layer - layer_answers = await self._generate_answers( - questions=questions, - knowledge_graph=layer_graph - ) - - # Evaluate answers - layer_eval_results = await self.evaluate_answers( - answers=layer_answers, - expected_answers=expected_answers, - questions=questions, - eval_metrics=eval_metrics - ) - - # Store individual layer results - results["per_layer"][layer_id] = { - "is_empty": False, - "metrics": self._summarize_eval_results(layer_eval_results, eval_metrics), - "answers": layer_answers - } - - if include_per_question_scores: - results["per_layer"][layer_id]["per_question"] = layer_eval_results - - # Evaluate each layer cumulatively (including parent layers) - for layer_id in layer_ids: - logger.info(f"Evaluating layer {layer_id} cumulatively") - - # Get the cumulative graph for this layer - cumulative_graph = layered_graph.get_cumulative_layer_graph(layer_id) - - # Skip if no nodes in cumulative graph (shouldn't happen but just in case) - if len(cumulative_graph.nodes) == 0: - logger.warning(f"Skipping empty cumulative graph for layer: {layer_id}") - results["cumulative"][layer_id] = { - "is_empty": True, - "metrics": {metric: 0.0 for metric in eval_metrics} - } - continue - - # Generate answers for this cumulative graph - cumulative_answers = await self._generate_answers( - questions=questions, - knowledge_graph=cumulative_graph - ) - - # Evaluate answers - cumulative_eval_results = await self.evaluate_answers( - answers=cumulative_answers, - expected_answers=expected_answers, - questions=questions, - eval_metrics=eval_metrics - ) - - # Store cumulative results - results["cumulative"][layer_id] = { - "is_empty": False, - "metrics": self._summarize_eval_results(cumulative_eval_results, eval_metrics), - "answers": cumulative_answers - } - - if include_per_question_scores: - results["cumulative"][layer_id]["per_question"] = cumulative_eval_results - - # Calculate layer improvements (how much each layer contributes) - results["layer_improvements"] = self._calculate_layer_improvements( - layered_graph=layered_graph, - cumulative_results=results["cumulative"], - layer_ids=layer_ids, - eval_metrics=eval_metrics - ) - - # Calculate overall metrics - if layer_ids: - top_layer_id = layer_ids[-1] # Assume the last layer is the top layer - if top_layer_id in results["cumulative"] and not results["cumulative"][top_layer_id].get("is_empty", False): - results["overall_metrics"] = results["cumulative"][top_layer_id]["metrics"].copy() - - return results - - async def _generate_answers( - self, - questions: List[str], - knowledge_graph: KnowledgeGraph - ) -> List[str]: - """ - Generate answers for a set of questions based on the knowledge graph. - - Args: - questions: List of questions to answer - knowledge_graph: Knowledge graph to use for answering - - Returns: - List of generated answers - """ - # Create retriever for this graph - try: - retriever = self.retriever_class(knowledge_graph=knowledge_graph) - - # Generate answers - answers = [] - for question in questions: - try: - answer = await retriever.retrieve(query=question) - answers.append(answer) - except Exception as e: - logger.error(f"Error generating answer for question: {question}") - logger.error(f"Error details: {str(e)}") - answers.append(f"Mock answer for: {question}") - except Exception as e: - logger.warning(f"Error creating retriever: {str(e)}") - # Return mock answers for testing - answers = [f"Mock answer for: {q}" for q in questions] - - return answers - - def _summarize_eval_results( - self, - eval_results: Dict[str, Any], - eval_metrics: List[str] - ) -> Dict[str, float]: - """ - Average scores for specified evaluation metrics. - - Args: - eval_results: Evaluation results from evaluator - eval_metrics: List of evaluation metrics to summarize - - Returns: - Dictionary of averaged metric scores - """ - metrics = {} - - # Create a summary of the metrics - for metric in eval_metrics: - metric_scores = [] - - # Gather all scores for this metric - for question_idx in eval_results: - if metric in eval_results[question_idx]: - score = eval_results[question_idx][metric] - if isinstance(score, (int, float)): - metric_scores.append(score) - - # Calculate average score - if metric_scores: - metrics[metric] = sum(metric_scores) / len(metric_scores) - else: - metrics[metric] = 0.0 - - return metrics - - def _calculate_layer_improvements( - self, - layered_graph: LayeredKnowledgeGraph, - cumulative_results: Dict[str, Any], - layer_ids: List[str], - eval_metrics: List[str] - ) -> Dict[str, Dict[str, float]]: - """ - Calculate the improvement contributed by each layer. - - Args: - layered_graph: The layered knowledge graph - cumulative_results: Dictionary of cumulative evaluation results - layer_ids: List of layer IDs to analyze - eval_metrics: List of evaluation metrics to analyze - - Returns: - Dictionary mapping layer IDs to their improvement metrics - """ - improvements = {} - - # Get layer dependencies - layer_parents = {layer.id: layer.parent_layers for layer in layered_graph.layers} - - # For each layer, calculate improvements over parent layers - for layer_id in layer_ids: - if layer_id not in cumulative_results or cumulative_results[layer_id].get("is_empty", False): - continue - - parent_layers = layer_parents.get(layer_id, []) - - # If no parents, improvement is the absolute score - if not parent_layers: - improvements[layer_id] = { - metric: cumulative_results[layer_id]["metrics"].get(metric, 0.0) - for metric in eval_metrics - } - continue - - # Find the parent with the highest cumulative score - best_parent_metrics = {metric: 0.0 for metric in eval_metrics} - for parent_id in parent_layers: - if parent_id in cumulative_results and not cumulative_results[parent_id].get("is_empty", False): - for metric in eval_metrics: - parent_score = cumulative_results[parent_id]["metrics"].get(metric, 0.0) - best_parent_metrics[metric] = max(best_parent_metrics[metric], parent_score) - - # Calculate improvement as current score - best parent score - improvements[layer_id] = { - metric: cumulative_results[layer_id]["metrics"].get(metric, 0.0) - best_parent_metrics.get(metric, 0.0) - for metric in eval_metrics - } - - return improvements \ No newline at end of file diff --git a/cognee/examples/layered_graph_example.py b/cognee/examples/layered_graph_example.py deleted file mode 100644 index 178aec0d8..000000000 --- a/cognee/examples/layered_graph_example.py +++ /dev/null @@ -1,568 +0,0 @@ -""" -Layered Knowledge Graph Example for Cognee. - -This example demonstrates how to build layered knowledge graphs and evaluate -them using the Cognee framework. -""" - -import asyncio -import logging -import sys -from typing import List, Dict, Any - -from cognee.shared.data_models import KnowledgeGraph, Node, Edge -from cognee.modules.graph.layered_graph_builder import LayeredGraphBuilder, convert_to_layered_graph -from cognee.modules.graph.layered_graph_service import LayeredGraphService - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - -# Try to import evaluation components, but continue if not available -try: - from cognee.eval_framework.evaluation.layered_graph_eval_adapter import LayeredGraphEvalAdapter - EVAL_AVAILABLE = True - logger.info("Evaluation framework available. Full example will run.") -except ImportError: - EVAL_AVAILABLE = False - logger.warning("Evaluation framework not available. Running with limited functionality.") - - -def create_car_brands_graph() -> KnowledgeGraph: - """ - Create a simple knowledge graph about car brands. - - Returns: - Knowledge graph with car brands - """ - nodes = [ - Node( - id="brand_audi", - name="Audi", - type="CarBrand", - description="Audi is a German luxury car manufacturer." - ), - Node( - id="brand_bmw", - name="BMW", - type="CarBrand", - description="BMW is a German luxury car manufacturer." - ), - Node( - id="country_germany", - name="Germany", - type="Country", - description="Germany is a country in Central Europe." - ), - Node( - id="city_munich", - name="Munich", - type="City", - description="Munich is a city in Germany." - ), - Node( - id="city_ingolstadt", - name="Ingolstadt", - type="City", - description="Ingolstadt is a city in Germany." - ) - ] - - edges = [ - Edge( - source_node_id="brand_audi", - target_node_id="country_germany", - relationship_name="HEADQUARTERED_IN" - ), - Edge( - source_node_id="brand_bmw", - target_node_id="country_germany", - relationship_name="HEADQUARTERED_IN" - ), - Edge( - source_node_id="brand_audi", - target_node_id="city_ingolstadt", - relationship_name="BASED_IN" - ), - Edge( - source_node_id="brand_bmw", - target_node_id="city_munich", - relationship_name="BASED_IN" - ), - Edge( - source_node_id="city_munich", - target_node_id="country_germany", - relationship_name="LOCATED_IN" - ), - Edge( - source_node_id="city_ingolstadt", - target_node_id="country_germany", - relationship_name="LOCATED_IN" - ) - ] - - return KnowledgeGraph( - nodes=nodes, - edges=edges, - name="Car Brands Graph", - description="A knowledge graph about car brands and their locations." - ) - - -def create_car_models_graph() -> KnowledgeGraph: - """ - Create a knowledge graph about car models. - - Returns: - Knowledge graph with car models - """ - nodes = [ - Node( - id="model_a4", - name="A4", - type="CarModel", - description="The Audi A4 is a line of compact executive cars produced since 1994." - ), - Node( - id="model_a6", - name="A6", - type="CarModel", - description="The Audi A6 is an executive car made by Audi." - ), - Node( - id="model_3series", - name="3 Series", - type="CarModel", - description="The BMW 3 Series is a line of compact executive cars." - ), - Node( - id="model_5series", - name="5 Series", - type="CarModel", - description="The BMW 5 Series is an executive car manufactured by BMW." - ), - Node( - id="brand_audi", - name="Audi", - type="CarBrand", - description="Audi is a German luxury car manufacturer." - ), - Node( - id="brand_bmw", - name="BMW", - type="CarBrand", - description="BMW is a German luxury car manufacturer." - ) - ] - - edges = [ - Edge( - source_node_id="model_a4", - target_node_id="brand_audi", - relationship_name="MADE_BY" - ), - Edge( - source_node_id="model_a6", - target_node_id="brand_audi", - relationship_name="MADE_BY" - ), - Edge( - source_node_id="model_3series", - target_node_id="brand_bmw", - relationship_name="MADE_BY" - ), - Edge( - source_node_id="model_5series", - target_node_id="brand_bmw", - relationship_name="MADE_BY" - ) - ] - - return KnowledgeGraph( - nodes=nodes, - edges=edges, - name="Car Models Graph", - description="A knowledge graph about car models and their brands." - ) - - -def create_car_specs_graph() -> KnowledgeGraph: - """ - Create a knowledge graph about car specifications. - - Returns: - Knowledge graph with car specifications - """ - nodes = [ - Node( - id="model_a4", - name="A4", - type="CarModel", - description="The Audi A4 is a line of compact executive cars produced since 1994." - ), - Node( - id="model_a6", - name="A6", - type="CarModel", - description="The Audi A6 is an executive car made by Audi." - ), - Node( - id="model_3series", - name="3 Series", - type="CarModel", - description="The BMW 3 Series is a line of compact executive cars." - ), - Node( - id="model_5series", - name="5 Series", - type="CarModel", - description="The BMW 5 Series is an executive car manufactured by BMW." - ), - Node( - id="engine_2_0tdi", - name="2.0 TDI", - type="Engine", - description="2.0-liter turbocharged diesel engine." - ), - Node( - id="engine_3_0tfsi", - name="3.0 TFSI", - type="Engine", - description="3.0-liter turbocharged gasoline engine." - ), - Node( - id="engine_2_0i", - name="2.0i", - type="Engine", - description="2.0-liter gasoline engine." - ), - Node( - id="engine_3_0i", - name="3.0i", - type="Engine", - description="3.0-liter gasoline engine." - ) - ] - - edges = [ - Edge( - source_node_id="model_a4", - target_node_id="engine_2_0tdi", - relationship_name="HAS_ENGINE_OPTION" - ), - Edge( - source_node_id="model_a6", - target_node_id="engine_2_0tdi", - relationship_name="HAS_ENGINE_OPTION" - ), - Edge( - source_node_id="model_a6", - target_node_id="engine_3_0tfsi", - relationship_name="HAS_ENGINE_OPTION" - ), - Edge( - source_node_id="model_3series", - target_node_id="engine_2_0i", - relationship_name="HAS_ENGINE_OPTION" - ), - Edge( - source_node_id="model_5series", - target_node_id="engine_2_0i", - relationship_name="HAS_ENGINE_OPTION" - ), - Edge( - source_node_id="model_5series", - target_node_id="engine_3_0i", - relationship_name="HAS_ENGINE_OPTION" - ) - ] - - return KnowledgeGraph( - nodes=nodes, - edges=edges, - name="Car Specifications Graph", - description="A knowledge graph about car specifications and engine options." - ) - - -async def build_layered_car_graph() -> Dict[str, Any]: - """ - Build a layered car knowledge graph. - - Returns: - Dictionary with the layered graph and layer IDs - """ - # Create the builder - builder = LayeredGraphBuilder( - name="Layered Car Knowledge Graph", - description="A layered knowledge graph about cars, with brands, models, and specifications." - ) - - # Create the base layer with car brands - base_layer_id = builder.create_layer( - name="Car Brands Layer", - description="Base layer with car brands and geographical information", - layer_type="base" - ) - - # Add car brands subgraph to the base layer - brands_graph = create_car_brands_graph() - builder.add_subgraph_to_layer(base_layer_id, brands_graph) - - # Create the models layer - models_layer_id = builder.create_layer( - name="Car Models Layer", - description="Layer with car models information", - layer_type="enrichment", - parent_layers=[base_layer_id] - ) - - # Add car models subgraph to the models layer - models_graph = create_car_models_graph() - builder.add_subgraph_to_layer(models_layer_id, models_graph) - - # Create the specifications layer - specs_layer_id = builder.create_layer( - name="Car Specifications Layer", - description="Layer with car specifications and engine information", - layer_type="enrichment", - parent_layers=[models_layer_id] - ) - - # Add car specifications subgraph to the specifications layer - specs_graph = create_car_specs_graph() - builder.add_subgraph_to_layer(specs_layer_id, specs_graph) - - # Build the layered graph - layered_graph = builder.build() - - # Return the graph and layer IDs - return { - "layered_graph": layered_graph, - "base_layer_id": base_layer_id, - "models_layer_id": models_layer_id, - "specs_layer_id": specs_layer_id - } - - -async def analyze_layered_graph(layered_graph_data: Dict[str, Any]) -> None: - """ - Analyze a layered graph using the LayeredGraphService. - - Args: - layered_graph_data: Dictionary with layered graph and layer IDs - """ - layered_graph = layered_graph_data["layered_graph"] - - # Analyze layer dependencies - dependencies = await LayeredGraphService.analyze_layer_dependencies(layered_graph) - logger.info("Layer Dependencies:") - logger.info(f"Root layers: {dependencies['root_layers']}") - logger.info(f"Leaf layers: {dependencies['leaf_layers']}") - logger.info(f"Max depth: {dependencies['max_depth']}") - - # Calculate metrics for each layer - metrics = await LayeredGraphService.calculate_layer_metrics(layered_graph) - logger.info("\nLayer Metrics:") - for layer_id, layer_metrics in metrics.items(): - logger.info(f"Layer {layer_id}:") - logger.info(f" Node count: {layer_metrics['node_count']}") - logger.info(f" Edge count: {layer_metrics['edge_count']}") - logger.info(f" Cumulative node count: {layer_metrics['cumulative_node_count']}") - logger.info(f" Cumulative edge count: {layer_metrics['cumulative_edge_count']}") - logger.info(f" Node contribution ratio: {layer_metrics['node_contribution_ratio']:.2f}") - logger.info(f" Edge contribution ratio: {layer_metrics['edge_contribution_ratio']:.2f}") - - # Compare base layer and models layer - base_layer_id = layered_graph_data["base_layer_id"] - models_layer_id = layered_graph_data["models_layer_id"] - diff_result = await LayeredGraphService.diff_layers( - layered_graph, base_layer_id, models_layer_id - ) - logger.info("\nDifference between Base Layer and Models Layer:") - logger.info(f"Added nodes: {len(diff_result['added_nodes'])}") - logger.info(f"Added edges: {len(diff_result['added_edges'])}") - - # Extract a specific relationship type - filtered_graph = await LayeredGraphService.filter_graph_by_relationship_types( - layered_graph, ["MADE_BY"], include_only=True - ) - logger.info("\nFiltered Graph (MADE_BY relationships only):") - logger.info(f"Node count: {len(filtered_graph.nodes)}") - logger.info(f"Edge count: {len(filtered_graph.edges)}") - - -async def evaluate_layered_graph(layered_graph_data: Dict[str, Any]) -> None: - """ - Evaluate a layered knowledge graph using the LayeredGraphEvalAdapter. - - Args: - layered_graph_data: Dictionary with layered graph and layer IDs - """ - if not EVAL_AVAILABLE: - logger.warning("Skipping evaluation as evaluation framework is not available.") - return - - layered_graph = layered_graph_data["layered_graph"] - - # Create evaluation questions - questions = [ - "What car brands are headquartered in Germany?", - "Which city is Audi based in?", - "What models does Audi make?", - "What engine options are available for the Audi A6?", - "Where is the BMW 5 Series manufactured?" - ] - - # Expected answers (simplified for the example) - expected_answers = [ - "Audi and BMW are car brands headquartered in Germany.", - "Audi is based in Ingolstadt, Germany.", - "Audi makes the A4 and A6 models.", - "The Audi A6 has 2.0 TDI and 3.0 TFSI engine options.", - "The BMW 5 Series is manufactured by BMW in Munich, Germany." - ] - - # Create the evaluation adapter - evaluator = LayeredGraphEvalAdapter(evaluator="direct_llm") - - # Define layer IDs to evaluate - layer_ids = [ - layered_graph_data["base_layer_id"], - layered_graph_data["models_layer_id"], - layered_graph_data["specs_layer_id"] - ] - - # Evaluate the layered graph - logger.info("\nEvaluating layered graph...") - eval_results = await evaluator.evaluate_layered_graph( - layered_graph=layered_graph, - questions=questions, - expected_answers=expected_answers, - eval_metrics=["correctness", "relevance"], - layer_ids=layer_ids - ) - - # Print evaluation results - logger.info("\nEvaluation Results:") - - # Per layer results - logger.info("\nPer Layer Results:") - for layer_id, layer_results in eval_results["per_layer"].items(): - if layer_results.get("is_empty", False): - logger.info(f"Layer {layer_id}: Empty layer") - continue - - logger.info(f"Layer {layer_id}:") - for metric, score in layer_results["metrics"].items(): - logger.info(f" {metric}: {score:.4f}") - - # Cumulative results - logger.info("\nCumulative Results:") - for layer_id, layer_results in eval_results["cumulative"].items(): - if layer_results.get("is_empty", False): - logger.info(f"Cumulative {layer_id}: Empty layer") - continue - - logger.info(f"Cumulative {layer_id}:") - for metric, score in layer_results["metrics"].items(): - logger.info(f" {metric}: {score:.4f}") - - # Layer improvements - logger.info("\nLayer Improvements:") - for layer_id, improvements in eval_results["layer_improvements"].items(): - logger.info(f"Layer {layer_id} improvements:") - for metric, improvement in improvements.items(): - # Format as percentage with +/- sign - sign = "+" if improvement >= 0 else "" - logger.info(f" {metric}: {sign}{improvement:.2%}") - - -async def demonstrate_converting_regular_graph() -> None: - """ - Demonstrate converting a regular KnowledgeGraph to a LayeredKnowledgeGraph. - """ - # Create a regular knowledge graph - regular_graph = create_car_brands_graph() - - # Convert to a layered graph - layered_graph = await convert_to_layered_graph( - knowledge_graph=regular_graph, - layer_name="Brands Base Layer", - layer_description="Converted from regular graph", - graph_name="Converted Brands Graph", - graph_description="Demonstration of converting a regular graph to a layered graph" - ) - - # Print information about the converted graph - logger.info("\nConverted Regular Graph to Layered Graph:") - logger.info(f"Graph name: {layered_graph.name}") - logger.info(f"Number of layers: {len(layered_graph.layers)}") - if layered_graph.layers: - layer = layered_graph.layers[0] - logger.info(f"Layer name: {layer.name}") - logger.info(f"Layer description: {layer.description}") - - # Get the graph from the single layer - layer_graph = layered_graph.get_layer_graph(layered_graph.layers[0].id) - logger.info(f"Layer node count: {len(layer_graph.nodes)}") - logger.info(f"Layer edge count: {len(layer_graph.edges)}") - - -async def main() -> None: - """ - Main function to run the layered graph example. - """ - logger.info("===== Layered Knowledge Graph Example =====") - - logger.info("\nBuilding layered car knowledge graph...") - layered_graph_data = await build_layered_car_graph() - - logger.info("\nAnalyzing layered graph...") - await analyze_layered_graph(layered_graph_data) - - logger.info("\nDemonstrating conversion of regular graph to layered graph...") - await demonstrate_converting_regular_graph() - - if EVAL_AVAILABLE: - logger.info("\nEvaluating layered graph...") - await evaluate_layered_graph(layered_graph_data) - else: - logger.info("\nSkipping evaluation (framework not available).") - # Display graph structure details instead - layered_graph = layered_graph_data["layered_graph"] - logger.info(f"\nLayered Graph Structure:") - for i, layer in enumerate(layered_graph.layers): - logger.info(f"Layer {i+1}: {layer.name} (ID: {layer.id})") - layer_graph = layered_graph.get_layer_graph(layer.id) - logger.info(f" Nodes: {len(layer_graph.nodes)}") - logger.info(f" Edges: {len(layer_graph.edges)}") - - # Show node and relationship types - node_types = {} - for node in layer_graph.nodes: - if node.type not in node_types: - node_types[node.type] = 0 - node_types[node.type] += 1 - - rel_types = {} - for edge in layer_graph.edges: - if edge.relationship_name not in rel_types: - rel_types[edge.relationship_name] = 0 - rel_types[edge.relationship_name] += 1 - - logger.info(f" Node types: {node_types}") - logger.info(f" Relationship types: {rel_types}") - - # If not the first layer, show parent layers - if layer.parent_layers: - logger.info(f" Parent layers: {layer.parent_layers}") - - logger.info("\nExample completed!") - - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/cognee/modules/graph/layered_graph_builder.py b/cognee/modules/graph/layered_graph_builder.py deleted file mode 100644 index a8d1c377e..000000000 --- a/cognee/modules/graph/layered_graph_builder.py +++ /dev/null @@ -1,313 +0,0 @@ -""" -Layered Knowledge Graph Builder for Cognee. - -This module provides utilities for building and managing layered knowledge graphs. -""" - -import uuid -from typing import Dict, List, Optional, Any, Union - -from cognee.shared.data_models import ( - LayeredKnowledgeGraph, - KnowledgeGraph, - Layer, - Node, - Edge -) - - -class LayeredGraphBuilder: - """ - Utility class for building layered knowledge graphs in Cognee. - - This class provides methods for creating and managing layered knowledge graphs, - including adding layers, nodes, and edges to specific layers, and building - hierarchical relationships between layers. - """ - - def __init__(self, name: str = "Layered Knowledge Graph", description: str = ""): - """ - Initialize a new layered graph builder. - - Args: - name: Name of the layered graph - description: Description of the layered graph - """ - # Initialize empty base graph - self.base_graph = KnowledgeGraph(nodes=[], edges=[]) - - # Initialize layered graph - self.layered_graph = LayeredKnowledgeGraph( - base_graph=self.base_graph, - layers=[], - name=name, - description=description - ) - - # Keep track of layers - self.layers: Dict[str, Layer] = {} - - # Keep track of node IDs by layer - self.layer_nodes: Dict[str, List[str]] = {} - - # Keep track of edge IDs by layer - self.layer_edges: Dict[str, List[tuple]] = {} - - def create_layer( - self, - name: str, - description: str, - layer_type: str = "default", - parent_layers: List[str] = None, - layer_id: Optional[str] = None, - properties: Optional[Dict[str, Any]] = None - ) -> str: - """ - Create a new layer in the layered graph. - - Args: - name: Name of the layer - description: Description of the layer - layer_type: Type of the layer (e.g., "base", "enrichment", "inference") - parent_layers: List of parent layer IDs - layer_id: Specific ID for the layer (generated if not provided) - properties: Additional layer properties - - Returns: - ID of the created layer - """ - # Generate ID if not provided - if layer_id is None: - layer_id = str(uuid.uuid4()) - - # Initialize empty parents list if not provided - if parent_layers is None: - parent_layers = [] - - # Initialize empty properties dict if not provided - if properties is None: - properties = {} - - # Verify parent layers exist - for parent_id in parent_layers: - if parent_id not in self.layers: - raise ValueError(f"Parent layer with ID {parent_id} does not exist") - - # Create layer - layer = Layer( - id=layer_id, - name=name, - description=description, - layer_type=layer_type, - parent_layers=parent_layers, - properties=properties - ) - - # Add layer to layered graph - self.layered_graph.add_layer(layer) - - # Keep track of layer - self.layers[layer_id] = layer - self.layer_nodes[layer_id] = [] - self.layer_edges[layer_id] = [] - - return layer_id - - def add_node_to_layer( - self, - layer_id: str, - node_id: str, - name: str, - node_type: str, - description: str, - properties: Optional[Dict[str, Any]] = None - ) -> Node: - """ - Add a node to a specific layer. - - Args: - layer_id: ID of the layer to add the node to - node_id: ID of the node - name: Name of the node - node_type: Type of the node - description: Description of the node - properties: Additional node properties - - Returns: - Created node - """ - # Verify layer exists - if layer_id not in self.layers: - raise ValueError(f"Layer with ID {layer_id} does not exist") - - # Create node - node = Node( - id=node_id, - name=name, - type=node_type, - description=description, - layer_id=layer_id, - properties=properties - ) - - # Add node to layer - self.layered_graph.add_node_to_layer(node, layer_id) - - # Keep track of node - self.layer_nodes[layer_id].append(node_id) - - return node - - def add_edge_to_layer( - self, - layer_id: str, - source_node_id: str, - target_node_id: str, - relationship_name: str, - properties: Optional[Dict[str, Any]] = None - ) -> Edge: - """ - Add an edge to a specific layer. - - Args: - layer_id: ID of the layer to add the edge to - source_node_id: ID of the source node - target_node_id: ID of the target node - relationship_name: Name of the relationship - properties: Additional edge properties - - Returns: - Created edge - """ - # Verify layer exists - if layer_id not in self.layers: - raise ValueError(f"Layer with ID {layer_id} does not exist") - - # Create edge - edge = Edge( - source_node_id=source_node_id, - target_node_id=target_node_id, - relationship_name=relationship_name, - layer_id=layer_id, - properties=properties - ) - - # Add edge to layer - self.layered_graph.add_edge_to_layer(edge, layer_id) - - # Keep track of edge - self.layer_edges[layer_id].append((source_node_id, target_node_id, relationship_name)) - - return edge - - def add_subgraph_to_layer( - self, - layer_id: str, - subgraph: KnowledgeGraph, - id_prefix: str = "" - ) -> Dict[str, str]: - """ - Add an entire subgraph to a layer, with optional ID prefixing to avoid conflicts. - - Args: - layer_id: ID of the layer to add the subgraph to - subgraph: Knowledge graph to add - id_prefix: Prefix to add to node IDs to avoid conflicts - - Returns: - Dictionary mapping original node IDs to new node IDs - """ - # Verify layer exists - if layer_id not in self.layers: - raise ValueError(f"Layer with ID {layer_id} does not exist") - - # Map to track original to new node IDs - id_mapping = {} - - # Add nodes - for node in subgraph.nodes: - new_id = f"{id_prefix}{node.id}" if id_prefix else node.id - - # Create a copy of the node with the new ID and layer - new_node = Node( - id=new_id, - name=node.name, - type=node.type, - description=node.description, - layer_id=layer_id, - properties=node.properties - ) - - # Add to layered graph - self.layered_graph.add_node_to_layer(new_node, layer_id) - - # Keep track of mapping and node - id_mapping[node.id] = new_id - self.layer_nodes[layer_id].append(new_id) - - # Add edges - for edge in subgraph.edges: - # Map source and target IDs - new_source_id = id_mapping.get(edge.source_node_id, edge.source_node_id) - new_target_id = id_mapping.get(edge.target_node_id, edge.target_node_id) - - # Create a copy of the edge with new IDs and layer - new_edge = Edge( - source_node_id=new_source_id, - target_node_id=new_target_id, - relationship_name=edge.relationship_name, - layer_id=layer_id, - properties=edge.properties - ) - - # Add to layered graph - self.layered_graph.add_edge_to_layer(new_edge, layer_id) - - # Keep track of edge - self.layer_edges[layer_id].append((new_source_id, new_target_id, edge.relationship_name)) - - return id_mapping - - def build(self) -> LayeredKnowledgeGraph: - """ - Build and return the layered knowledge graph. - - Returns: - Complete layered knowledge graph - """ - return self.layered_graph - - -async def convert_to_layered_graph( - knowledge_graph: KnowledgeGraph, - layer_name: str = "Base Layer", - layer_description: str = "Original knowledge graph", - graph_name: str = "Layered Knowledge Graph", - graph_description: str = "Layered knowledge graph converted from standard knowledge graph" -) -> LayeredKnowledgeGraph: - """ - Convert a standard knowledge graph to a layered knowledge graph with one layer. - - Args: - knowledge_graph: Standard knowledge graph to convert - layer_name: Name for the base layer - layer_description: Description for the base layer - graph_name: Name for the layered graph - graph_description: Description for the layered graph - - Returns: - Layered knowledge graph with one layer containing all nodes and edges - """ - builder = LayeredGraphBuilder(name=graph_name, description=graph_description) - - # Create base layer - layer_id = builder.create_layer( - name=layer_name, - description=layer_description, - layer_type="base" - ) - - # Add all nodes and edges to the layer - builder.add_subgraph_to_layer(layer_id, knowledge_graph) - - return builder.build() \ No newline at end of file diff --git a/cognee/modules/graph/layered_graph_service.py b/cognee/modules/graph/layered_graph_service.py deleted file mode 100644 index 094a06a86..000000000 --- a/cognee/modules/graph/layered_graph_service.py +++ /dev/null @@ -1,569 +0,0 @@ -""" -Layered Knowledge Graph Service for Cognee. - -This module provides services for working with layered knowledge graphs. -""" - -import logging -from typing import Dict, List, Optional, Set, Any, Union, Tuple - -from cognee.shared.data_models import ( - LayeredKnowledgeGraph, - KnowledgeGraph, - Layer, - Node, - Edge -) -from cognee.modules.graph.layered_graph_builder import LayeredGraphBuilder - -logger = logging.getLogger(__name__) - - -class LayeredGraphService: - """ - Service for working with layered knowledge graphs in Cognee. - - This service provides methods for manipulating, merging, and querying - layered knowledge graphs. - """ - - @staticmethod - async def merge_layers( - layered_graph: LayeredKnowledgeGraph, - layer_ids: List[str], - new_layer_name: str, - new_layer_description: str, - new_layer_type: str = "merged", - handle_conflicts: str = "overwrite" - ) -> Tuple[str, LayeredKnowledgeGraph]: - """ - Merge multiple layers into a new layer in the graph. - - Args: - layered_graph: The layered knowledge graph - layer_ids: List of layer IDs to merge - new_layer_name: Name for the merged layer - new_layer_description: Description for the merged layer - new_layer_type: Type for the merged layer - handle_conflicts: How to handle node/edge conflicts: "overwrite", "keep_first", or "keep_original" - - Returns: - Tuple of (new layer ID, updated layered graph) - """ - # Create a copy of the layered graph to avoid modifying the original - builder = LayeredGraphBuilder(name=layered_graph.name, description=layered_graph.description) - - # Copy existing layers - for layer in layered_graph.layers: - if layer.id not in layer_ids: # Skip layers that will be merged - parent_layers = [p for p in layer.parent_layers if p not in layer_ids] - builder.create_layer( - name=layer.name, - description=layer.description, - layer_type=layer.layer_type, - parent_layers=parent_layers, - layer_id=layer.id, - properties=layer.properties - ) - - # Create the new merged layer - new_layer_id = builder.create_layer( - name=new_layer_name, - description=new_layer_description, - layer_type=new_layer_type - ) - - # Keep track of processed nodes and edges to handle conflicts - processed_nodes: Set[str] = set() - processed_edges: Set[tuple] = set() - - # Merge nodes and edges from specified layers - for layer_id in layer_ids: - layer_graph = layered_graph.get_layer_graph(layer_id) - - # Process nodes - for node in layer_graph.nodes: - if node.id in processed_nodes and handle_conflicts == "keep_first": - continue - - builder.add_node_to_layer( - layer_id=new_layer_id, - node_id=node.id, - name=node.name, - node_type=node.type, - description=node.description, - properties=node.properties - ) - processed_nodes.add(node.id) - - # Process edges - for edge in layer_graph.edges: - edge_key = (edge.source_node_id, edge.target_node_id, edge.relationship_name) - - if edge_key in processed_edges and handle_conflicts == "keep_first": - continue - - builder.add_edge_to_layer( - layer_id=new_layer_id, - source_node_id=edge.source_node_id, - target_node_id=edge.target_node_id, - relationship_name=edge.relationship_name, - properties=edge.properties - ) - processed_edges.add(edge_key) - - return new_layer_id, builder.build() - - @staticmethod - async def diff_layers( - layered_graph: LayeredKnowledgeGraph, - base_layer_id: str, - comparison_layer_id: str - ) -> Dict[str, Any]: - """ - Compare two layers and return their differences. - - Args: - layered_graph: The layered knowledge graph - base_layer_id: ID of the base layer for comparison - comparison_layer_id: ID of the layer to compare against the base - - Returns: - Dictionary containing differences between the layers - """ - base_graph = layered_graph.get_layer_graph(base_layer_id) - comparison_graph = layered_graph.get_layer_graph(comparison_layer_id) - - # Get node IDs and edge keys for easy comparison - base_node_ids = {node.id for node in base_graph.nodes} - comparison_node_ids = {node.id for node in comparison_graph.nodes} - - base_edge_keys = { - (edge.source_node_id, edge.target_node_id, edge.relationship_name) - for edge in base_graph.edges - } - comparison_edge_keys = { - (edge.source_node_id, edge.target_node_id, edge.relationship_name) - for edge in comparison_graph.edges - } - - # Calculate differences - added_nodes = comparison_node_ids - base_node_ids - removed_nodes = base_node_ids - comparison_node_ids - common_nodes = base_node_ids.intersection(comparison_node_ids) - - added_edges = comparison_edge_keys - base_edge_keys - removed_edges = base_edge_keys - comparison_edge_keys - common_edges = base_edge_keys.intersection(comparison_edge_keys) - - # Find modified nodes (same ID but different properties) - modified_nodes = [] - for node_id in common_nodes: - base_node = next((node for node in base_graph.nodes if node.id == node_id), None) - comparison_node = next((node for node in comparison_graph.nodes if node.id == node_id), None) - - if base_node is not None and comparison_node is not None: - # Check if properties differ - if (base_node.name != comparison_node.name or - base_node.type != comparison_node.type or - base_node.description != comparison_node.description or - base_node.properties != comparison_node.properties): - modified_nodes.append(node_id) - - # Find modified edges (same key but different properties) - modified_edges = [] - for edge_key in common_edges: - base_edge = next((edge for edge in base_graph.edges - if (edge.source_node_id, edge.target_node_id, edge.relationship_name) == edge_key), None) - comparison_edge = next((edge for edge in comparison_graph.edges - if (edge.source_node_id, edge.target_node_id, edge.relationship_name) == edge_key), None) - - if base_edge is not None and comparison_edge is not None: - # Check if properties differ - if base_edge.properties != comparison_edge.properties: - modified_edges.append(edge_key) - - return { - "added_nodes": list(added_nodes), - "removed_nodes": list(removed_nodes), - "modified_nodes": modified_nodes, - "common_nodes": list(common_nodes), - "added_edges": list(added_edges), - "removed_edges": list(removed_edges), - "modified_edges": modified_edges, - "common_edges": list(common_edges), - "node_count_diff": len(comparison_graph.nodes) - len(base_graph.nodes), - "edge_count_diff": len(comparison_graph.edges) - len(base_graph.edges) - } - - @staticmethod - async def extract_subgraph( - layered_graph: LayeredKnowledgeGraph, - layer_ids: List[str] = None, - include_cumulative: bool = False, - node_filter: Optional[callable] = None, - edge_filter: Optional[callable] = None - ) -> KnowledgeGraph: - """ - Extract a subgraph from a layered graph based on specified filters. - - Args: - layered_graph: The layered knowledge graph - layer_ids: List of layer IDs to include (None = all layers) - include_cumulative: Whether to include parent layers in extraction - node_filter: Optional function to filter nodes (takes Node, returns bool) - edge_filter: Optional function to filter edges (takes Edge, returns bool) - - Returns: - Knowledge graph containing the filtered subgraph - """ - # If no layer IDs specified, use all layers - if layer_ids is None: - layer_ids = [layer.id for layer in layered_graph.layers] - - # Initialize lists for nodes and edges - nodes = [] - edges = [] - - # Process each specified layer - for layer_id in layer_ids: - # Get graph for this layer (cumulative or not) - if include_cumulative: - layer_graph = layered_graph.get_cumulative_layer_graph(layer_id) - else: - layer_graph = layered_graph.get_layer_graph(layer_id) - - # Apply node filter if provided - if node_filter is not None: - filtered_nodes = [node for node in layer_graph.nodes if node_filter(node)] - else: - filtered_nodes = layer_graph.nodes - - # Get IDs of nodes that passed the filter - filtered_node_ids = {node.id for node in filtered_nodes} - - # Add filtered nodes to result - nodes.extend(filtered_nodes) - - # Apply edge filter if provided, and ensure edges connect to filtered nodes - if edge_filter is not None: - filtered_edges = [ - edge for edge in layer_graph.edges - if edge_filter(edge) and - edge.source_node_id in filtered_node_ids and - edge.target_node_id in filtered_node_ids - ] - else: - filtered_edges = [ - edge for edge in layer_graph.edges - if edge.source_node_id in filtered_node_ids and - edge.target_node_id in filtered_node_ids - ] - - # Add filtered edges to result - edges.extend(filtered_edges) - - # Remove duplicate nodes and edges based on IDs/keys - unique_nodes = {} - for node in nodes: - unique_nodes[node.id] = node - - unique_edges = {} - for edge in edges: - edge_key = (edge.source_node_id, edge.target_node_id, edge.relationship_name) - unique_edges[edge_key] = edge - - # Create and return the knowledge graph - return KnowledgeGraph( - nodes=list(unique_nodes.values()), - edges=list(unique_edges.values()), - name=f"Subgraph from {layered_graph.name}", - description=f"Subgraph extracted from {layered_graph.name} with {len(layer_ids)} layer(s)" - ) - - @staticmethod - async def analyze_layer_dependencies(layered_graph: LayeredKnowledgeGraph) -> Dict[str, Any]: - """ - Analyze dependencies between layers and return a dependency structure. - - Args: - layered_graph: The layered knowledge graph - - Returns: - Dictionary containing layer dependency analysis - """ - # Initialize dictionaries for dependency analysis - dependencies = {} # layer_id -> set of direct parent layer IDs - reverse_dependencies = {} # layer_id -> set of direct child layer IDs - all_dependencies = {} # layer_id -> set of all ancestor layer IDs - layer_depth = {} # layer_id -> depth in dependency hierarchy - - # Create lookup for layers by ID - layers_by_id = {layer.id: layer for layer in layered_graph.layers} - - # Build direct dependencies and reverse dependencies - for layer in layered_graph.layers: - dependencies[layer.id] = set(layer.parent_layers) - - # Initialize reverse dependencies sets - for parent_id in layer.parent_layers: - if parent_id not in reverse_dependencies: - reverse_dependencies[parent_id] = set() - reverse_dependencies[parent_id].add(layer.id) - - # Initialize reverse dependencies for layers with no children - for layer_id in dependencies: - if layer_id not in reverse_dependencies: - reverse_dependencies[layer_id] = set() - - # Calculate all dependencies using depth-first search - def get_all_dependencies(layer_id): - if layer_id in all_dependencies: - return all_dependencies[layer_id] - - all_deps = set() - for parent_id in dependencies.get(layer_id, set()): - all_deps.add(parent_id) - all_deps.update(get_all_dependencies(parent_id)) - - all_dependencies[layer_id] = all_deps - return all_deps - - # Calculate all dependencies for each layer - for layer_id in dependencies: - get_all_dependencies(layer_id) - - # Calculate layer depths - roots = [layer_id for layer_id, parents in dependencies.items() if not parents] - - # Set depth 0 for root layers - for root in roots: - layer_depth[root] = 0 - - # Calculate depths using breadth-first search - visited = set(roots) - queue = [(root, 0) for root in roots] - - while queue: - layer_id, depth = queue.pop(0) - - for child_id in reverse_dependencies.get(layer_id, set()): - # Calculate child depth as max(current depth + 1, existing depth) - child_depth = depth + 1 - if child_id in layer_depth: - child_depth = max(child_depth, layer_depth[child_id]) - - layer_depth[child_id] = child_depth - - if child_id not in visited: - visited.add(child_id) - queue.append((child_id, child_depth)) - - # Group layers by depth - layers_by_depth = {} - for layer_id, depth in layer_depth.items(): - if depth not in layers_by_depth: - layers_by_depth[depth] = [] - layers_by_depth[depth].append(layer_id) - - # Check for cycles (if a layer is not visited, it's part of a cycle) - cycles = [layer_id for layer_id in dependencies if layer_id not in visited] - - return { - "root_layers": roots, - "leaf_layers": [layer_id for layer_id, children in reverse_dependencies.items() if not children], - "dependencies": dependencies, - "reverse_dependencies": reverse_dependencies, - "all_dependencies": all_dependencies, - "layer_depth": layer_depth, - "layers_by_depth": layers_by_depth, - "max_depth": max(layer_depth.values()) if layer_depth else 0, - "has_cycles": len(cycles) > 0, - "cycle_layers": cycles - } - - @staticmethod - async def filter_graph_by_relationship_types( - layered_graph: LayeredKnowledgeGraph, - relationship_types: List[str], - include_only: bool = True, - layer_ids: List[str] = None - ) -> KnowledgeGraph: - """ - Filter a layered graph to include or exclude specific relationship types. - - Args: - layered_graph: The layered knowledge graph - relationship_types: List of relationship types to filter by - include_only: If True, include only these relationships; if False, exclude them - layer_ids: Optional list of layer IDs to filter (None = all layers) - - Returns: - Filtered knowledge graph - """ - # Define edge filter based on relationship types - def edge_filter(edge): - relationship_match = edge.relationship_name in relationship_types - return relationship_match if include_only else not relationship_match - - # Use extract_subgraph with the edge filter - return await LayeredGraphService.extract_subgraph( - layered_graph=layered_graph, - layer_ids=layer_ids, - include_cumulative=True, - edge_filter=edge_filter - ) - - @staticmethod - async def sort_layers_topologically(layered_graph: LayeredKnowledgeGraph) -> List[str]: - """ - Sort layers in topological order (parents before children). - - Args: - layered_graph: The layered knowledge graph - - Returns: - List of layer IDs in topological order - """ - # First, get the dependency analysis - analysis = await LayeredGraphService.analyze_layer_dependencies(layered_graph) - - # Check for cycles - if analysis["has_cycles"]: - logger.warning("Graph has cycles, topological sort may not be complete") - - # Use the layers_by_depth to create a topological order - layers_by_depth = analysis["layers_by_depth"] - sorted_depths = sorted(layers_by_depth.keys()) - - # Flatten the layers by depth - topological_order = [] - for depth in sorted_depths: - topological_order.extend(layers_by_depth[depth]) - - return topological_order - - @staticmethod - async def find_nodes_by_property( - layered_graph: LayeredKnowledgeGraph, - property_name: str, - property_value: Any, - layer_ids: List[str] = None, - include_cumulative: bool = False - ) -> List[Node]: - """ - Find nodes that have a specific property value. - - Args: - layered_graph: The layered knowledge graph - property_name: Name of the property to search - property_value: Value of the property to match - layer_ids: Optional list of layer IDs to search (None = all layers) - include_cumulative: Whether to include parent layers in the search - - Returns: - List of nodes matching the property value - """ - # Define node filter for the property - def node_filter(node): - if property_name == "id": - return node.id == property_value - elif property_name == "name": - return node.name == property_value - elif property_name == "type": - return node.type == property_value - elif property_name == "description": - return property_value in node.description - else: - # Check node properties - return (property_name in node.properties and - node.properties[property_name] == property_value) - - # Use extract_subgraph with the node filter - subgraph = await LayeredGraphService.extract_subgraph( - layered_graph=layered_graph, - layer_ids=layer_ids, - include_cumulative=include_cumulative, - node_filter=node_filter - ) - - return subgraph.nodes - - @staticmethod - async def calculate_layer_metrics(layered_graph: LayeredKnowledgeGraph) -> Dict[str, Dict[str, Any]]: - """ - Calculate metrics for each layer in the graph. - - Args: - layered_graph: The layered knowledge graph - - Returns: - Dictionary mapping layer IDs to dictionaries of metrics - """ - metrics = {} - - # Get dependency analysis - analysis = await LayeredGraphService.analyze_layer_dependencies(layered_graph) - - # Calculate metrics for each layer - for layer in layered_graph.layers: - layer_id = layer.id - layer_graph = layered_graph.get_layer_graph(layer_id) - cumulative_graph = layered_graph.get_cumulative_layer_graph(layer_id) - - # Count node types in this layer - node_types = {} - for node in layer_graph.nodes: - if node.type not in node_types: - node_types[node.type] = 0 - node_types[node.type] += 1 - - # Count relationship types in this layer - relationship_types = {} - for edge in layer_graph.edges: - if edge.relationship_name not in relationship_types: - relationship_types[edge.relationship_name] = 0 - relationship_types[edge.relationship_name] += 1 - - # Calculate density (ratio of actual to possible edges) - node_count = len(layer_graph.nodes) - edge_count = len(layer_graph.edges) - possible_edges = node_count * (node_count - 1) if node_count > 1 else 0 - density = edge_count / possible_edges if possible_edges > 0 else 0 - - # Store metrics for this layer - metrics[layer_id] = { - "node_count": node_count, - "edge_count": edge_count, - "node_types": node_types, - "relationship_types": relationship_types, - "density": density, - "parent_count": len(layer.parent_layers), - "child_count": len(analysis["reverse_dependencies"].get(layer_id, set())), - "depth": analysis["layer_depth"].get(layer_id, 0), - "is_root": len(layer.parent_layers) == 0, - "is_leaf": len(analysis["reverse_dependencies"].get(layer_id, set())) == 0, - "cumulative_node_count": len(cumulative_graph.nodes), - "cumulative_edge_count": len(cumulative_graph.edges), - "contribution_node_count": len(layer_graph.nodes), - "contribution_edge_count": len(layer_graph.edges) - } - - # Calculate the contribution ratio (layer's nodes/edges as percentage of cumulative) - if metrics[layer_id]["cumulative_node_count"] > 0: - metrics[layer_id]["node_contribution_ratio"] = ( - metrics[layer_id]["contribution_node_count"] / - metrics[layer_id]["cumulative_node_count"] - ) - else: - metrics[layer_id]["node_contribution_ratio"] = 0 - - if metrics[layer_id]["cumulative_edge_count"] > 0: - metrics[layer_id]["edge_contribution_ratio"] = ( - metrics[layer_id]["contribution_edge_count"] / - metrics[layer_id]["cumulative_edge_count"] - ) - else: - metrics[layer_id]["edge_contribution_ratio"] = 0 - - return metrics \ No newline at end of file