feat: add experimental cognify pipeline [COG-1293] (#541)

<!-- .github/pull_request_template.md -->

## Description
- Integrate experimental tasks into the evaluation framework
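For reviewers, a minimal sketch of how the experimental tasks are opted into through the eval framework (key names match the diff below; assumes an async context):

params = {
    "building_corpus_from_scratch": True,
    "benchmark": "Dummy",
    "number_of_samples_in_corpus": 1,
    "task_getter_type": "CascadeGraph",  # new option introduced by this PR
}
await run_corpus_builder(params)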
## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced iterative prompt templates for extracting graph nodes, edge triplets, and relationship names, resulting in more comprehensive and accurate knowledge graphs.
  - Added asynchronous processes to handle document data efficiently and integrate graph components.
  - Launched cascade graph task options to offer greater flexibility in task-management workflows.
  - Added new functionality for extracting content nodes and relationship names from text.

- **Refactor**
  - Streamlined configurations for prompt processing and task initialization, improving overall modularity and system stability.
  - Updated task getter mechanisms to use function-based approaches for improved flexibility.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
Authored by lxobr on 2025-02-25 16:14:27 +01:00; committed by GitHub.
commit 1cb83312fe (parent 55411ff44b)
24 changed files with 380 additions and 70 deletions

View file

@@ -3,10 +3,13 @@ import logging
 from cognee.root_dir import get_absolute_path
-def read_query_prompt(prompt_file_name: str):
+def read_query_prompt(prompt_file_name: str, base_directory: str = None):
     """Read a query prompt from a file."""
     try:
-        file_path = path.join(get_absolute_path("./infrastructure/llm/prompts"), prompt_file_name)
+        if base_directory is None:
+            base_directory = get_absolute_path("./infrastructure/llm/prompts")
+        file_path = path.join(base_directory, prompt_file_name)
         with open(file_path, "r", encoding="utf-8") as file:
             return file.read()
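A quick usage sketch of the new override (file name and directory here are illustrative):

from cognee.infrastructure.llm.prompts import read_query_prompt

# Default behaviour: resolve against the built-in ./infrastructure/llm/prompts directory.
prompt = read_query_prompt("my_prompt_system.txt")
# New behaviour: read the same prompt from a caller-supplied directory.
prompt = read_query_prompt("my_prompt_system.txt", base_directory="/opt/custom/prompts")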

View file

@@ -2,13 +2,14 @@ from jinja2 import Environment, FileSystemLoader, select_autoescape
 from cognee.root_dir import get_absolute_path
-def render_prompt(filename: str, context: dict) -> str:
+def render_prompt(filename: str, context: dict, base_directory: str = None) -> str:
     """Render a Jinja2 template with the given context.
     :param filename: The name of the template file to render.
     :param context: The context to render the template with.
     :return: The rendered template as a string."""
-    # Set the base directory relative to the cognee root directory
+    if base_directory is None:
+        base_directory = get_absolute_path("./infrastructure/llm/prompts")
     # Initialize the Jinja2 environment to load templates from the filesystem
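render_prompt gains the same escape hatch; a sketch with illustrative names:

from cognee.infrastructure.llm.prompts import render_prompt

# Render a Jinja2 template from a custom directory instead of the built-in one.
text = render_prompt(
    "my_prompt_input.txt",
    {"text": "some source text", "round_number": 1, "total_rounds": 2},
    base_directory="/opt/custom/prompts",
)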

View file

View file

@@ -0,0 +1,18 @@
Using provided potential nodes and relationships, extract concrete edges from the following text. Build upon previously extracted nodes and edges (if any), as this is round {{ round_number }} of {{ total_rounds }}.
**Text:**
{{ text }}
**Potential Nodes to Use:**
{{ potential_nodes }}
**Potential Relationships to Use:**
{{ potential_relationship_names }}
**Previously Extracted Nodes:**
{{ previous_nodes }}
**Previously Extracted Edge Triplets:**
{{ previous_edge_triplets }}
Create specific edge triplets between nodes, ensuring each connection is clearly supported by the text content. Use the potential nodes and relationships as your primary building blocks, while considering previously extracted nodes and edges for consistency and completeness.

View file

@@ -0,0 +1,8 @@
You are an expert in knowledge graph building focusing on the extraction of graph triplets.
Your task is to extract structured knowledge graph triplets from text, using the provided list of potential nodes and relationship names as a reference.
• Form triplets in the format (start_node, relationship_name, end_node), selecting the most precise and relevant relationship.
• Identify explicit and implied relationships by leveraging the given nodes and relationship names, as well as logical inference.
• Ensure completeness by cross-checking all nodes and relationships across multiple rounds.
• Exclude trivial, redundant, or nonsensical triplets, keeping only meaningful and well-structured connections.
• Add relevant edge triplets beyond the available potential nodes and relationship names.
• Return a list of extracted triplets, ensuring clarity and accuracy for knowledge graph integration.

View file

@@ -0,0 +1,7 @@
Extract distinct entities and concepts from the following text to expand the knowledge graph. Build upon previously extracted entities, ensuring completeness and consistency. This is round {{ round_number }} of {{ total_rounds }}.
**Text:**
{{ text }}
**Previously Extracted Entities:**
{{ previous_entities }}

View file

@@ -0,0 +1,8 @@
You are an expert in entity extraction and knowledge graph building, focusing on node identification.
Your task is to perform a detailed entity and concept extraction from text to generate a list of potential nodes for a knowledge graph.
• Extract clear, distinct entities and concepts as individual strings.
• Be exhaustive: ensure completeness by capturing all entities, names, nouns, noun phrases, and implied or implicit mentions.
• Also extract potential entity type nodes, directly mentioned or implied.
• Avoid duplicates and overly generic terms.
• Consider different perspectives and indirect references.
• Return only a list of unique node strings with all the entities.

View file

@@ -0,0 +1,15 @@
Analyze the following text to identify relationships between entities in the knowledge graph. This is round {{ round_number }} of {{ total_rounds }}.
**Text:**
{{ text }}
**Previously Extracted Potential Nodes:**
{{ potential_nodes }}
**Nodes Identified in Previous Rounds:**
{{ previous_nodes }}
**Relationships Identified in Previous Rounds:**
{{ previous_relationship_names }}
Extract both explicit and implicit relationships between the nodes, building upon previous findings while ensuring completeness and consistency.

View file

@@ -0,0 +1,6 @@
You are an expert in relationship identification and knowledge graph building focusing on relationships. Your task is to perform a detailed extraction of relationship names from the text.
• Extract all relationship names from explicit phrases, verbs, and implied context that could help form edge triplets.
• Use the potential nodes and reassign them to relationship names if they correspond to a relation, verb, action or similar.
• Ensure completeness by working in multiple rounds, capturing overlooked connections and refining the nodes list.
• Focus on meaningful entities and relationships, whether directly stated, implied, or implicit.
• Return two lists: refined nodes and potential relationship names (for forming edges).

View file

@@ -0,0 +1,61 @@
from typing import List, Tuple

from pydantic import BaseModel

from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt
from cognee.root_dir import get_absolute_path


class PotentialNodesAndRelationshipNames(BaseModel):
    """Response model containing lists of potential node names and relationship names."""

    nodes: List[str]
    relationship_names: List[str]


async def extract_content_nodes_and_relationship_names(
    content: str, existing_nodes: List[str], n_rounds: int = 2
) -> Tuple[List[str], List[str]]:
    """Extracts node names and relationship names from content through multiple rounds of analysis."""
    llm_client = get_llm_client()
    all_nodes: List[str] = existing_nodes.copy()
    all_relationship_names: List[str] = []
    existing_node_set = {node.lower() for node in all_nodes}
    existing_relationship_names = set()

    for round_num in range(n_rounds):
        context = {
            "text": content,
            "potential_nodes": existing_nodes,
            "previous_nodes": all_nodes,
            "previous_relationship_names": all_relationship_names,
            "round_number": round_num + 1,
            "total_rounds": n_rounds,
        }
        base_directory = get_absolute_path("./tasks/graph/cascade_extract/prompts")
        text_input = render_prompt(
            "extract_graph_relationship_names_prompt_input.txt",
            context,
            base_directory=base_directory,
        )
        system_prompt = read_query_prompt(
            "extract_graph_relationship_names_prompt_system.txt", base_directory=base_directory
        )
        response = await llm_client.acreate_structured_output(
            text_input=text_input,
            system_prompt=system_prompt,
            response_model=PotentialNodesAndRelationshipNames,
        )

        for node in response.nodes:
            if node.lower() not in existing_node_set:
                all_nodes.append(node)
                existing_node_set.add(node.lower())

        for relationship_name in response.relationship_names:
            if relationship_name.lower() not in existing_relationship_names:
                all_relationship_names.append(relationship_name)
                existing_relationship_names.add(relationship_name.lower())

    return all_nodes, all_relationship_names
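A usage sketch for the helper above (async context assumed; the text and seed nodes are illustrative):

nodes, relationship_names = await extract_content_nodes_and_relationship_names(
    content="Marie Curie discovered polonium.",
    existing_nodes=["Marie Curie", "polonium"],
    n_rounds=2,
)
# nodes extends the seed list; relationship_names might include, e.g., "discovered".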

View file

@@ -0,0 +1,60 @@
from typing import List, Tuple

from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt
from cognee.shared.data_models import KnowledgeGraph
from cognee.root_dir import get_absolute_path


async def extract_edge_triplets(
    content: str, nodes: List[str], relationship_names: List[str], n_rounds: int = 2
) -> KnowledgeGraph:
    """Creates a knowledge graph by identifying relationships between the provided nodes."""
    llm_client = get_llm_client()
    final_graph = KnowledgeGraph(nodes=[], edges=[])
    existing_nodes = set()
    existing_node_ids = set()
    existing_edge_triplets = set()

    for round_num in range(n_rounds):
        context = {
            "text": content,
            "potential_nodes": nodes,
            "potential_relationship_names": relationship_names,
            "previous_nodes": existing_nodes,
            "previous_edge_triplets": existing_edge_triplets,
            "round_number": round_num + 1,
            "total_rounds": n_rounds,
        }
        base_directory = get_absolute_path("./tasks/graph/cascade_extract/prompts")
        text_input = render_prompt(
            "extract_graph_edge_triplets_prompt_input.txt", context, base_directory=base_directory
        )
        system_prompt = read_query_prompt(
            "extract_graph_edge_triplets_prompt_system.txt", base_directory=base_directory
        )
        extracted_graph = await llm_client.acreate_structured_output(
            text_input=text_input, system_prompt=system_prompt, response_model=KnowledgeGraph
        )

        for node in extracted_graph.nodes:
            if node.name not in existing_nodes:
                final_graph.nodes.append(node)
                existing_nodes.add(node.name)
                existing_node_ids.add(node.id)

        for edge in extracted_graph.edges:
            edge_key = (edge.source_node_id, edge.target_node_id, edge.relationship_name)
            if edge_key in existing_edge_triplets:
                continue
            if not (
                edge.source_node_id in existing_node_ids
                and edge.target_node_id in existing_node_ids
            ):
                continue
            final_graph.edges.append(edge)
            existing_edge_triplets.add(edge_key)

    return final_graph
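And for the triplet extractor (async context assumed; inputs illustrative):

graph = await extract_edge_triplets(
    content="Marie Curie discovered polonium.",
    nodes=["Marie Curie", "polonium"],
    relationship_names=["discovered"],
    n_rounds=2,
)
# graph is a KnowledgeGraph; edges referencing unknown node ids are filtered out above.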

View file

@@ -0,0 +1,45 @@
from typing import List

from pydantic import BaseModel

from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt
from cognee.root_dir import get_absolute_path


class PotentialNodes(BaseModel):
    """Response model containing a list of potential node names."""

    nodes: List[str]


async def extract_nodes(text: str, n_rounds: int = 2) -> List[str]:
    """Extracts node names from content through multiple rounds of analysis."""
    llm_client = get_llm_client()
    all_nodes: List[str] = []
    existing_nodes = set()

    for round_num in range(n_rounds):
        context = {
            "previous_nodes": all_nodes,
            "round_number": round_num + 1,
            "total_rounds": n_rounds,
            "text": text,
        }
        base_directory = get_absolute_path("./tasks/graph/cascade_extract/prompts")
        text_input = render_prompt(
            "extract_graph_nodes_prompt_input.txt", context, base_directory=base_directory
        )
        system_prompt = read_query_prompt(
            "extract_graph_nodes_prompt_system.txt", base_directory=base_directory
        )
        response = await llm_client.acreate_structured_output(
            text_input=text_input, system_prompt=system_prompt, response_model=PotentialNodes
        )

        for node in response.nodes:
            if node.lower() not in existing_nodes:
                all_nodes.append(node)
                existing_nodes.add(node.lower())

    return all_nodes
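First-stage usage (async context assumed; the returned list is illustrative):

nodes = await extract_nodes("Marie Curie discovered polonium.", n_rounds=2)
# e.g. ["Marie Curie", "polonium", "scientist", "chemical element"]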

View file

@@ -14,16 +14,10 @@ from cognee.shared.data_models import KnowledgeGraph
 from cognee.tasks.storage import add_data_points
 
-async def extract_graph_from_data(
-    data_chunks: list[DocumentChunk], graph_model: Type[BaseModel]
+async def integrate_chunk_graphs(
+    data_chunks: list[DocumentChunk], chunk_graphs: list, graph_model: Type[BaseModel]
 ) -> List[DocumentChunk]:
-    """
-    Extracts and integrates a knowledge graph from the text content of document chunks using a specified graph model.
-    """
-    chunk_graphs = await asyncio.gather(
-        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
-    )
+    """Updates DocumentChunk objects, integrates data points and edges into databases."""
     graph_engine = await get_graph_engine()
 
     if graph_model is not KnowledgeGraph:
@@ -52,3 +46,13 @@ async def extract_graph_from_data(
     await graph_engine.add_edges(graph_edges)
 
     return data_chunks
+
+
+async def extract_graph_from_data(
+    data_chunks: list[DocumentChunk], graph_model: Type[BaseModel]
+) -> List[DocumentChunk]:
+    """Extracts and integrates a knowledge graph from the text content of document chunks using a specified graph model."""
+    chunk_graphs = await asyncio.gather(
+        *[extract_content_graph(chunk.text, graph_model) for chunk in data_chunks]
+    )
+    return await integrate_chunk_graphs(data_chunks, chunk_graphs, graph_model)

View file

@@ -0,0 +1,40 @@
import asyncio
from typing import List

from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.graph.cascade_extract.utils.extract_nodes import extract_nodes
from cognee.tasks.graph.cascade_extract.utils.extract_content_nodes_and_relationship_names import (
    extract_content_nodes_and_relationship_names,
)
from cognee.tasks.graph.cascade_extract.utils.extract_edge_triplets import (
    extract_edge_triplets,
)
from cognee.tasks.graph.extract_graph_from_data import integrate_chunk_graphs


async def extract_graph_from_data(
    data_chunks: List[DocumentChunk], n_rounds: int = 2
) -> List[DocumentChunk]:
    """Extract and update graph data from document chunks in multiple steps."""
    chunk_nodes = await asyncio.gather(
        *[extract_nodes(chunk.text, n_rounds) for chunk in data_chunks]
    )
    chunk_results = await asyncio.gather(
        *[
            extract_content_nodes_and_relationship_names(chunk.text, nodes, n_rounds)
            for chunk, nodes in zip(data_chunks, chunk_nodes)
        ]
    )
    updated_nodes, relationships = zip(*chunk_results)
    chunk_graphs = await asyncio.gather(
        *[
            extract_edge_triplets(chunk.text, nodes, rels, n_rounds)
            for chunk, nodes, rels in zip(data_chunks, updated_nodes, relationships)
        ]
    )
    return await integrate_chunk_graphs(data_chunks, chunk_graphs, KnowledgeGraph)
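The three stages compose into a single call per batch; a sketch, assuming chunks is a list of DocumentChunk objects produced upstream by extract_chunks_from_documents (async context assumed):

from cognee.tasks.graph.extract_graph_from_data_v2 import extract_graph_from_data

updated_chunks = await extract_graph_from_data(data_chunks=chunks, n_rounds=2)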

View file

@@ -1,16 +1,18 @@
 import cognee
 import logging
-from typing import Optional, Tuple, List, Dict, Union, Any
+from typing import Optional, Tuple, List, Dict, Union, Any, Callable, Awaitable
 from evals.eval_framework.benchmark_adapters.benchmark_adapters import BenchmarkAdapter
-from evals.eval_framework.corpus_builder.task_getters.task_getters import TaskGetters
-from evals.eval_framework.corpus_builder.task_getters.base_task_getter import BaseTaskGetter
+from evals.eval_framework.corpus_builder.task_getters.TaskGetters import TaskGetters
+from cognee.modules.pipelines.tasks.Task import Task
 from cognee.shared.utils import setup_logging
 
 
 class CorpusBuilderExecutor:
     def __init__(
-        self, benchmark: Union[str, Any] = "Dummy", task_getter_type: str = "DEFAULT"
+        self,
+        benchmark: Union[str, Any] = "Dummy",
+        task_getter: Callable[..., Awaitable[List[Task]]] = None,
     ) -> None:
         if isinstance(benchmark, str):
             try:
@@ -23,13 +25,7 @@ class CorpusBuilderExecutor:
         self.raw_corpus = None
         self.questions = None
 
-        try:
-            task_enum = TaskGetters(task_getter_type)
-        except KeyError:
-            raise ValueError(f"Invalid task getter type: {task_getter_type}")
-        self.task_getter: BaseTaskGetter = task_enum.getter_class()
+        self.task_getter = task_getter
 
     def load_corpus(self, limit: Optional[int] = None) -> Tuple[List[Dict], List[str]]:
         self.raw_corpus, self.questions = self.adapter.load_corpus(limit=limit)
@@ -48,5 +44,5 @@ class CorpusBuilderExecutor:
         await cognee.add(self.raw_corpus)
 
-        tasks = await self.task_getter.get_tasks()
+        tasks = await self.task_getter()
         await cognee.cognify(tasks=tasks)
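The executor now receives the getter coroutine directly; a sketch (CorpusBuilderExecutor import omitted, async context assumed):

from cognee.api.v1.cognify.cognify_v2 import get_default_tasks

executor = CorpusBuilderExecutor(benchmark="Dummy", task_getter=get_default_tasks)
questions = await executor.build_corpus(limit=1)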

View file

@@ -8,6 +8,7 @@ from cognee.infrastructure.databases.relational.get_relational_engine import (
     get_relational_engine,
     get_relational_config,
 )
+from evals.eval_framework.corpus_builder.task_getters.TaskGetters import TaskGetters
 
 
 async def create_and_insert_questions_table(questions_payload):
@@ -30,9 +31,15 @@ async def create_and_insert_questions_table(questions_payload):
 async def run_corpus_builder(params: dict) -> None:
     if params.get("building_corpus_from_scratch"):
         logging.info("Corpus Builder started...")
+
+        try:
+            task_getter = TaskGetters(params.get("task_getter_type", "Default")).getter_func
+        except KeyError:
+            raise ValueError(f"Invalid task getter type: {params.get('task_getter_type')}")
+
         corpus_builder = CorpusBuilderExecutor(
             benchmark=params["benchmark"],
-            task_getter_type=params.get("task_getter_type", "Default"),
+            task_getter=task_getter,
         )
         questions = await corpus_builder.build_corpus(
             limit=params.get("number_of_samples_in_corpus")

View file

@@ -0,0 +1,23 @@
from enum import Enum
from typing import Callable, Awaitable, List

from cognee.api.v1.cognify.cognify_v2 import get_default_tasks
from cognee.modules.pipelines.tasks.Task import Task
from evals.eval_framework.corpus_builder.task_getters.get_cascade_graph_tasks import (
    get_cascade_graph_tasks,
)


class TaskGetters(Enum):
    """Enum mapping task getter types to their respective functions."""

    DEFAULT = ("Default", get_default_tasks)
    CASCADE_GRAPH = ("CascadeGraph", get_cascade_graph_tasks)

    def __new__(cls, getter_name: str, getter_func: Callable[..., Awaitable[List[Task]]]):
        obj = object.__new__(cls)
        obj._value_ = getter_name
        obj.getter_func = getter_func
        return obj

    def __str__(self):
        return self.value
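A quick sketch of the enum in use (async context assumed):

# Look up a getter by its string value, then call the bound coroutine function.
getter = TaskGetters("CascadeGraph").getter_func
tasks = await getter()
print(str(TaskGetters.DEFAULT))  # -> "Default"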

View file

@@ -1,12 +0,0 @@
from abc import ABC, abstractmethod
from typing import List

from cognee.modules.pipelines.tasks.Task import Task


class BaseTaskGetter(ABC):
    """Abstract base class for asynchronous task retrieval implementations."""

    @abstractmethod
    async def get_tasks(self) -> List[Task]:
        """Asynchronously retrieve a list of tasks. Must be implemented by subclasses."""
        pass

View file

@@ -1,12 +0,0 @@
from cognee.api.v1.cognify.cognify_v2 import get_default_tasks
from typing import List

from evals.eval_framework.corpus_builder.task_getters.base_task_getter import BaseTaskGetter
from cognee.modules.pipelines.tasks.Task import Task


class DefaultTaskGetter(BaseTaskGetter):
    """Default task getter that retrieves tasks using the standard get_default_tasks function."""

    async def get_tasks(self) -> List[Task]:
        """Retrieve default tasks asynchronously."""
        return await get_default_tasks()

View file

@@ -0,0 +1,51 @@
from typing import List

from pydantic import BaseModel

from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.data_models import KnowledgeGraph
from cognee.shared.utils import send_telemetry
from cognee.tasks.documents import (
    check_permissions_on_documents,
    classify_documents,
    extract_chunks_from_documents,
)
from cognee.tasks.graph.extract_graph_from_data_v2 import (
    extract_graph_from_data,
)
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.infrastructure.llm import get_max_chunk_tokens


async def get_cascade_graph_tasks(
    user: User = None, graph_model: BaseModel = KnowledgeGraph
) -> List[Task]:
    """Retrieve cascade graph tasks asynchronously."""
    if user is None:
        user = await get_default_user()

    try:
        cognee_config = get_cognify_config()
        default_tasks = [
            Task(classify_documents),
            Task(check_permissions_on_documents, user=user, permissions=["write"]),
            Task(
                extract_chunks_from_documents, max_chunk_tokens=get_max_chunk_tokens()
            ),  # Extract text chunks based on the document type.
            Task(
                extract_graph_from_data, task_config={"batch_size": 10}
            ),  # Generate knowledge graphs using cascade extraction
            Task(
                summarize_text,
                summarization_model=cognee_config.summarization_model,
                task_config={"batch_size": 10},
            ),
            Task(add_data_points, task_config={"batch_size": 10}),
        ]
    except Exception as error:
        send_telemetry("cognee.cognify DEFAULT TASKS CREATION ERRORED", user.id)
        raise error
    return default_tasks
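Using the new getter directly (async context assumed):

import cognee

tasks = await get_cascade_graph_tasks()  # falls back to the default user
await cognee.cognify(tasks=tasks)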

View file

@@ -1,19 +0,0 @@
from enum import Enum
from typing import Type

from evals.eval_framework.corpus_builder.task_getters.default_task_getter import DefaultTaskGetter


class TaskGetters(Enum):
    """Enum mapping task getter types to their respective classes."""

    DEFAULT = ("Default", DefaultTaskGetter)
    # CUSTOM = ("Custom", CustomTaskGetter)

    def __new__(cls, getter_name: str, getter_class: Type):
        obj = object.__new__(cls)
        obj._value_ = getter_name
        obj.getter_class = getter_class
        return obj

    def __str__(self):
        return self.value

View file

@@ -8,7 +8,7 @@ class EvalConfig(BaseSettings):
     building_corpus_from_scratch: bool = True
     number_of_samples_in_corpus: int = 1
     benchmark: str = "Dummy"  # Options: 'HotPotQA', 'Dummy', 'TwoWikiMultiHop'
-    task_getter_type: str = "Default"
+    task_getter_type: str = "Default"  # Options: 'Default', 'CascadeGraph'
 
     # Question answering params
     answering_questions: bool = True
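As a sanity check, the new option can also be exercised through the settings class (a sketch, assuming standard pydantic BaseSettings behaviour; the matching environment variable name depends on the settings' configuration):

config = EvalConfig(task_getter_type="CascadeGraph")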