diff --git a/cognee/infrastructure/llm/prompts/edge_association_prompt.txt b/cognee/infrastructure/llm/prompts/edge_association_prompt.txt new file mode 100644 index 000000000..b3a758e47 --- /dev/null +++ b/cognee/infrastructure/llm/prompts/edge_association_prompt.txt @@ -0,0 +1,49 @@ +You are an expert graph augmentation assistant. + +Input: +{{subgraph_description}} + +Where: +- “Nodes” is a JSON array of objects, each with: + - “name”: the node’s unique label + - “content”: the full text or description +- “Edges” is a JSON array of objects, each with: + - “source”: name of the first node + - “target”: name of the second node + - “relationship_name”: the existing link label + +Task: +Infer and propose **only new** semantic or conceptual association edges—no structural, containment, usage, or metadata links (e.g. contains, is_part_of, belongs_to_set, uses, applies, leverages, developed, etc.). + +Rules: +1. Do **not** re‑output any existing edge (in either direction). +2. Do **not** output self‑loops (source == target). +3. Only generate associations of these general types: + - **Semantic or syntactic siblings** (e.g. apple ↔ pear) + - **Domain peers** (e.g. Audi ↔ BMW) + - **Conceptual or functional complements** (e.g. pen ↔ paper) + +**Strict Exclusions** +- Do **not** create any edge if the pair is already connected by any existing relationship (in either direction). +- Do **not** output edges whose `relationship_name` is structural or metadata (e.g. contains, is_part_of, belongs_to_set, applies, uses, developed, redefining, leverages, offers_service, etc.). +- Do **not** output self‑loops (source == target). + +For each new edge, output: +- “source”: the first node’s name +- “target”: the second node’s name +- “relationship_name”: a concise snake_case label (e.g. “fruit_siblings”, “luxury_peers”) +- “reason”: a brief justification (e.g. “both are fruits”, “both are European luxury car brands”) + +Output **only** valid JSON in this schema—no extra text: +```json +{ + "new_edges": [ + { + "source": "NodeA", + "target": "NodeB", + "relationship_name": "your_label_here", + "reason": "your_reason_here" + } + … + ] +} diff --git a/cognee/tasks/experimental_tasks/node_set_edge_association.py b/cognee/tasks/experimental_tasks/node_set_edge_association.py new file mode 100644 index 000000000..ce0feb40a --- /dev/null +++ b/cognee/tasks/experimental_tasks/node_set_edge_association.py @@ -0,0 +1,99 @@ +from typing import Union, Optional, Type, List +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.modules.engine.models.node_set import NodeSet +from cognee.shared.data_models import Edge +from pydantic import BaseModel, Field +from cognee.infrastructure.llm.get_llm_client import get_llm_client +from cognee.infrastructure.llm.prompts import render_prompt +from cognee.infrastructure.llm.config import get_llm_config +from uuid import UUID + + +class AssociativeEdge(BaseModel): + source_node: str + target_node: str + relationship_name: str + reason: str + + +class AssociativeEdges(BaseModel): + edges: List[AssociativeEdge] = Field(..., default_factory=list) + + +async def node_set_edge_association(): + graph_engine = await get_graph_engine() + + node_set_names = await graph_engine.query("""MATCH (n) + WHERE n.type = 'NodeSet' + RETURN n.name AS name + """) + + for node_set in node_set_names: + node_name = node_set.get("name", None) + nodes_data, edges_data = await graph_engine.get_subgraph( + node_type=NodeSet, node_name=node_name + ) + nodes = {} + for node_id, attributes in nodes_data: + if node_id not in nodes: + text = attributes.get("text") + if text: + name = text.strip().split("\n")[0][:50] + content = text + else: + name = attributes.get("name", "Unnamed Node") + content = name + nodes[node_id] = {"node": attributes, "name": name, "content": content} + + name_to_uuid = {data["name"].strip().lower(): node_id for node_id, data in nodes.items()} + + subgraph_description = create_subgraph_description(nodes, edges_data) + + llm_client = get_llm_client() + + system_prompt = system_prompt = render_prompt("edge_association_prompt.txt", {}) + associative_edges = await llm_client.acreate_structured_output( + subgraph_description, system_prompt, AssociativeEdges + ) + + graph_edges = [] + for ae in associative_edges.edges: + src_str = name_to_uuid.get(ae.source_node) + tgt_str = name_to_uuid.get(ae.target_node) + if not src_str or not tgt_str: + continue + + src = UUID(src_str) + tgt = UUID(tgt_str) + rel = ae.relationship_name + rea = ae.reason + + props = { + "ontology_valid": False, + "relationship_name": rel, + "source_node_id": src, + "target_node_id": tgt, + "reason": rea, + } + + graph_edges.append((src, tgt, rel, props)) + + if graph_edges: + await graph_engine.add_edges(graph_edges) + + print() + + +def create_subgraph_description(nodes, edges_data): + node_section = "\n".join( + f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" + for info in nodes.values() + ) + + connection_section = "\n".join( + f"{nodes[source_id]['name']} --[{relationship_type}]--> {nodes[target_id]['name']}" + for source_id, target_id, relationship_type, attributes in edges_data + if source_id in nodes and target_id in nodes + ) + + return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" diff --git a/examples/python/simple_node_set_example.py b/examples/python/simple_node_set_example.py index 2f891b7d8..167b61df1 100644 --- a/examples/python/simple_node_set_example.py +++ b/examples/python/simple_node_set_example.py @@ -2,24 +2,38 @@ import asyncio import cognee from cognee.shared.logging_utils import get_logger, ERROR from cognee.api.v1.search import SearchType +from cognee.modules.pipelines import run_tasks, Task +from cognee.tasks.experimental_tasks.node_set_edge_association import node_set_edge_association text_a = """ - AI is revolutionizing financial services through intelligent fraud detection - and automated customer service platforms. + Leading financial technology firms like Stripe, Square, and Revolut are redefining digital commerce by embedding AI + into their payment ecosystems. Stripe leverages machine learning to detect and prevent fraud in real time, + while Square uses predictive analytics to offer customized lending solutions to small businesses. + Meanwhile, Revolut applies AI algorithms to automate wealth management services, enabling users to invest, + save, and budget with unparalleled personalization and efficiency. """ text_b = """ - Advances in AI are enabling smarter systems that learn and adapt over time. + Pioneering AI companies such as OpenAI, Anthropic, and DeepMind are advancing self-supervised + learning techniques that empower systems to autonomously evolve their cognitive capabilities. + OpenAI's models interpret complex multimodal data with minimal human annotation, while Anthropic’s + Constitutional AI approach refines alignment and safety. DeepMind continues to push boundaries with + breakthroughs like AlphaFold, illustrating the power of AI to decipher intricate biological structures + without exhaustive manual input. """ text_c = """ - MedTech startups have seen significant growth in recent years, driven by innovation - in digital health and medical devices. + MedTech innovators like Medtronic, Butterfly Network, and Intuitive Surgical are revolutionizing + healthcare delivery through smart devices and AI-driven platforms. Medtronic's connected insulin + pumps enable real-time glucose monitoring, Butterfly Network’s portable ultrasound devices bring + diagnostic imaging to remote areas, and Intuitive Surgical’s da Vinci system enhances precision + in minimally invasive surgeries. Together, these companies are reshaping clinical pathways and + extending care beyond traditional hospital settings. """ -node_set_a = ["AI", "FinTech"] -node_set_b = ["AI"] -node_set_c = ["MedTech"] +node_set_a = ["AI_NODESET", "FinTech_NODESET"] +node_set_b = ["AI_NODESET"] +node_set_c = ["MedTech_NODESET"] async def main(): @@ -31,6 +45,15 @@ async def main(): await cognee.add(text_c, node_set=node_set_c) await cognee.cognify() + tasks = [Task(node_set_edge_association)] + + pipeline = run_tasks(tasks=tasks) + + async for pipeline_status in pipeline: + print(f"Pipeline run status: {pipeline_status.pipeline_name} - {pipeline_status.status}") + + print() + if __name__ == "__main__": logger = get_logger(level=ERROR)