import os import json from cognee.shared.logging_utils import get_logger from cognee.infrastructure.files.storage.LocalFileStorage import LocalFileStorage logger = get_logger() async def cognee_network_visualization(graph_data, destination_file_path: str = None): import networkx import re nodes_data, edges_data = graph_data G = networkx.DiGraph() nodes_list = [] color_map = { "Entity": "#5C10F4", # Default purple, will be yellow if from Reddit "EntityType": "#A550FF", "DocumentChunk": "#808080", # Default gray, will be overridden based on source "TextSummary": "#5C10F4", "TableRow": "#A550FF", "TableType": "#5C10F4", "ColumnValue": "#757470", "SchemaTable": "#A550FF", "DatabaseSchema": "#5C10F4", "SchemaRelationship": "#323332", "default": "#D8D8D8", } # Debug counters reddit_count = 0 research_count = 0 other_count = 0 document_chunk_count = 0 # Track which nodes are from different source types reddit_document_ids = set() research_document_ids = set() # arXiv/research papers other_document_ids = set() # RSS feeds, etc. def adjust_brightness_by_catchiness(base_color, catchiness_score): """Adjust color brightness based on catchiness score (0-10)""" if catchiness_score is None: return base_color # Normalize catchiness to 0-1 range (assuming max is 10) normalized_catchiness = min(max(catchiness_score / 10.0, 0), 1) # Convert hex to RGB hex_color = base_color.lstrip('#') r, g, b = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) # Increase brightness: interpolate between base color and a brighter version # Higher catchiness = brighter (closer to white, but maintaining hue) brightness_factor = 0.4 + (normalized_catchiness * 0.6) # Range: 0.4 to 1.0 # Apply brightness while maintaining color character r = int(min(255, r + (255 - r) * normalized_catchiness * 0.7)) g = int(min(255, g + (255 - g) * normalized_catchiness * 0.7)) b = int(min(255, b + (255 - b) * normalized_catchiness * 0.7)) return f"#{r:02x}{g:02x}{b:02x}" for node_id, node_info in nodes_data: node_info = node_info.copy() node_info["id"] = str(node_id) # Get node type node_type = node_info.get("type", "default") # For DocumentChunk nodes, parse catchiness and source from the text field catchiness = None source = None is_reddit = False is_research = False is_other = False if node_type == "DocumentChunk": document_chunk_count += 1 text_content = node_info.get("text", "") # Debug: Show first 200 chars of text print(f"\n=== DocumentChunk #{document_chunk_count} Debug ===") print(f"Text preview (first 200 chars): {text_content[:200]}") # Extract catchiness score from text using regex catchiness_match = re.search(r'Catchiness Score:\s*(\d+)/10', text_content) print(f"Catchiness match: {catchiness_match}") if catchiness_match: try: catchiness = int(catchiness_match.group(1)) node_info["catchiness"] = catchiness print(f"Catchiness extracted: {catchiness}") except (ValueError, TypeError) as e: print(f"Error parsing catchiness: {e}") pass else: print("No catchiness score found in text") # Determine source type from text content - check in priority order print(f"Checking for 'Subreddit: r/' in text: {'Subreddit: r/' in text_content}") print(f"Checking for 'arxiv.org' in text: {'arxiv.org' in text_content.lower()}") print(f"Checking for 'RSS Feed:' in text: {'RSS Feed:' in text_content}") if "Subreddit: r/" in text_content: is_reddit = True reddit_count += 1 reddit_document_ids.add(str(node_id)) print("š“ IDENTIFIED AS REDDIT") # Extract subreddit name subreddit_match = re.search(r'Subreddit: (r/\w+)', text_content) if subreddit_match: source = subreddit_match.group(1) node_info["source"] = source print(f"Subreddit: {source}") elif "arxiv.org" in text_content.lower() or "export.arxiv.org" in text_content.lower(): is_research = True research_count += 1 research_document_ids.add(str(node_id)) print("š¢ IDENTIFIED AS RESEARCH/ARXIV") # Extract source details if "RSS Feed:" in text_content: rss_match = re.search(r'RSS Feed:\s*([^\n]+)', text_content) if rss_match: source = rss_match.group(1).strip() node_info["source"] = "Research: " + source print(f"Research Feed: {source}") elif "RSS Feed:" in text_content: is_other = True other_count += 1 other_document_ids.add(str(node_id)) print("šµ IDENTIFIED AS OTHER/RSS") # Extract RSS feed name rss_match = re.search(r'RSS Feed:\s*([^\n]+)', text_content) if rss_match: source = rss_match.group(1).strip() node_info["source"] = source print(f"RSS Feed: {source}") else: print("ā ļø NO SOURCE IDENTIFIED") print(f"Flags - Reddit: {is_reddit}, Research: {is_research}, Other: {is_other}") else: # For non-DocumentChunk nodes, try to get catchiness and source from properties if "catchiness" in node_info: catchiness = node_info["catchiness"] elif "properties" in node_info and isinstance(node_info["properties"], dict): catchiness = node_info["properties"].get("catchiness") if "source" in node_info: source = node_info["source"] elif "properties" in node_info and isinstance(node_info["properties"], dict): source = node_info["properties"].get("source") if source is not None: node_info["source"] = source # Special handling for DocumentChunk nodes - color by source if node_type == "DocumentChunk": if is_reddit: base_color = "#8B0000" # Dark red for Reddit print(f"š“ Base color set to DARK RED (Reddit)") elif is_research: base_color = "#006400" # Dark green for Research/arXiv print(f"š¢ Base color set to DARK GREEN (Research)") elif is_other: base_color = "#00008B" # Dark blue for Other/RSS print(f"šµ Base color set to DARK BLUE (Other/RSS)") else: base_color = "#808080" # Gray fallback print(f"ā« Base color set to GRAY (Unidentified)") else: # Get base color from type for non-DocumentChunk nodes base_color = color_map.get(node_type, "#D3D3D3") # Adjust color brightness based on catchiness for DocumentChunk nodes if node_type == "DocumentChunk" and catchiness is not None: try: catchiness_score = float(catchiness) final_color = adjust_brightness_by_catchiness(base_color, catchiness_score) node_info["color"] = final_color print(f"Final color after brightness adjustment (catchiness={catchiness_score}): {final_color}") except (ValueError, TypeError) as e: node_info["color"] = base_color print(f"Error in brightness adjustment: {e}, using base_color: {base_color}") else: # For non-DocumentChunk nodes, use base color without brightness adjustment node_info["color"] = base_color if node_type == "DocumentChunk": print(f"No catchiness found, using base_color: {base_color}") if catchiness is not None: try: node_info["catchiness"] = float(catchiness) except (ValueError, TypeError): pass node_info["name"] = node_info.get("name", str(node_id)) try: del node_info[ "updated_at" ] #:TODO: We should decide what properties to show on the nodes and edges, we dont necessarily need all. except KeyError: pass try: del node_info["created_at"] except KeyError: pass nodes_list.append(node_info) G.add_node(node_id, **node_info) # Print summary print("\n" + "="*60) print("DOCUMENT CHUNK SUMMARY") print("="*60) print(f"Total DocumentChunks: {document_chunk_count}") print(f"š“ Reddit posts (DARK RED): {reddit_count}") print(f"š¢ Research/arXiv (DARK GREEN): {research_count}") print(f"šµ Other/RSS (DARK BLUE): {other_count}") print(f"ā ļø Unidentified: {document_chunk_count - reddit_count - research_count - other_count}") print(f"IDs tracked - Reddit: {len(reddit_document_ids)}, Research: {len(research_document_ids)}, Other: {len(other_document_ids)}") print("="*60 + "\n") # Build a mapping of entities connected to different source types print("Building entity-to-source mapping from edges...") reddit_connected_nodes = set() research_connected_nodes = set() other_connected_nodes = set() for source, target, relation, edge_info in edges_data: source_id = str(source) target_id = str(target) # Track connections to Reddit documents if source_id in reddit_document_ids: reddit_connected_nodes.add(target_id) if target_id in reddit_document_ids: reddit_connected_nodes.add(source_id) # Track connections to Research documents if source_id in research_document_ids: research_connected_nodes.add(target_id) if target_id in research_document_ids: research_connected_nodes.add(source_id) # Track connections to Other documents if source_id in other_document_ids: other_connected_nodes.add(target_id) if target_id in other_document_ids: other_connected_nodes.add(source_id) print(f"Entities connected to - Reddit: {len(reddit_connected_nodes)}, Research: {len(research_connected_nodes)}, Other: {len(other_connected_nodes)}") # Update Entity node colors based on source connection (priority: Reddit > Research > Other) entity_reddit_count = 0 entity_research_count = 0 entity_other_count = 0 entity_total_count = 0 for node_info in nodes_list: if node_info.get("type") == "Entity": entity_total_count += 1 node_id = node_info.get("id") # Priority order: Reddit > Research > Other if node_id in reddit_connected_nodes: node_info["color"] = "#FF4500" # Orange/red for Reddit entities node_info["source_type"] = "reddit" entity_reddit_count += 1 if entity_reddit_count <= 3: print(f" š Entity '{node_info.get('name', 'Unknown')}' colored orange/red (Reddit)") elif node_id in research_connected_nodes: node_info["color"] = "#00FF00" # Green for Research entities node_info["source_type"] = "research" entity_research_count += 1 if entity_research_count <= 3: print(f" š¢ Entity '{node_info.get('name', 'Unknown')}' colored green (Research)") elif node_id in other_connected_nodes: node_info["color"] = "#5C10F4" # Blue for Other entities (keep original purple/blue) node_info["source_type"] = "other" entity_other_count += 1 if entity_other_count <= 3: print(f" šµ Entity '{node_info.get('name', 'Unknown')}' colored blue (Other)") print(f"\nš Reddit entities (ORANGE #FF4500): {entity_reddit_count} / {entity_total_count}") print(f"š¢ Research entities (GREEN #00FF00): {entity_research_count} / {entity_total_count}") print(f"šµ Other entities (BLUE #5C10F4): {entity_other_count} / {entity_total_count}") print("\n" + "="*60) print("COLOR SCHEME SUMMARY") print("="*60) print("š“ DocumentChunks (Reddit) ā DARK RED #8B0000 (brightness by catchiness)") print("š Entities (Reddit) ā ORANGE/RED #FF4500") print("š¢ DocumentChunks (Research) ā DARK GREEN #006400 (brightness by catchiness)") print("š¢ Entities (Research) ā GREEN #00FF00") print("šµ DocumentChunks (Other) ā DARK BLUE #00008B (brightness by catchiness)") print("šµ Entities (Other) ā BLUE #5C10F4") print("="*60 + "\n") edge_labels = {} links_list = [] for source, target, relation, edge_info in edges_data: source = str(source) target = str(target) G.add_edge(source, target) edge_labels[(source, target)] = relation # Extract edge metadata including all weights all_weights = {} primary_weight = None if edge_info: # Single weight (backward compatibility) if "weight" in edge_info: all_weights["default"] = edge_info["weight"] primary_weight = edge_info["weight"] # Multiple weights if "weights" in edge_info and isinstance(edge_info["weights"], dict): all_weights.update(edge_info["weights"]) # Use the first weight as primary for visual thickness if no default weight if primary_weight is None and edge_info["weights"]: primary_weight = next(iter(edge_info["weights"].values())) # Individual weight fields (weight_strength, weight_confidence, etc.) for key, value in edge_info.items(): if key.startswith("weight_") and isinstance(value, (int, float)): weight_name = key[7:] # Remove "weight_" prefix all_weights[weight_name] = value link_data = { "source": source, "target": target, "relation": relation, "weight": primary_weight, # Primary weight for backward compatibility "all_weights": all_weights, # All weights for display "relationship_type": edge_info.get("relationship_type") if edge_info else None, "edge_info": edge_info if edge_info else {}, } links_list.append(link_data) html_template = """