From 5bee392e6cc7db1e8ca48ac7a1da270305e31dce Mon Sep 17 00:00:00 2001 From: LStromann <152971369+LStromann@users.noreply.github.com> Date: Wed, 5 Nov 2025 13:03:24 +0100 Subject: [PATCH] Visuals for AI news agent blog post graph visuals file that adjusts colours and brightness based on news source and catchiness score --- examples/python/source_specific_visual.py | 999 ++++++++++++++++++++++ 1 file changed, 999 insertions(+) create mode 100644 examples/python/source_specific_visual.py diff --git a/examples/python/source_specific_visual.py b/examples/python/source_specific_visual.py new file mode 100644 index 000000000..b129728e9 --- /dev/null +++ b/examples/python/source_specific_visual.py @@ -0,0 +1,999 @@ +import os +import json + +from cognee.shared.logging_utils import get_logger +from cognee.infrastructure.files.storage.LocalFileStorage import LocalFileStorage + +logger = get_logger() + + +async def cognee_network_visualization(graph_data, destination_file_path: str = None): + import networkx + import re + + nodes_data, edges_data = graph_data + + G = networkx.DiGraph() + + nodes_list = [] + color_map = { + "Entity": "#5C10F4", # Default purple, will be yellow if from Reddit + "EntityType": "#A550FF", + "DocumentChunk": "#808080", # Default gray, will be overridden based on source + "TextSummary": "#5C10F4", + "TableRow": "#A550FF", + "TableType": "#5C10F4", + "ColumnValue": "#757470", + "SchemaTable": "#A550FF", + "DatabaseSchema": "#5C10F4", + "SchemaRelationship": "#323332", + "default": "#D8D8D8", + } + + # Debug counters + reddit_count = 0 + research_count = 0 + other_count = 0 + document_chunk_count = 0 + + # Track which nodes are from different source types + reddit_document_ids = set() + research_document_ids = set() # arXiv/research papers + other_document_ids = set() # RSS feeds, etc. 
def adjust_brightness_by_catchiness(base_color, catchiness_score):
    """Lighten a hex color in proportion to a catchiness score.

    The score is normalized against a 0-10 scale and clamped to [0, 1]. Each
    RGB channel is then moved toward 255 by up to 70% of its remaining
    headroom, so a score of 0 keeps the original color and a score of 10 or
    more yields the lightest tint.

    Args:
        base_color: Hex color string, with or without a leading "#". Either
            six digits ("#8B0000") or three-digit shorthand ("#800"); for
            longer strings only the first six digits are read.
        catchiness_score: Numeric score, or None to skip the adjustment.

    Returns:
        ``base_color`` unchanged when ``catchiness_score`` is None, otherwise
        a lowercase "#rrggbb" string.

    Raises:
        TypeError: If ``catchiness_score`` does not support division.
        ValueError: If ``base_color`` does not contain valid hex digits.
    """
    if catchiness_score is None:
        return base_color

    max_score = 10.0  # scores are expressed as N/10
    max_blend = 0.7  # never blend fully to white, so the hue stays visible

    # Clamp so out-of-range scores cannot push a channel outside 0..255.
    normalized = min(max(catchiness_score / max_score, 0), 1)

    hex_digits = base_color.lstrip("#")
    if len(hex_digits) == 3:
        # Expand CSS-style shorthand: "abc" -> "aabbcc".
        hex_digits = "".join(digit * 2 for digit in hex_digits)
    channels = [int(hex_digits[i:i + 2], 16) for i in (0, 2, 4)]

    # Linear interpolation from each channel toward 255; the result cannot
    # exceed 255 because the blend weight is at most max_blend (< 1).
    lightened = [
        int(channel + (255 - channel) * normalized * max_blend)
        for channel in channels
    ]
    return "#{:02x}{:02x}{:02x}".format(*lightened)
catchiness: {e}") + pass + else: + print("No catchiness score found in text") + + # Determine source type from text content - check in priority order + print(f"Checking for 'Subreddit: r/' in text: {'Subreddit: r/' in text_content}") + print(f"Checking for 'arxiv.org' in text: {'arxiv.org' in text_content.lower()}") + print(f"Checking for 'RSS Feed:' in text: {'RSS Feed:' in text_content}") + + if "Subreddit: r/" in text_content: + is_reddit = True + reddit_count += 1 + reddit_document_ids.add(str(node_id)) + print("š“ IDENTIFIED AS REDDIT") + # Extract subreddit name + subreddit_match = re.search(r'Subreddit: (r/\w+)', text_content) + if subreddit_match: + source = subreddit_match.group(1) + node_info["source"] = source + print(f"Subreddit: {source}") + elif "arxiv.org" in text_content.lower() or "export.arxiv.org" in text_content.lower(): + is_research = True + research_count += 1 + research_document_ids.add(str(node_id)) + print("š¢ IDENTIFIED AS RESEARCH/ARXIV") + # Extract source details + if "RSS Feed:" in text_content: + rss_match = re.search(r'RSS Feed:\s*([^\n]+)', text_content) + if rss_match: + source = rss_match.group(1).strip() + node_info["source"] = "Research: " + source + print(f"Research Feed: {source}") + elif "RSS Feed:" in text_content: + is_other = True + other_count += 1 + other_document_ids.add(str(node_id)) + print("šµ IDENTIFIED AS OTHER/RSS") + # Extract RSS feed name + rss_match = re.search(r'RSS Feed:\s*([^\n]+)', text_content) + if rss_match: + source = rss_match.group(1).strip() + node_info["source"] = source + print(f"RSS Feed: {source}") + else: + print("ā ļø NO SOURCE IDENTIFIED") + + print(f"Flags - Reddit: {is_reddit}, Research: {is_research}, Other: {is_other}") + else: + # For non-DocumentChunk nodes, try to get catchiness and source from properties + if "catchiness" in node_info: + catchiness = node_info["catchiness"] + elif "properties" in node_info and isinstance(node_info["properties"], dict): + catchiness = 
node_info["properties"].get("catchiness") + + if "source" in node_info: + source = node_info["source"] + elif "properties" in node_info and isinstance(node_info["properties"], dict): + source = node_info["properties"].get("source") + + if source is not None: + node_info["source"] = source + + # Special handling for DocumentChunk nodes - color by source + if node_type == "DocumentChunk": + if is_reddit: + base_color = "#8B0000" # Dark red for Reddit + print(f"š“ Base color set to DARK RED (Reddit)") + elif is_research: + base_color = "#006400" # Dark green for Research/arXiv + print(f"š¢ Base color set to DARK GREEN (Research)") + elif is_other: + base_color = "#00008B" # Dark blue for Other/RSS + print(f"šµ Base color set to DARK BLUE (Other/RSS)") + else: + base_color = "#808080" # Gray fallback + print(f"ā« Base color set to GRAY (Unidentified)") + else: + # Get base color from type for non-DocumentChunk nodes + base_color = color_map.get(node_type, "#D3D3D3") + + # Adjust color brightness based on catchiness for DocumentChunk nodes + if node_type == "DocumentChunk" and catchiness is not None: + try: + catchiness_score = float(catchiness) + final_color = adjust_brightness_by_catchiness(base_color, catchiness_score) + node_info["color"] = final_color + print(f"Final color after brightness adjustment (catchiness={catchiness_score}): {final_color}") + except (ValueError, TypeError) as e: + node_info["color"] = base_color + print(f"Error in brightness adjustment: {e}, using base_color: {base_color}") + else: + # For non-DocumentChunk nodes, use base color without brightness adjustment + node_info["color"] = base_color + if node_type == "DocumentChunk": + print(f"No catchiness found, using base_color: {base_color}") + if catchiness is not None: + try: + node_info["catchiness"] = float(catchiness) + except (ValueError, TypeError): + pass + + node_info["name"] = node_info.get("name", str(node_id)) + + try: + del node_info[ + "updated_at" + ] #:TODO: We should decide 
what properties to show on the nodes and edges, we dont necessarily need all. + except KeyError: + pass + + try: + del node_info["created_at"] + except KeyError: + pass + + nodes_list.append(node_info) + G.add_node(node_id, **node_info) + + # Print summary + print("\n" + "="*60) + print("DOCUMENT CHUNK SUMMARY") + print("="*60) + print(f"Total DocumentChunks: {document_chunk_count}") + print(f"š“ Reddit posts (DARK RED): {reddit_count}") + print(f"š¢ Research/arXiv (DARK GREEN): {research_count}") + print(f"šµ Other/RSS (DARK BLUE): {other_count}") + print(f"ā ļø Unidentified: {document_chunk_count - reddit_count - research_count - other_count}") + print(f"IDs tracked - Reddit: {len(reddit_document_ids)}, Research: {len(research_document_ids)}, Other: {len(other_document_ids)}") + print("="*60 + "\n") + + # Build a mapping of entities connected to different source types + print("Building entity-to-source mapping from edges...") + reddit_connected_nodes = set() + research_connected_nodes = set() + other_connected_nodes = set() + + for source, target, relation, edge_info in edges_data: + source_id = str(source) + target_id = str(target) + + # Track connections to Reddit documents + if source_id in reddit_document_ids: + reddit_connected_nodes.add(target_id) + if target_id in reddit_document_ids: + reddit_connected_nodes.add(source_id) + + # Track connections to Research documents + if source_id in research_document_ids: + research_connected_nodes.add(target_id) + if target_id in research_document_ids: + research_connected_nodes.add(source_id) + + # Track connections to Other documents + if source_id in other_document_ids: + other_connected_nodes.add(target_id) + if target_id in other_document_ids: + other_connected_nodes.add(source_id) + + print(f"Entities connected to - Reddit: {len(reddit_connected_nodes)}, Research: {len(research_connected_nodes)}, Other: {len(other_connected_nodes)}") + + # Update Entity node colors based on source connection (priority: Reddit > 
Research > Other) + entity_reddit_count = 0 + entity_research_count = 0 + entity_other_count = 0 + entity_total_count = 0 + + for node_info in nodes_list: + if node_info.get("type") == "Entity": + entity_total_count += 1 + node_id = node_info.get("id") + + # Priority order: Reddit > Research > Other + if node_id in reddit_connected_nodes: + node_info["color"] = "#FF4500" # Orange/red for Reddit entities + node_info["source_type"] = "reddit" + entity_reddit_count += 1 + if entity_reddit_count <= 3: + print(f" š Entity '{node_info.get('name', 'Unknown')}' colored orange/red (Reddit)") + elif node_id in research_connected_nodes: + node_info["color"] = "#00FF00" # Green for Research entities + node_info["source_type"] = "research" + entity_research_count += 1 + if entity_research_count <= 3: + print(f" š¢ Entity '{node_info.get('name', 'Unknown')}' colored green (Research)") + elif node_id in other_connected_nodes: + node_info["color"] = "#5C10F4" # Blue for Other entities (keep original purple/blue) + node_info["source_type"] = "other" + entity_other_count += 1 + if entity_other_count <= 3: + print(f" šµ Entity '{node_info.get('name', 'Unknown')}' colored blue (Other)") + + print(f"\nš Reddit entities (ORANGE #FF4500): {entity_reddit_count} / {entity_total_count}") + print(f"š¢ Research entities (GREEN #00FF00): {entity_research_count} / {entity_total_count}") + print(f"šµ Other entities (BLUE #5C10F4): {entity_other_count} / {entity_total_count}") + print("\n" + "="*60) + print("COLOR SCHEME SUMMARY") + print("="*60) + print("š“ DocumentChunks (Reddit) ā DARK RED #8B0000 (brightness by catchiness)") + print("š Entities (Reddit) ā ORANGE/RED #FF4500") + print("š¢ DocumentChunks (Research) ā DARK GREEN #006400 (brightness by catchiness)") + print("š¢ Entities (Research) ā GREEN #00FF00") + print("šµ DocumentChunks (Other) ā DARK BLUE #00008B (brightness by catchiness)") + print("šµ Entities (Other) ā BLUE #5C10F4") + print("="*60 + "\n") + + edge_labels = {} + 
links_list = [] + for source, target, relation, edge_info in edges_data: + source = str(source) + target = str(target) + G.add_edge(source, target) + edge_labels[(source, target)] = relation + + # Extract edge metadata including all weights + all_weights = {} + primary_weight = None + + if edge_info: + # Single weight (backward compatibility) + if "weight" in edge_info: + all_weights["default"] = edge_info["weight"] + primary_weight = edge_info["weight"] + + # Multiple weights + if "weights" in edge_info and isinstance(edge_info["weights"], dict): + all_weights.update(edge_info["weights"]) + # Use the first weight as primary for visual thickness if no default weight + if primary_weight is None and edge_info["weights"]: + primary_weight = next(iter(edge_info["weights"].values())) + + # Individual weight fields (weight_strength, weight_confidence, etc.) + for key, value in edge_info.items(): + if key.startswith("weight_") and isinstance(value, (int, float)): + weight_name = key[7:] # Remove "weight_" prefix + all_weights[weight_name] = value + + link_data = { + "source": source, + "target": target, + "relation": relation, + "weight": primary_weight, # Primary weight for backward compatibility + "all_weights": all_weights, # All weights for display + "relationship_type": edge_info.get("relationship_type") if edge_info else None, + "edge_info": edge_info if edge_info else {}, + } + links_list.append(link_data) + + html_template = """ + + +
+ + + + + + + + +