From 44ec81425604c464730d545a3df0e8cc1d5c1938 Mon Sep 17 00:00:00 2001
From: lxobr <122801072+lxobr@users.noreply.github.com>
Date: Mon, 20 Oct 2025 12:48:11 +0200
Subject: [PATCH 01/25] feat: feedback enrichment preparation

---
 .../llm/prompts/feedback_reaction_prompt.txt  | 14 +++++
 .../llm/prompts/feedback_report_prompt.txt    | 13 +++++
 .../prompts/feedback_user_context_prompt.txt  |  5 ++
 cognee/tasks/feedback/models.py               | 20 +++++++
 cognee/tasks/feedback/utils.py                | 57 +++++++++++++++++++
 5 files changed, 109 insertions(+)
 create mode 100644 cognee/infrastructure/llm/prompts/feedback_reaction_prompt.txt
 create mode 100644 cognee/infrastructure/llm/prompts/feedback_report_prompt.txt
 create mode 100644 cognee/infrastructure/llm/prompts/feedback_user_context_prompt.txt
 create mode 100644 cognee/tasks/feedback/models.py
 create mode 100644 cognee/tasks/feedback/utils.py

diff --git a/cognee/infrastructure/llm/prompts/feedback_reaction_prompt.txt b/cognee/infrastructure/llm/prompts/feedback_reaction_prompt.txt
new file mode 100644
index 000000000..c77ed8fca
--- /dev/null
+++ b/cognee/infrastructure/llm/prompts/feedback_reaction_prompt.txt
@@ -0,0 +1,14 @@
+A question was previously answered, but the answer received negative feedback.
+Please reconsider and improve the response.
+
+Question: {question}
+Context originally used: {context}
+Previous answer: {wrong_answer}
+Feedback on that answer: {negative_feedback}
+
+Task: Provide a better response. The new answer should be short and direct.
+Then explain briefly why this answer is better.
+
+Format your reply as:
+Answer:
+Explanation:
diff --git a/cognee/infrastructure/llm/prompts/feedback_report_prompt.txt b/cognee/infrastructure/llm/prompts/feedback_report_prompt.txt
new file mode 100644
index 000000000..2d4194f4d
--- /dev/null
+++ b/cognee/infrastructure/llm/prompts/feedback_report_prompt.txt
@@ -0,0 +1,13 @@
+Write a concise, stand-alone paragraph that explains the correct answer to the question below.
+The paragraph should read naturally on its own, providing all necessary context and reasoning
+so the answer is clear and well-supported.
+
+Question: {question}
+Correct answer: {improved_answer}
+Supporting context: {new_context}
+
+Your paragraph should:
+- Open with a full sentence that clearly states the correct answer
+- Let the rest flow from that opening sentence, explaining the answer based on the context
+- Use simple, direct language that is easy to follow
+- Keep sentences short; avoid long-winded explanations
diff --git a/cognee/infrastructure/llm/prompts/feedback_user_context_prompt.txt b/cognee/infrastructure/llm/prompts/feedback_user_context_prompt.txt
new file mode 100644
index 000000000..3d9a25f96
--- /dev/null
+++ b/cognee/infrastructure/llm/prompts/feedback_user_context_prompt.txt
@@ -0,0 +1,5 @@
+Question: {question}
+Context: {context}
+
+Provide a one-paragraph, human-readable summary of this interaction context,
+listing all the relevant facts and information in a simple and direct way.
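The prompt files above are plain Python str.format templates. As a minimal sketch of how they are consumed (the later patches in this series load them with read_query_prompt and fill the placeholders; the example values here are illustrative only, taken from the minimal example added in patch 02):

    from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt

    # Load the reaction template added above and fill its placeholders.
    prompt_template = read_query_prompt("feedback_reaction_prompt.txt")
    rendered = prompt_template.format(
        question="Who told Bob to bring the donuts?",
        context="One-paragraph summary of the original interaction.",
        wrong_answer="Alice told Bob to bring the donuts.",
        negative_feedback="The answer about Bob and donuts was wrong.",
    )

The rendered string is then sent to the LLM, either directly through LLMGateway.acreate_structured_output or as the query of a graph-completion retriever, as the tasks below do.
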
diff --git a/cognee/tasks/feedback/models.py b/cognee/tasks/feedback/models.py
new file mode 100644
index 000000000..403bc0e13
--- /dev/null
+++ b/cognee/tasks/feedback/models.py
@@ -0,0 +1,20 @@
+from typing import List, Optional, Union
+from uuid import UUID
+
+from cognee.infrastructure.engine import DataPoint
+from cognee.modules.engine.models import Entity
+from cognee.tasks.temporal_graph.models import Event
+
+
+class FeedbackEnrichment(DataPoint):
+    """Minimal DataPoint for feedback enrichment that works with extract_graph_from_data."""
+
+    text: str
+    contains: Optional[List[Union[Entity, Event]]] = None
+    metadata: dict = {"index_fields": ["text"]}
+
+    question: str
+    original_answer: str
+    improved_answer: str
+    feedback_id: UUID
+    interaction_id: UUID
diff --git a/cognee/tasks/feedback/utils.py b/cognee/tasks/feedback/utils.py
new file mode 100644
index 000000000..d85b719c5
--- /dev/null
+++ b/cognee/tasks/feedback/utils.py
@@ -0,0 +1,57 @@
+from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
+from cognee.modules.retrieval.graph_completion_cot_retriever import GraphCompletionCotRetriever
+from cognee.modules.retrieval.graph_completion_context_extension_retriever import (
+    GraphCompletionContextExtensionRetriever,
+)
+from cognee.shared.logging_utils import get_logger
+
+
+logger = get_logger("feedback_utils")
+
+
+def create_retriever(
+    retriever_name: str = "graph_completion_cot",
+    top_k: int = 20,
+    user_prompt_path: str = "graph_context_for_question.txt",
+    system_prompt_path: str = "answer_simple_question.txt",
+):
+    """Factory for retriever instances with configurable top_k and prompt paths."""
+    if retriever_name == "graph_completion":
+        return GraphCompletionRetriever(
+            top_k=top_k,
+            save_interaction=False,
+            user_prompt_path=user_prompt_path,
+            system_prompt_path=system_prompt_path,
+        )
+    if retriever_name == "graph_completion_cot":
+        return GraphCompletionCotRetriever(
+            top_k=top_k,
+            save_interaction=False,
+            user_prompt_path=user_prompt_path,
+            system_prompt_path=system_prompt_path,
+        )
+    if retriever_name == "graph_completion_context_extension":
+        return GraphCompletionContextExtensionRetriever(
+            top_k=top_k,
+            save_interaction=False,
+            user_prompt_path=user_prompt_path,
+            system_prompt_path=system_prompt_path,
+        )
+    logger.warning(
+        "Unknown retriever, defaulting to graph_completion_cot", retriever=retriever_name
+    )
+    return GraphCompletionCotRetriever(
+        top_k=top_k,
+        save_interaction=False,
+        user_prompt_path=user_prompt_path,
+        system_prompt_path=system_prompt_path,
+    )
+
+
+def filter_negative_feedback(feedback_nodes):
+    """Filter feedback nodes to those with a negative sentiment label or a negative score."""
+    return [
+        (node_id, props)
+        for node_id, props in feedback_nodes
+        if (props.get("sentiment", "").casefold() == "negative" or props.get("score", 0) < 0)
+    ]
From 78fca9feb7f8c6deed283c9f20e099613433eec1 Mon Sep 17 00:00:00 2001
From: lxobr <122801072+lxobr@users.noreply.github.com>
Date: Mon, 20 Oct 2025 20:07:03 +0200
Subject: [PATCH 02/25] feat: extract feedback interactions

---
 .../feedback/extract_feedback_interactions.py | 199 ++++++++++++++++++
 .../feedback_enrichment_minimal_example.py    |  74 +++++++
 2 files changed, 273 insertions(+)
 create mode 100644 cognee/tasks/feedback/extract_feedback_interactions.py
 create mode 100644 examples/python/feedback_enrichment_minimal_example.py

diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py
new
file mode 100644 index 000000000..44f139d70 --- /dev/null +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +from typing import Dict, List, Optional, Tuple +from uuid import UUID + +from cognee.infrastructure.llm import LLMGateway +from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt +from cognee.shared.logging_utils import get_logger +from cognee.infrastructure.databases.graph import get_graph_engine + +from .utils import filter_negative_feedback + + +logger = get_logger("extract_feedback_interactions") + + +def _get_normalized_id(node_id, props) -> str: + """Return Cognee node id preference: props.id → props.node_id → raw node_id.""" + return str(props.get("id") or props.get("node_id") or node_id) + + +async def _fetch_feedback_and_interaction_graph_data() -> Tuple[List, List]: + """Fetch feedback and interaction nodes with edges from graph engine.""" + try: + graph_engine = await get_graph_engine() + attribute_filters = [{"type": ["CogneeUserFeedback", "CogneeUserInteraction"]}] + return await graph_engine.get_filtered_graph_data(attribute_filters) + except Exception as exc: # noqa: BLE001 + logger.error("Failed to fetch filtered graph data", error=str(exc)) + return [], [] + + +def _separate_feedback_and_interaction_nodes(graph_nodes: List) -> Tuple[List, List]: + """Split nodes into feedback and interaction groups by type field.""" + feedback_nodes = [ + (_get_normalized_id(node_id, props), props) + for node_id, props in graph_nodes + if props.get("type") == "CogneeUserFeedback" + ] + interaction_nodes = [ + (_get_normalized_id(node_id, props), props) + for node_id, props in graph_nodes + if props.get("type") == "CogneeUserInteraction" + ] + return feedback_nodes, interaction_nodes + + +def _match_feedback_nodes_to_interactions_by_edges( + feedback_nodes: List, interaction_nodes: List, graph_edges: List +) -> List[Tuple[Tuple, Tuple]]: + """Match feedback to interactions using gives_feedback_to edges.""" + # Build single lookup maps using normalized Cognee IDs + interaction_by_id = {node_id: (node_id, props) for node_id, props in interaction_nodes} + feedback_by_id = {node_id: (node_id, props) for node_id, props in feedback_nodes} + + # Filter to only gives_feedback_to edges + feedback_edges = [ + (source_id, target_id) + for source_id, target_id, rel, _ in graph_edges + if rel == "gives_feedback_to" + ] + + feedback_interaction_pairs: List[Tuple[Tuple, Tuple]] = [] + for source_id, target_id in feedback_edges: + source_id_str, target_id_str = str(source_id), str(target_id) + + feedback_node = feedback_by_id.get(source_id_str) + interaction_node = interaction_by_id.get(target_id_str) + + if feedback_node and interaction_node: + feedback_interaction_pairs.append((feedback_node, interaction_node)) + + return feedback_interaction_pairs + + +def _sort_pairs_by_recency_and_limit( + feedback_interaction_pairs: List[Tuple[Tuple, Tuple]], last_n_limit: Optional[int] +) -> List[Tuple[Tuple, Tuple]]: + """Sort by interaction created_at desc with updated_at fallback, then limit.""" + + def _recency_key(pair): + _, (_, interaction_props) = pair + created_at = interaction_props.get("created_at") or "" + updated_at = interaction_props.get("updated_at") or "" + return (created_at, updated_at) + + sorted_pairs = sorted(feedback_interaction_pairs, key=_recency_key, reverse=True) + return sorted_pairs[: last_n_limit or len(sorted_pairs)] + + +async def _generate_human_readable_context_summary( + 
question_text: str, raw_context_text: str +) -> str: + """Generate a concise human-readable summary for given context.""" + try: + prompt = read_query_prompt("feedback_user_context_prompt.txt") + rendered = prompt.format(question=question_text, context=raw_context_text) + return await LLMGateway.acreate_structured_output( + text_input=rendered, system_prompt="", response_model=str + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Failed to summarize context", error=str(exc)) + return raw_context_text or "" + + +def _has_required_feedback_fields(record: Dict) -> bool: + """Validate required fields exist in the item dict.""" + required_fields = [ + "question", + "answer", + "context", + "feedback_text", + "feedback_id", + "interaction_id", + ] + return all(record.get(field_name) is not None for field_name in required_fields) + + +async def _build_feedback_interaction_record( + feedback_node_id: str, feedback_props: Dict, interaction_node_id: str, interaction_props: Dict +) -> Optional[Dict]: + """Build a single feedback-interaction record with context summary.""" + try: + question_text = interaction_props.get("question") + original_answer_text = interaction_props.get("answer") + raw_context_text = interaction_props.get("context", "") + feedback_text = feedback_props.get("feedback") or feedback_props.get("text") or "" + + context_summary_text = await _generate_human_readable_context_summary( + question_text or "", raw_context_text + ) + + feedback_interaction_record = { + "question": question_text, + "answer": original_answer_text, + "context": context_summary_text, + "feedback_text": feedback_text, + "feedback_id": UUID(str(feedback_node_id)), + "interaction_id": UUID(str(interaction_node_id)), + } + + if _has_required_feedback_fields(feedback_interaction_record): + return feedback_interaction_record + else: + logger.warning("Skipping invalid feedback item", interaction=str(interaction_node_id)) + return None + except Exception as exc: # noqa: BLE001 + logger.error("Failed to process feedback pair", error=str(exc)) + return None + + +async def _build_feedback_interaction_records( + matched_feedback_interaction_pairs: List[Tuple[Tuple, Tuple]], +) -> List[Dict]: + """Build all feedback-interaction records from matched pairs.""" + feedback_interaction_records: List[Dict] = [] + for (feedback_node_id, feedback_props), ( + interaction_node_id, + interaction_props, + ) in matched_feedback_interaction_pairs: + record = await _build_feedback_interaction_record( + feedback_node_id, feedback_props, interaction_node_id, interaction_props + ) + if record: + feedback_interaction_records.append(record) + return feedback_interaction_records + + +async def extract_feedback_interactions( + subgraphs: List, last_n: Optional[int] = None +) -> List[Dict]: + """Extract negative feedback-interaction pairs; fetch internally and use last_n param for limiting.""" + graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data() + if not graph_nodes: + return [] + + feedback_nodes, interaction_nodes = _separate_feedback_and_interaction_nodes(graph_nodes) + negative_feedback_nodes = filter_negative_feedback(feedback_nodes) + if not negative_feedback_nodes: + logger.info("No negative feedback found; returning empty list") + return [] + + matched_feedback_interaction_pairs = _match_feedback_nodes_to_interactions_by_edges( + negative_feedback_nodes, interaction_nodes, graph_edges + ) + if not matched_feedback_interaction_pairs: + logger.info("No feedback-to-interaction matches found; 
returning empty list") + return [] + + matched_feedback_interaction_pairs = _sort_pairs_by_recency_and_limit( + matched_feedback_interaction_pairs, last_n + ) + + feedback_interaction_records = await _build_feedback_interaction_records( + matched_feedback_interaction_pairs + ) + + logger.info("Extracted feedback pairs", count=len(feedback_interaction_records)) + return feedback_interaction_records diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py new file mode 100644 index 000000000..a36a7af8a --- /dev/null +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -0,0 +1,74 @@ +import asyncio + +import cognee +from cognee.api.v1.search import SearchType +from cognee.modules.pipelines.tasks.task import Task + +from cognee.tasks.feedback.extract_feedback_interactions import extract_feedback_interactions +from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers + + +CONVERSATION = [ + "Alice: Hey, Bob. Did you talk to Mallory?", + "Bob: Yeah, I just saw her before coming here.", + "Alice: Then she told you to bring my documents, right?", + "Bob: Uh… not exactly. She said you wanted me to bring you donuts. Which sounded kind of odd…", + "Alice: Ugh, she’s so annoying. Thanks for the donuts anyway!", +] + + +async def initialize_conversation_and_graph(conversation): + """Prune data/system, add conversation, cognify.""" + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + await cognee.add(conversation) + await cognee.cognify() + + +async def run_question_and_submit_feedback(question_text: str) -> bool: + """Ask question, submit feedback based on correctness, and return correctness flag.""" + result = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text=question_text, + save_interaction=True, + ) + answer_text = str(result).lower() + mentions_mallory = "mallory" in answer_text + feedback_text = ( + "Great answers, very helpful!" + if mentions_mallory + else "The answer about Bob and donuts was wrong." 
+ ) + await cognee.search( + query_type=SearchType.FEEDBACK, + query_text=feedback_text, + last_k=2, + ) + return mentions_mallory + + +async def run_feedback_enrichment_memify(last_n: int = 5): + """Execute memify with extraction and answer improvement tasks.""" + # Instantiate tasks with their own kwargs + extraction_tasks = [Task(extract_feedback_interactions, last_n=last_n)] + enrichment_tasks = [ + Task(generate_improved_answers, retriever_name="graph_completion_cot", top_k=20) + ] + await cognee.memify( + extraction_tasks=extraction_tasks, + enrichment_tasks=enrichment_tasks, + data=[{}], # A placeholder to prevent fetching the entire graph + dataset="feedback_enrichment_minimal", + ) + + +async def main(): + # await initialize_conversation_and_graph(CONVERSATION) + # is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") + is_correct = False + if not is_correct: + await run_feedback_enrichment_memify(last_n=5) + + +if __name__ == "__main__": + asyncio.run(main()) From 97eb89386ececf8acaa5cf26161ba7892dc4595e Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Mon, 20 Oct 2025 20:07:16 +0200 Subject: [PATCH 03/25] feat: generate improved answers temp --- .../feedback/generate_improved_answers.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 cognee/tasks/feedback/generate_improved_answers.py diff --git a/cognee/tasks/feedback/generate_improved_answers.py b/cognee/tasks/feedback/generate_improved_answers.py new file mode 100644 index 000000000..88cbfae8c --- /dev/null +++ b/cognee/tasks/feedback/generate_improved_answers.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from typing import Dict, List, Optional, Tuple +from pydantic import BaseModel + +from cognee.infrastructure.llm import LLMGateway +from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt +from cognee.modules.graph.utils import resolve_edges_to_text +from cognee.shared.logging_utils import get_logger + +from .utils import create_retriever + + +class ImprovedAnswerResponse(BaseModel): + """Response model for improved answer generation containing answer and explanation.""" + + answer: str + explanation: str + + +logger = get_logger("generate_improved_answers") + + +def _validate_input_data(feedback_interactions: List[Dict]) -> bool: + """Validate that input contains required fields for all items.""" + required_fields = [ + "question", + "answer", + "context", + "feedback_text", + "feedback_id", + "interaction_id", + ] + return all( + all(item.get(field) is not None for field in required_fields) + for item in feedback_interactions + ) + + +def _render_reaction_prompt( + question: str, context: str, wrong_answer: str, negative_feedback: str +) -> str: + """Render the feedback reaction prompt with provided variables.""" + prompt_template = read_query_prompt("feedback_reaction_prompt.txt") + return prompt_template.format( + question=question, + context=context, + wrong_answer=wrong_answer, + negative_feedback=negative_feedback, + ) + + +async def _generate_improved_answer_for_single_interaction( + feedback_interaction: Dict, retriever, reaction_prompt_location: str +) -> Optional[Dict]: + """Generate improved answer for a single feedback-interaction pair using structured retriever completion.""" + try: + question_text = feedback_interaction["question"] + original_answer_text = feedback_interaction["answer"] + context_text = feedback_interaction["context"] + feedback_text = 
feedback_interaction["feedback_text"] + + query_text = _render_reaction_prompt( + question_text, context_text, original_answer_text, feedback_text + ) + + retrieved_context = await retriever.get_context(query_text) + completion, new_context_text = await retriever.get_structured_completion( + query=query_text, context=retrieved_context, response_model=ImprovedAnswerResponse + ) + + if completion: + return { + **feedback_interaction, + "improved_answer": completion.answer, + "new_context": new_context_text, + "explanation": completion.explanation, + } + else: + logger.warning( + "Failed to get structured completion from retriever", question=question_text + ) + return None + + except Exception as exc: # noqa: BLE001 + logger.error( + "Failed to generate improved answer", + error=str(exc), + question=feedback_interaction.get("question"), + ) + return None + + +async def generate_improved_answers( + feedback_interactions: List[Dict], + retriever_name: str = "graph_completion_cot", + top_k: int = 20, + reaction_prompt_location: str = "feedback_reaction_prompt.txt", +) -> List[Dict]: + """Generate improved answers using configurable retriever and LLM.""" + if not feedback_interactions: + logger.info("No feedback interactions provided; returning empty list") + return [] + + if not _validate_input_data(feedback_interactions): + logger.error("Input data validation failed; missing required fields") + return [] + + retriever = create_retriever( + retriever_name=retriever_name, + top_k=top_k, + user_prompt_path="graph_context_for_question.txt", + system_prompt_path="answer_simple_question.txt", + ) + + improved_answers: List[Dict] = [] + successful_count = 0 + failed_count = 0 + + for feedback_interaction in feedback_interactions: + result = await _generate_improved_answer_for_single_interaction( + feedback_interaction, retriever, reaction_prompt_location + ) + + if result: + improved_answers.append(result) + successful_count += 1 + else: + failed_count += 1 + + logger.info("Generated improved answers", successful=successful_count, failed=failed_count) + return improved_answers From 1e1fac32611fb7e77201c6937776ea4100844813 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Mon, 20 Oct 2025 23:43:41 +0200 Subject: [PATCH 04/25] feat: allow structured output in the cot retriever --- .../graph_completion_cot_retriever.py | 201 +++++++++++++----- .../graph_completion_retriever_cot_test.py | 51 +++++ 2 files changed, 201 insertions(+), 51 deletions(-) diff --git a/cognee/modules/retrieval/graph_completion_cot_retriever.py b/cognee/modules/retrieval/graph_completion_cot_retriever.py index 4602dd59d..55cbcfce5 100644 --- a/cognee/modules/retrieval/graph_completion_cot_retriever.py +++ b/cognee/modules/retrieval/graph_completion_cot_retriever.py @@ -1,4 +1,6 @@ +import json from typing import Optional, List, Type, Any +from pydantic import BaseModel from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.shared.logging_utils import get_logger @@ -10,6 +12,20 @@ from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt logger = get_logger() +def _as_answer_text(completion: Any) -> str: + """Convert completion to human-readable text for validation and follow-up prompts.""" + if isinstance(completion, str): + return completion + if isinstance(completion, BaseModel): + # Add notice that this is a structured response + json_str = completion.model_dump_json(indent=2) + return f"[Structured Response]\n{json_str}" + try: + return 
json.dumps(completion, indent=2) + except TypeError: + return str(completion) + + class GraphCompletionCotRetriever(GraphCompletionRetriever): """ Handles graph completion by generating responses based on a series of interactions with @@ -54,6 +70,134 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): self.followup_system_prompt_path = followup_system_prompt_path self.followup_user_prompt_path = followup_user_prompt_path + async def _run_cot_completion( + self, + query: str, + context: Optional[List[Edge]] = None, + max_iter: int = 4, + response_model: Type = str, + ) -> tuple[Any, str, List[Edge]]: + """ + Run chain-of-thought completion with optional structured output. + + Returns: + -------- + - completion_result: The generated completion (string or structured model) + - context_text: The resolved context text + - triplets: The list of triplets used + """ + followup_question = "" + triplets = [] + completion = "" + + for round_idx in range(max_iter + 1): + if round_idx == 0: + if context is None: + triplets = await self.get_context(query) + context_text = await self.resolve_edges_to_text(triplets) + else: + context_text = await self.resolve_edges_to_text(context) + else: + triplets += await self.get_context(followup_question) + context_text = await self.resolve_edges_to_text(list(set(triplets))) + + if response_model is str: + completion = await generate_completion( + query=query, + context=context_text, + user_prompt_path=self.user_prompt_path, + system_prompt_path=self.system_prompt_path, + system_prompt=self.system_prompt, + ) + else: + args = {"question": query, "context": context_text} + user_prompt = render_prompt(self.user_prompt_path, args) + system_prompt = ( + self.system_prompt + if self.system_prompt + else read_query_prompt(self.system_prompt_path) + ) + + completion = await LLMGateway.acreate_structured_output( + text_input=user_prompt, + system_prompt=system_prompt, + response_model=response_model, + ) + + logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") + + if round_idx < max_iter: + answer_text = _as_answer_text(completion) + valid_args = {"query": query, "answer": answer_text, "context": context_text} + valid_user_prompt = render_prompt( + filename=self.validation_user_prompt_path, context=valid_args + ) + valid_system_prompt = read_query_prompt( + prompt_file_name=self.validation_system_prompt_path + ) + + reasoning = await LLMGateway.acreate_structured_output( + text_input=valid_user_prompt, + system_prompt=valid_system_prompt, + response_model=str, + ) + followup_args = {"query": query, "answer": answer_text, "reasoning": reasoning} + followup_prompt = render_prompt( + filename=self.followup_user_prompt_path, context=followup_args + ) + followup_system = read_query_prompt( + prompt_file_name=self.followup_system_prompt_path + ) + + followup_question = await LLMGateway.acreate_structured_output( + text_input=followup_prompt, system_prompt=followup_system, response_model=str + ) + logger.info( + f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" + ) + + return completion, context_text, triplets + + async def get_structured_completion( + self, + query: str, + context: Optional[List[Edge]] = None, + max_iter: int = 4, + response_model: Type = str, + ) -> Any: + """ + Generate structured completion responses based on a user query and contextual information. + + This method applies the same chain-of-thought logic as get_completion but returns + structured output using the provided response model. 
+ + Parameters: + ----------- + - query (str): The user's query to be processed and answered. + - context (Optional[List[Edge]]): Optional context that may assist in answering the query. + If not provided, it will be fetched based on the query. (default None) + - max_iter: The maximum number of iterations to refine the answer and generate + follow-up questions. (default 4) + - response_model (Type): The Pydantic model type for structured output. (default str) + + Returns: + -------- + - Any: The generated structured completion based on the response model. + """ + completion, context_text, triplets = await self._run_cot_completion( + query=query, + context=context, + max_iter=max_iter, + response_model=response_model, + ) + + if self.save_interaction and context and triplets and completion: + await self.save_qa( + question=query, answer=str(completion), context=context_text, triplets=triplets + ) + + return completion + async def get_completion( self, query: str, @@ -82,57 +226,12 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): - List[str]: A list containing the generated answer to the user's query. """ - followup_question = "" - triplets = [] - completion = "" - - for round_idx in range(max_iter + 1): - if round_idx == 0: - if context is None: - triplets = await self.get_context(query) - context_text = await self.resolve_edges_to_text(triplets) - else: - context_text = await self.resolve_edges_to_text(context) - else: - triplets += await self.get_context(followup_question) - context_text = await self.resolve_edges_to_text(list(set(triplets))) - - completion = await generate_completion( - query=query, - context=context_text, - user_prompt_path=self.user_prompt_path, - system_prompt_path=self.system_prompt_path, - system_prompt=self.system_prompt, - ) - logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") - if round_idx < max_iter: - valid_args = {"query": query, "answer": completion, "context": context_text} - valid_user_prompt = render_prompt( - filename=self.validation_user_prompt_path, context=valid_args - ) - valid_system_prompt = read_query_prompt( - prompt_file_name=self.validation_system_prompt_path - ) - - reasoning = await LLMGateway.acreate_structured_output( - text_input=valid_user_prompt, - system_prompt=valid_system_prompt, - response_model=str, - ) - followup_args = {"query": query, "answer": completion, "reasoning": reasoning} - followup_prompt = render_prompt( - filename=self.followup_user_prompt_path, context=followup_args - ) - followup_system = read_query_prompt( - prompt_file_name=self.followup_system_prompt_path - ) - - followup_question = await LLMGateway.acreate_structured_output( - text_input=followup_prompt, system_prompt=followup_system, response_model=str - ) - logger.info( - f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" - ) + completion, context_text, triplets = await self._run_cot_completion( + query=query, + context=context, + max_iter=max_iter, + response_model=str, + ) if self.save_interaction and context and triplets and completion: await self.save_qa( diff --git a/cognee/tests/unit/modules/retrieval/graph_completion_retriever_cot_test.py b/cognee/tests/unit/modules/retrieval/graph_completion_retriever_cot_test.py index 206cfaf84..7fcfe0d6b 100644 --- a/cognee/tests/unit/modules/retrieval/graph_completion_retriever_cot_test.py +++ b/cognee/tests/unit/modules/retrieval/graph_completion_retriever_cot_test.py @@ -2,6 +2,7 @@ import os import pytest import pathlib from typing import 
Optional, Union +from pydantic import BaseModel import cognee from cognee.low_level import setup, DataPoint @@ -10,6 +11,11 @@ from cognee.tasks.storage import add_data_points from cognee.modules.retrieval.graph_completion_cot_retriever import GraphCompletionCotRetriever +class TestAnswer(BaseModel): + answer: str + explanation: str + + class TestGraphCompletionCoTRetriever: @pytest.mark.asyncio async def test_graph_completion_cot_context_simple(self): @@ -168,3 +174,48 @@ class TestGraphCompletionCoTRetriever: assert all(isinstance(item, str) and item.strip() for item in answer), ( "Answer must contain only non-empty strings" ) + + @pytest.mark.asyncio + async def test_get_structured_completion(self): + system_directory_path = os.path.join( + pathlib.Path(__file__).parent, ".cognee_system/test_get_structured_completion" + ) + cognee.config.system_root_directory(system_directory_path) + data_directory_path = os.path.join( + pathlib.Path(__file__).parent, ".data_storage/test_get_structured_completion" + ) + cognee.config.data_root_directory(data_directory_path) + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + await setup() + + class Company(DataPoint): + name: str + + class Person(DataPoint): + name: str + works_for: Company + + company1 = Company(name="Figma") + person1 = Person(name="Steve Rodger", works_for=company1) + + entities = [company1, person1] + await add_data_points(entities) + + retriever = GraphCompletionCotRetriever() + + # Test with string response model (default) + string_answer = await retriever.get_structured_completion("Who works at Figma?") + assert isinstance(string_answer, str), f"Expected str, got {type(string_answer).__name__}" + assert string_answer.strip(), "Answer should not be empty" + + # Test with structured response model + structured_answer = await retriever.get_structured_completion( + "Who works at Figma?", response_model=TestAnswer + ) + assert isinstance(structured_answer, TestAnswer), ( + f"Expected TestAnswer, got {type(structured_answer).__name__}" + ) + assert structured_answer.answer.strip(), "Answer field should not be empty" + assert structured_answer.explanation.strip(), "Explanation field should not be empty" From ce418828b4535ec16e4706551120aeb0a42f4fa4 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Mon, 20 Oct 2025 23:45:18 +0200 Subject: [PATCH 05/25] feat: generate improved answers --- cognee/tasks/feedback/generate_improved_answers.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cognee/tasks/feedback/generate_improved_answers.py b/cognee/tasks/feedback/generate_improved_answers.py index 88cbfae8c..de8b1cc0b 100644 --- a/cognee/tasks/feedback/generate_improved_answers.py +++ b/cognee/tasks/feedback/generate_improved_answers.py @@ -65,9 +65,10 @@ async def _generate_improved_answer_for_single_interaction( ) retrieved_context = await retriever.get_context(query_text) - completion, new_context_text = await retriever.get_structured_completion( + completion = await retriever.get_structured_completion( query=query_text, context=retrieved_context, response_model=ImprovedAnswerResponse ) + new_context_text = await retriever.resolve_edges_to_text(retrieved_context) if completion: return { @@ -114,8 +115,6 @@ async def generate_improved_answers( ) improved_answers: List[Dict] = [] - successful_count = 0 - failed_count = 0 for feedback_interaction in feedback_interactions: result = await _generate_improved_answer_for_single_interaction( @@ 
-124,9 +123,12 @@ async def generate_improved_answers(
         if result:
             improved_answers.append(result)
-            successful_count += 1
         else:
-            failed_count += 1
+            logger.warning(
+                "Failed to generate improved answer",
+                question=feedback_interaction.get("question"),
+                interaction_id=feedback_interaction.get("interaction_id"),
+            )
 
-    logger.info("Generated improved answers", successful=successful_count, failed=failed_count)
+    logger.info("Generated improved answers", count=len(improved_answers))
     return improved_answers
From 834cf8b11307f38a09b42660db493bdf2ddaa14c Mon Sep 17 00:00:00 2001
From: lxobr <122801072+lxobr@users.noreply.github.com>
Date: Tue, 21 Oct 2025 00:34:12 +0200
Subject: [PATCH 06/25] feat: create_enrichments.py

---
 cognee/tasks/feedback/create_enrichments.py   | 145 ++++++++++++++++++
 .../feedback/generate_improved_answers.py     |   5 +-
 cognee/tasks/feedback/models.py               |   3 +-
 .../feedback_enrichment_minimal_example.py    |  12 +-
 4 files changed, 158 insertions(+), 7 deletions(-)
 create mode 100644 cognee/tasks/feedback/create_enrichments.py

diff --git a/cognee/tasks/feedback/create_enrichments.py b/cognee/tasks/feedback/create_enrichments.py
new file mode 100644
index 000000000..99de162b4
--- /dev/null
+++ b/cognee/tasks/feedback/create_enrichments.py
@@ -0,0 +1,145 @@
+from __future__ import annotations
+
+from typing import Dict, List, Optional
+from uuid import NAMESPACE_OID, UUID, uuid5
+
+from cognee.infrastructure.llm import LLMGateway
+from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt
+from cognee.shared.logging_utils import get_logger
+from cognee.modules.engine.models import NodeSet
+
+from .models import FeedbackEnrichment
+
+
+logger = get_logger("create_enrichments")
+
+
+def _validate_improved_answers(improved_answers: List[Dict]) -> bool:
+    """Validate that all items contain required fields for enrichment creation."""
+    required_fields = [
+        "question",
+        "answer",  # This is the original answer field from feedback_interaction
+        "improved_answer",
+        "new_context",
+        "feedback_id",
+        "interaction_id",
+    ]
+    return all(
+        all(item.get(field) is not None for field in required_fields) for item in improved_answers
+    )
+
+
+def _validate_uuid_fields(improved_answers: List[Dict]) -> bool:
+    """Validate that feedback_id and interaction_id are valid UUID objects."""
+    try:
+        for item in improved_answers:
+            feedback_id = item.get("feedback_id")
+            interaction_id = item.get("interaction_id")
+            if not isinstance(feedback_id, UUID) or not isinstance(
+                interaction_id, UUID
+            ):
+                return False
+        return True
+    except Exception:
+        return False
+
+
+async def _generate_enrichment_report(
+    question: str, improved_answer: str, new_context: str, report_prompt_location: str
+) -> str:
+    """Generate educational report using feedback report prompt."""
+    try:
+        prompt_template = read_query_prompt(report_prompt_location)
+        rendered_prompt = prompt_template.format(
+            question=question,
+            improved_answer=improved_answer,
+            new_context=new_context,
+        )
+        return await LLMGateway.acreate_structured_output(
+            text_input=rendered_prompt,
+            system_prompt="You are a helpful assistant that creates educational content.",
+            response_model=str,
+        )
+    except Exception as exc:
+        logger.warning("Failed to generate enrichment report", error=str(exc), question=question)
+        return f"Educational content for: {question} - {improved_answer}"
+
+
+async def _create_enrichment_datapoint(
+    improved_answer_item: Dict,
+    report_text: str,
+) -> Optional[FeedbackEnrichment]:
+
"""Create a single FeedbackEnrichment DataPoint with proper ID and nodeset assignment.""" + try: + question = improved_answer_item["question"] + improved_answer = improved_answer_item["improved_answer"] + + # Create nodeset following UserQAFeedback pattern + nodeset = NodeSet( + id=uuid5(NAMESPACE_OID, name="FeedbackEnrichment"), name="FeedbackEnrichment" + ) + + enrichment = FeedbackEnrichment( + id=str(uuid5(NAMESPACE_OID, f"{question}_{improved_answer}")), + text=report_text, + question=question, + original_answer=improved_answer_item["answer"], # Use "answer" field + improved_answer=improved_answer, + feedback_id=improved_answer_item["feedback_id"], + interaction_id=improved_answer_item["interaction_id"], + belongs_to_set=nodeset, + ) + + return enrichment + except Exception as exc: + logger.error( + "Failed to create enrichment datapoint", + error=str(exc), + question=improved_answer_item.get("question"), + ) + return None + + +async def create_enrichments( + improved_answers: List[Dict], + report_prompt_location: str = "feedback_report_prompt.txt", +) -> List[FeedbackEnrichment]: + """Create FeedbackEnrichment DataPoint instances from improved answers.""" + if not improved_answers: + logger.info("No improved answers provided; returning empty list") + return [] + + if not _validate_improved_answers(improved_answers): + logger.error("Input validation failed; missing required fields") + return [] + + if not _validate_uuid_fields(improved_answers): + logger.error("UUID validation failed; invalid feedback_id or interaction_id") + return [] + + logger.info("Creating enrichments", count=len(improved_answers)) + + enrichments: List[FeedbackEnrichment] = [] + + for improved_answer_item in improved_answers: + question = improved_answer_item["question"] + improved_answer = improved_answer_item["improved_answer"] + new_context = improved_answer_item["new_context"] + + report_text = await _generate_enrichment_report( + question, improved_answer, new_context, report_prompt_location + ) + + enrichment = await _create_enrichment_datapoint(improved_answer_item, report_text) + + if enrichment: + enrichments.append(enrichment) + else: + logger.warning( + "Failed to create enrichment", + question=question, + interaction_id=improved_answer_item.get("interaction_id"), + ) + + logger.info("Created enrichments", successful=len(enrichments)) + return enrichments diff --git a/cognee/tasks/feedback/generate_improved_answers.py b/cognee/tasks/feedback/generate_improved_answers.py index de8b1cc0b..a4edab7c1 100644 --- a/cognee/tasks/feedback/generate_improved_answers.py +++ b/cognee/tasks/feedback/generate_improved_answers.py @@ -66,7 +66,10 @@ async def _generate_improved_answer_for_single_interaction( retrieved_context = await retriever.get_context(query_text) completion = await retriever.get_structured_completion( - query=query_text, context=retrieved_context, response_model=ImprovedAnswerResponse + query=query_text, + context=retrieved_context, + response_model=ImprovedAnswerResponse, + max_iter=1, ) new_context_text = await retriever.resolve_edges_to_text(retrieved_context) diff --git a/cognee/tasks/feedback/models.py b/cognee/tasks/feedback/models.py index 403bc0e13..ae1064709 100644 --- a/cognee/tasks/feedback/models.py +++ b/cognee/tasks/feedback/models.py @@ -2,7 +2,7 @@ from typing import List, Optional, Union from uuid import UUID from cognee.infrastructure.engine import DataPoint -from cognee.modules.engine.models import Entity +from cognee.modules.engine.models import Entity, NodeSet from 
cognee.tasks.temporal_graph.models import Event @@ -18,3 +18,4 @@ class FeedbackEnrichment(DataPoint): improved_answer: str feedback_id: UUID interaction_id: UUID + belongs_to_set: Optional[NodeSet] = None diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py index a36a7af8a..9fbb84821 100644 --- a/examples/python/feedback_enrichment_minimal_example.py +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -6,6 +6,7 @@ from cognee.modules.pipelines.tasks.task import Task from cognee.tasks.feedback.extract_feedback_interactions import extract_feedback_interactions from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers +from cognee.tasks.feedback.create_enrichments import create_enrichments CONVERSATION = [ @@ -48,11 +49,12 @@ async def run_question_and_submit_feedback(question_text: str) -> bool: async def run_feedback_enrichment_memify(last_n: int = 5): - """Execute memify with extraction and answer improvement tasks.""" + """Execute memify with extraction, answer improvement, and enrichment creation tasks.""" # Instantiate tasks with their own kwargs extraction_tasks = [Task(extract_feedback_interactions, last_n=last_n)] enrichment_tasks = [ - Task(generate_improved_answers, retriever_name="graph_completion_cot", top_k=20) + Task(generate_improved_answers, retriever_name="graph_completion_cot", top_k=20), + Task(create_enrichments), ] await cognee.memify( extraction_tasks=extraction_tasks, @@ -63,9 +65,9 @@ async def run_feedback_enrichment_memify(last_n: int = 5): async def main(): - # await initialize_conversation_and_graph(CONVERSATION) - # is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") - is_correct = False + await initialize_conversation_and_graph(CONVERSATION) + is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") + # is_correct = False if not is_correct: await run_feedback_enrichment_memify(last_n=5) From 8e580bd3d38b100a4ddb85e71b9d250595d53483 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Tue, 21 Oct 2025 00:57:42 +0200 Subject: [PATCH 07/25] fix: create enrichments --- cognee/tasks/feedback/create_enrichments.py | 13 ++++++------- cognee/tasks/feedback/models.py | 2 +- .../python/feedback_enrichment_minimal_example.py | 13 +++++++++---- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/cognee/tasks/feedback/create_enrichments.py b/cognee/tasks/feedback/create_enrichments.py index 99de162b4..9ffbf9f88 100644 --- a/cognee/tasks/feedback/create_enrichments.py +++ b/cognee/tasks/feedback/create_enrichments.py @@ -68,17 +68,13 @@ async def _generate_enrichment_report( async def _create_enrichment_datapoint( improved_answer_item: Dict, report_text: str, + nodeset: NodeSet, ) -> Optional[FeedbackEnrichment]: """Create a single FeedbackEnrichment DataPoint with proper ID and nodeset assignment.""" try: question = improved_answer_item["question"] improved_answer = improved_answer_item["improved_answer"] - # Create nodeset following UserQAFeedback pattern - nodeset = NodeSet( - id=uuid5(NAMESPACE_OID, name="FeedbackEnrichment"), name="FeedbackEnrichment" - ) - enrichment = FeedbackEnrichment( id=str(uuid5(NAMESPACE_OID, f"{question}_{improved_answer}")), text=report_text, @@ -87,7 +83,7 @@ async def _create_enrichment_datapoint( improved_answer=improved_answer, feedback_id=improved_answer_item["feedback_id"], 
interaction_id=improved_answer_item["interaction_id"], - belongs_to_set=nodeset, + belongs_to_set=[nodeset], ) return enrichment @@ -119,6 +115,9 @@ async def create_enrichments( logger.info("Creating enrichments", count=len(improved_answers)) + # Create nodeset once for all enrichments + nodeset = NodeSet(id=uuid5(NAMESPACE_OID, name="FeedbackEnrichment"), name="FeedbackEnrichment") + enrichments: List[FeedbackEnrichment] = [] for improved_answer_item in improved_answers: @@ -130,7 +129,7 @@ async def create_enrichments( question, improved_answer, new_context, report_prompt_location ) - enrichment = await _create_enrichment_datapoint(improved_answer_item, report_text) + enrichment = await _create_enrichment_datapoint(improved_answer_item, report_text, nodeset) if enrichment: enrichments.append(enrichment) diff --git a/cognee/tasks/feedback/models.py b/cognee/tasks/feedback/models.py index ae1064709..6815c2de1 100644 --- a/cognee/tasks/feedback/models.py +++ b/cognee/tasks/feedback/models.py @@ -18,4 +18,4 @@ class FeedbackEnrichment(DataPoint): improved_answer: str feedback_id: UUID interaction_id: UUID - belongs_to_set: Optional[NodeSet] = None + belongs_to_set: Optional[List[NodeSet]] = None diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py index 9fbb84821..c37c0fbdf 100644 --- a/examples/python/feedback_enrichment_minimal_example.py +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -3,6 +3,9 @@ import asyncio import cognee from cognee.api.v1.search import SearchType from cognee.modules.pipelines.tasks.task import Task +from cognee.tasks.graph import extract_graph_from_data +from cognee.tasks.storage import add_data_points +from cognee.shared.data_models import KnowledgeGraph from cognee.tasks.feedback.extract_feedback_interactions import extract_feedback_interactions from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers @@ -49,12 +52,14 @@ async def run_question_and_submit_feedback(question_text: str) -> bool: async def run_feedback_enrichment_memify(last_n: int = 5): - """Execute memify with extraction, answer improvement, and enrichment creation tasks.""" + """Execute memify with extraction, answer improvement, enrichment creation, and graph processing tasks.""" # Instantiate tasks with their own kwargs extraction_tasks = [Task(extract_feedback_interactions, last_n=last_n)] enrichment_tasks = [ Task(generate_improved_answers, retriever_name="graph_completion_cot", top_k=20), Task(create_enrichments), + Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 10}), + Task(add_data_points, task_config={"batch_size": 10}), ] await cognee.memify( extraction_tasks=extraction_tasks, @@ -65,9 +70,9 @@ async def run_feedback_enrichment_memify(last_n: int = 5): async def main(): - await initialize_conversation_and_graph(CONVERSATION) - is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") - # is_correct = False + # await initialize_conversation_and_graph(CONVERSATION) + # is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") + is_correct = False if not is_correct: await run_feedback_enrichment_memify(last_n=5) From 590c3ad7ec2dc687c6e9d088a1c672eb2c2f91cc Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Tue, 21 Oct 2025 01:30:08 +0200 Subject: [PATCH 08/25] feat: use datapoints only --- cognee/tasks/feedback/__init__.py | 13 ++ 
cognee/tasks/feedback/create_enrichments.py   | 112 ++++----------------
 .../feedback/extract_feedback_interactions.py |  66 ++++++-----
 .../feedback/generate_improved_answers.py     |  74 ++++++------
 .../feedback/link_enrichments_to_feedback.py  |  67 +++++++++++
 cognee/tasks/feedback/models.py               |   5 +
 .../feedback_enrichment_minimal_example.py    |   7 +-
 7 files changed, 184 insertions(+), 160 deletions(-)
 create mode 100644 cognee/tasks/feedback/__init__.py
 create mode 100644 cognee/tasks/feedback/link_enrichments_to_feedback.py

diff --git a/cognee/tasks/feedback/__init__.py b/cognee/tasks/feedback/__init__.py
new file mode 100644
index 000000000..25102dfb4
--- /dev/null
+++ b/cognee/tasks/feedback/__init__.py
@@ -0,0 +1,13 @@
+from .extract_feedback_interactions import extract_feedback_interactions
+from .generate_improved_answers import generate_improved_answers
+from .create_enrichments import create_enrichments
+from .link_enrichments_to_feedback import link_enrichments_to_feedback
+from .models import FeedbackEnrichment
+
+__all__ = [
+    "extract_feedback_interactions",
+    "generate_improved_answers",
+    "create_enrichments",
+    "link_enrichments_to_feedback",
+    "FeedbackEnrichment",
+]
diff --git a/cognee/tasks/feedback/create_enrichments.py b/cognee/tasks/feedback/create_enrichments.py
index 9ffbf9f88..ee15e9797 100644
--- a/cognee/tasks/feedback/create_enrichments.py
+++ b/cognee/tasks/feedback/create_enrichments.py
@@ -14,36 +14,19 @@ from .models import FeedbackEnrichment
 logger = get_logger("create_enrichments")
 
 
-def _validate_improved_answers(improved_answers: List[Dict]) -> bool:
-    """Validate that all items contain required fields for enrichment creation."""
-    required_fields = [
-        "question",
-        "answer",  # This is the original answer field from feedback_interaction
-        "improved_answer",
-        "new_context",
-        "feedback_id",
-        "interaction_id",
-    ]
+def _validate_enrichments(enrichments: List[FeedbackEnrichment]) -> bool:
+    """Validate that all enrichments contain required fields for completion."""
     return all(
-        all(item.get(field) is not None for field in required_fields) for item in improved_answers
+        enrichment.question is not None
+        and enrichment.original_answer is not None
+        and enrichment.improved_answer is not None
+        and enrichment.new_context is not None
+        and enrichment.feedback_id is not None
+        and enrichment.interaction_id is not None
+        for enrichment in enrichments
     )
 
 
-def _validate_uuid_fields(improved_answers: List[Dict]) -> bool:
-    """Validate that feedback_id and interaction_id are valid UUID objects."""
-    try:
-        for item in improved_answers:
-            feedback_id = item.get("feedback_id")
-            interaction_id = item.get("interaction_id")
-            if not isinstance(feedback_id, UUID) or not isinstance(
-                interaction_id, UUID
-            ):
-                return False
-        return True
-    except Exception:
-        return False
-
-
 async def _generate_enrichment_report(
     question: str, improved_answer: str, new_context: str, report_prompt_location: str
 ) -> str:
@@ -65,80 +48,37 @@ async def _generate_enrichment_report(
         return f"Educational content for: {question} - {improved_answer}"
 
 
-async def _create_enrichment_datapoint(
-    improved_answer_item: Dict,
-    report_text: str,
-    nodeset: NodeSet,
-) -> Optional[FeedbackEnrichment]:
-    """Create a single FeedbackEnrichment DataPoint with proper ID and nodeset assignment."""
-    try:
-        question = improved_answer_item["question"]
-        improved_answer = improved_answer_item["improved_answer"]
-
-        enrichment = FeedbackEnrichment(
-            id=str(uuid5(NAMESPACE_OID,
f"{question}_{improved_answer}")), - text=report_text, - question=question, - original_answer=improved_answer_item["answer"], # Use "answer" field - improved_answer=improved_answer, - feedback_id=improved_answer_item["feedback_id"], - interaction_id=improved_answer_item["interaction_id"], - belongs_to_set=[nodeset], - ) - - return enrichment - except Exception as exc: - logger.error( - "Failed to create enrichment datapoint", - error=str(exc), - question=improved_answer_item.get("question"), - ) - return None - - async def create_enrichments( - improved_answers: List[Dict], + enrichments: List[FeedbackEnrichment], report_prompt_location: str = "feedback_report_prompt.txt", ) -> List[FeedbackEnrichment]: - """Create FeedbackEnrichment DataPoint instances from improved answers.""" - if not improved_answers: - logger.info("No improved answers provided; returning empty list") + """Fill text and belongs_to_set fields of existing FeedbackEnrichment DataPoints.""" + if not enrichments: + logger.info("No enrichments provided; returning empty list") return [] - if not _validate_improved_answers(improved_answers): + if not _validate_enrichments(enrichments): logger.error("Input validation failed; missing required fields") return [] - if not _validate_uuid_fields(improved_answers): - logger.error("UUID validation failed; invalid feedback_id or interaction_id") - return [] + logger.info("Completing enrichments", count=len(enrichments)) - logger.info("Creating enrichments", count=len(improved_answers)) - - # Create nodeset once for all enrichments nodeset = NodeSet(id=uuid5(NAMESPACE_OID, name="FeedbackEnrichment"), name="FeedbackEnrichment") - enrichments: List[FeedbackEnrichment] = [] - - for improved_answer_item in improved_answers: - question = improved_answer_item["question"] - improved_answer = improved_answer_item["improved_answer"] - new_context = improved_answer_item["new_context"] + completed_enrichments: List[FeedbackEnrichment] = [] + for enrichment in enrichments: report_text = await _generate_enrichment_report( - question, improved_answer, new_context, report_prompt_location + enrichment.question, + enrichment.improved_answer, + enrichment.new_context, + report_prompt_location, ) - enrichment = await _create_enrichment_datapoint(improved_answer_item, report_text, nodeset) + enrichment.text = report_text + enrichment.belongs_to_set = [nodeset] - if enrichment: - enrichments.append(enrichment) - else: - logger.warning( - "Failed to create enrichment", - question=question, - interaction_id=improved_answer_item.get("interaction_id"), - ) + completed_enrichments.append(enrichment) - logger.info("Created enrichments", successful=len(enrichments)) - return enrichments + logger.info("Completed enrichments", successful=len(completed_enrichments)) + return completed_enrichments diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py index 44f139d70..e5d03026e 100644 --- a/cognee/tasks/feedback/extract_feedback_interactions.py +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -7,8 +7,10 @@ from cognee.infrastructure.llm import LLMGateway from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.graph import get_graph_engine +from uuid import uuid5, NAMESPACE_OID from .utils import filter_negative_feedback +from .models import FeedbackEnrichment logger = get_logger("extract_feedback_interactions") @@ -49,11 
+51,8 @@ def _match_feedback_nodes_to_interactions_by_edges( feedback_nodes: List, interaction_nodes: List, graph_edges: List ) -> List[Tuple[Tuple, Tuple]]: """Match feedback to interactions using gives_feedback_to edges.""" - # Build single lookup maps using normalized Cognee IDs interaction_by_id = {node_id: (node_id, props) for node_id, props in interaction_nodes} feedback_by_id = {node_id: (node_id, props) for node_id, props in feedback_nodes} - - # Filter to only gives_feedback_to edges feedback_edges = [ (source_id, target_id) for source_id, target_id, rel, _ in graph_edges @@ -103,23 +102,22 @@ async def _generate_human_readable_context_summary( return raw_context_text or "" -def _has_required_feedback_fields(record: Dict) -> bool: - """Validate required fields exist in the item dict.""" - required_fields = [ - "question", - "answer", - "context", - "feedback_text", - "feedback_id", - "interaction_id", - ] - return all(record.get(field_name) is not None for field_name in required_fields) +def _has_required_feedback_fields(enrichment: FeedbackEnrichment) -> bool: + """Validate required fields exist in the FeedbackEnrichment DataPoint.""" + return ( + enrichment.question is not None + and enrichment.original_answer is not None + and enrichment.context is not None + and enrichment.feedback_text is not None + and enrichment.feedback_id is not None + and enrichment.interaction_id is not None + ) async def _build_feedback_interaction_record( feedback_node_id: str, feedback_props: Dict, interaction_node_id: str, interaction_props: Dict -) -> Optional[Dict]: - """Build a single feedback-interaction record with context summary.""" +) -> Optional[FeedbackEnrichment]: + """Build a single FeedbackEnrichment DataPoint with context summary.""" try: question_text = interaction_props.get("question") original_answer_text = interaction_props.get("answer") @@ -130,17 +128,23 @@ async def _build_feedback_interaction_record( question_text or "", raw_context_text ) - feedback_interaction_record = { - "question": question_text, - "answer": original_answer_text, - "context": context_summary_text, - "feedback_text": feedback_text, - "feedback_id": UUID(str(feedback_node_id)), - "interaction_id": UUID(str(interaction_node_id)), - } + enrichment = FeedbackEnrichment( + id=str(uuid5(NAMESPACE_OID, f"{question_text}_{interaction_node_id}")), + text="", + question=question_text, + original_answer=original_answer_text, + improved_answer="", + feedback_id=UUID(str(feedback_node_id)), + interaction_id=UUID(str(interaction_node_id)), + belongs_to_set=None, + context=context_summary_text, + feedback_text=feedback_text, + new_context="", + explanation="", + ) - if _has_required_feedback_fields(feedback_interaction_record): - return feedback_interaction_record + if _has_required_feedback_fields(enrichment): + return enrichment else: logger.warning("Skipping invalid feedback item", interaction=str(interaction_node_id)) return None @@ -151,9 +155,9 @@ async def _build_feedback_interaction_record( async def _build_feedback_interaction_records( matched_feedback_interaction_pairs: List[Tuple[Tuple, Tuple]], -) -> List[Dict]: - """Build all feedback-interaction records from matched pairs.""" - feedback_interaction_records: List[Dict] = [] +) -> List[FeedbackEnrichment]: + """Build all FeedbackEnrichment DataPoints from matched pairs.""" + feedback_interaction_records: List[FeedbackEnrichment] = [] for (feedback_node_id, feedback_props), ( interaction_node_id, interaction_props, @@ -168,8 +172,8 @@ async def 
_build_feedback_interaction_records( async def extract_feedback_interactions( subgraphs: List, last_n: Optional[int] = None -) -> List[Dict]: - """Extract negative feedback-interaction pairs; fetch internally and use last_n param for limiting.""" +) -> List[FeedbackEnrichment]: + """Extract negative feedback-interaction pairs and create FeedbackEnrichment DataPoints.""" graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data() if not graph_nodes: return [] diff --git a/cognee/tasks/feedback/generate_improved_answers.py b/cognee/tasks/feedback/generate_improved_answers.py index a4edab7c1..10059df7e 100644 --- a/cognee/tasks/feedback/generate_improved_answers.py +++ b/cognee/tasks/feedback/generate_improved_answers.py @@ -9,6 +9,7 @@ from cognee.modules.graph.utils import resolve_edges_to_text from cognee.shared.logging_utils import get_logger from .utils import create_retriever +from .models import FeedbackEnrichment class ImprovedAnswerResponse(BaseModel): @@ -21,19 +22,16 @@ class ImprovedAnswerResponse(BaseModel): logger = get_logger("generate_improved_answers") -def _validate_input_data(feedback_interactions: List[Dict]) -> bool: - """Validate that input contains required fields for all items.""" - required_fields = [ - "question", - "answer", - "context", - "feedback_text", - "feedback_id", - "interaction_id", - ] +def _validate_input_data(enrichments: List[FeedbackEnrichment]) -> bool: + """Validate that input contains required fields for all enrichments.""" return all( - all(item.get(field) is not None for field in required_fields) - for item in feedback_interactions + enrichment.question is not None + and enrichment.original_answer is not None + and enrichment.context is not None + and enrichment.feedback_text is not None + and enrichment.feedback_id is not None + and enrichment.interaction_id is not None + for enrichment in enrichments ) @@ -51,17 +49,15 @@ def _render_reaction_prompt( async def _generate_improved_answer_for_single_interaction( - feedback_interaction: Dict, retriever, reaction_prompt_location: str -) -> Optional[Dict]: - """Generate improved answer for a single feedback-interaction pair using structured retriever completion.""" + enrichment: FeedbackEnrichment, retriever, reaction_prompt_location: str +) -> Optional[FeedbackEnrichment]: + """Generate improved answer for a single enrichment using structured retriever completion.""" try: - question_text = feedback_interaction["question"] - original_answer_text = feedback_interaction["answer"] - context_text = feedback_interaction["context"] - feedback_text = feedback_interaction["feedback_text"] - query_text = _render_reaction_prompt( - question_text, context_text, original_answer_text, feedback_text + enrichment.question, + enrichment.context, + enrichment.original_answer, + enrichment.feedback_text, ) retrieved_context = await retriever.get_context(query_text) @@ -69,20 +65,18 @@ async def _generate_improved_answer_for_single_interaction( query=query_text, context=retrieved_context, response_model=ImprovedAnswerResponse, - max_iter=1, + max_iter=4, ) new_context_text = await retriever.resolve_edges_to_text(retrieved_context) if completion: - return { - **feedback_interaction, - "improved_answer": completion.answer, - "new_context": new_context_text, - "explanation": completion.explanation, - } + enrichment.improved_answer = completion.answer + enrichment.new_context = new_context_text + enrichment.explanation = completion.explanation + return enrichment else: logger.warning( - "Failed to get 
structured completion from retriever", question=question_text + "Failed to get structured completion from retriever", question=enrichment.question ) return None @@ -90,23 +84,23 @@ async def _generate_improved_answer_for_single_interaction( logger.error( "Failed to generate improved answer", error=str(exc), - question=feedback_interaction.get("question"), + question=enrichment.question, ) return None async def generate_improved_answers( - feedback_interactions: List[Dict], + enrichments: List[FeedbackEnrichment], retriever_name: str = "graph_completion_cot", top_k: int = 20, reaction_prompt_location: str = "feedback_reaction_prompt.txt", -) -> List[Dict]: +) -> List[FeedbackEnrichment]: """Generate improved answers using configurable retriever and LLM.""" - if not feedback_interactions: - logger.info("No feedback interactions provided; returning empty list") + if not enrichments: + logger.info("No enrichments provided; returning empty list") return [] - if not _validate_input_data(feedback_interactions): + if not _validate_input_data(enrichments): logger.error("Input data validation failed; missing required fields") return [] @@ -117,11 +111,11 @@ async def generate_improved_answers( system_prompt_path="answer_simple_question.txt", ) - improved_answers: List[Dict] = [] + improved_answers: List[FeedbackEnrichment] = [] - for feedback_interaction in feedback_interactions: + for enrichment in enrichments: result = await _generate_improved_answer_for_single_interaction( - feedback_interaction, retriever, reaction_prompt_location + enrichment, retriever, reaction_prompt_location ) if result: @@ -129,8 +123,8 @@ async def generate_improved_answers( else: logger.warning( "Failed to generate improved answer", - question=feedback_interaction.get("question"), - interaction_id=feedback_interaction.get("interaction_id"), + question=enrichment.question, + interaction_id=enrichment.interaction_id, ) logger.info("Generated improved answers", count=len(improved_answers)) diff --git a/cognee/tasks/feedback/link_enrichments_to_feedback.py b/cognee/tasks/feedback/link_enrichments_to_feedback.py new file mode 100644 index 000000000..d536bdc56 --- /dev/null +++ b/cognee/tasks/feedback/link_enrichments_to_feedback.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from typing import List, Tuple +from uuid import UUID + +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.tasks.storage import index_graph_edges +from cognee.shared.logging_utils import get_logger + +from .models import FeedbackEnrichment + + +logger = get_logger("link_enrichments_to_feedback") + + +def _create_edge_tuple( + source_id: UUID, target_id: UUID, relationship_name: str +) -> Tuple[UUID, UUID, str, dict]: + """Create an edge tuple with proper properties structure.""" + return ( + source_id, + target_id, + relationship_name, + { + "relationship_name": relationship_name, + "source_node_id": source_id, + "target_node_id": target_id, + "ontology_valid": False, + }, + ) + + +async def link_enrichments_to_feedback( + enrichments: List[FeedbackEnrichment], +) -> List[FeedbackEnrichment]: + """Manually create edges from enrichments to original feedback/interaction nodes.""" + if not enrichments: + logger.info("No enrichments provided; returning empty list") + return [] + + relationships = [] + + for enrichment in enrichments: + enrichment_id = enrichment.id + feedback_id = enrichment.feedback_id + interaction_id = enrichment.interaction_id + + if enrichment_id and feedback_id: + enriches_feedback_edge = 
_create_edge_tuple( + enrichment_id, feedback_id, "enriches_feedback" + ) + relationships.append(enriches_feedback_edge) + + if enrichment_id and interaction_id: + improves_interaction_edge = _create_edge_tuple( + enrichment_id, interaction_id, "improves_interaction" + ) + relationships.append(improves_interaction_edge) + + if relationships: + graph_engine = await get_graph_engine() + await graph_engine.add_edges(relationships) + await index_graph_edges(relationships) + logger.info("Linking enrichments to feedback", edge_count=len(relationships)) + + logger.info("Linked enrichments", enrichment_count=len(enrichments)) + return enrichments diff --git a/cognee/tasks/feedback/models.py b/cognee/tasks/feedback/models.py index 6815c2de1..c334ec8c0 100644 --- a/cognee/tasks/feedback/models.py +++ b/cognee/tasks/feedback/models.py @@ -19,3 +19,8 @@ class FeedbackEnrichment(DataPoint): feedback_id: UUID interaction_id: UUID belongs_to_set: Optional[List[NodeSet]] = None + + context: str = "" + feedback_text: str = "" + new_context: str = "" + explanation: str = "" diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py index c37c0fbdf..8e7f01c7d 100644 --- a/examples/python/feedback_enrichment_minimal_example.py +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -10,6 +10,7 @@ from cognee.shared.data_models import KnowledgeGraph from cognee.tasks.feedback.extract_feedback_interactions import extract_feedback_interactions from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers from cognee.tasks.feedback.create_enrichments import create_enrichments +from cognee.tasks.feedback.link_enrichments_to_feedback import link_enrichments_to_feedback CONVERSATION = [ @@ -60,6 +61,7 @@ async def run_feedback_enrichment_memify(last_n: int = 5): Task(create_enrichments), Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 10}), Task(add_data_points, task_config={"batch_size": 10}), + Task(link_enrichments_to_feedback), ] await cognee.memify( extraction_tasks=extraction_tasks, @@ -70,9 +72,8 @@ async def run_feedback_enrichment_memify(last_n: int = 5): async def main(): - # await initialize_conversation_and_graph(CONVERSATION) - # is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") - is_correct = False + await initialize_conversation_and_graph(CONVERSATION) + is_correct = await run_question_and_submit_feedback("Who told Bob to bring the donuts?") if not is_correct: await run_feedback_enrichment_memify(last_n=5) From 70c0a98055e4049bd2d91d6ac558637cc7de26c6 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Tue, 21 Oct 2025 01:39:35 +0200 Subject: [PATCH 09/25] chore: use cot retriever only --- cognee/tasks/feedback/create_enrichments.py | 2 +- .../feedback/extract_feedback_interactions.py | 15 +++-- .../feedback/generate_improved_answers.py | 11 ++-- cognee/tasks/feedback/utils.py | 57 ------------------- .../feedback_enrichment_minimal_example.py | 2 +- 5 files changed, 18 insertions(+), 69 deletions(-) delete mode 100644 cognee/tasks/feedback/utils.py diff --git a/cognee/tasks/feedback/create_enrichments.py b/cognee/tasks/feedback/create_enrichments.py index ee15e9797..7b18fc99f 100644 --- a/cognee/tasks/feedback/create_enrichments.py +++ b/cognee/tasks/feedback/create_enrichments.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Dict, List, Optional +from typing import List 
from uuid import NAMESPACE_OID, uuid5 from cognee.infrastructure.llm import LLMGateway diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py index e5d03026e..c80173188 100644 --- a/cognee/tasks/feedback/extract_feedback_interactions.py +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -1,21 +1,28 @@ from __future__ import annotations from typing import Dict, List, Optional, Tuple -from uuid import UUID +from uuid import UUID, uuid5, NAMESPACE_OID from cognee.infrastructure.llm import LLMGateway from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_prompt from cognee.shared.logging_utils import get_logger from cognee.infrastructure.databases.graph import get_graph_engine -from uuid import uuid5, NAMESPACE_OID -from .utils import filter_negative_feedback from .models import FeedbackEnrichment logger = get_logger("extract_feedback_interactions") +def _filter_negative_feedback(feedback_nodes): + """Filter for negative sentiment feedback using precise sentiment classification.""" + return [ + (node_id, props) + for node_id, props in feedback_nodes + if (props.get("sentiment", "").casefold() == "negative" or props.get("score", 0) < 0) + ] + + def _get_normalized_id(node_id, props) -> str: """Return Cognee node id preference: props.id → props.node_id → raw node_id.""" return str(props.get("id") or props.get("node_id") or node_id) @@ -179,7 +186,7 @@ async def extract_feedback_interactions( return [] feedback_nodes, interaction_nodes = _separate_feedback_and_interaction_nodes(graph_nodes) - negative_feedback_nodes = filter_negative_feedback(feedback_nodes) + negative_feedback_nodes = _filter_negative_feedback(feedback_nodes) if not negative_feedback_nodes: logger.info("No negative feedback found; returning empty list") return [] diff --git a/cognee/tasks/feedback/generate_improved_answers.py b/cognee/tasks/feedback/generate_improved_answers.py index 10059df7e..e439cf9e5 100644 --- a/cognee/tasks/feedback/generate_improved_answers.py +++ b/cognee/tasks/feedback/generate_improved_answers.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Dict, List, Optional, Tuple +from typing import List, Optional from pydantic import BaseModel from cognee.infrastructure.llm import LLMGateway @@ -8,7 +8,7 @@ from cognee.infrastructure.llm.prompts.read_query_prompt import read_query_promp from cognee.modules.graph.utils import resolve_edges_to_text from cognee.shared.logging_utils import get_logger -from .utils import create_retriever +from cognee.modules.retrieval.graph_completion_cot_retriever import GraphCompletionCotRetriever from .models import FeedbackEnrichment @@ -91,11 +91,10 @@ async def _generate_improved_answer_for_single_interaction( async def generate_improved_answers( enrichments: List[FeedbackEnrichment], - retriever_name: str = "graph_completion_cot", top_k: int = 20, reaction_prompt_location: str = "feedback_reaction_prompt.txt", ) -> List[FeedbackEnrichment]: - """Generate improved answers using configurable retriever and LLM.""" + """Generate improved answers using CoT retriever and LLM.""" if not enrichments: logger.info("No enrichments provided; returning empty list") return [] @@ -104,9 +103,9 @@ async def generate_improved_answers( logger.error("Input data validation failed; missing required fields") return [] - retriever = create_retriever( - retriever_name=retriever_name, + retriever = GraphCompletionCotRetriever( top_k=top_k, + save_interaction=False, 
user_prompt_path="graph_context_for_question.txt", system_prompt_path="answer_simple_question.txt", ) diff --git a/cognee/tasks/feedback/utils.py b/cognee/tasks/feedback/utils.py deleted file mode 100644 index d85b719c5..000000000 --- a/cognee/tasks/feedback/utils.py +++ /dev/null @@ -1,57 +0,0 @@ -from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever -from cognee.modules.retrieval.graph_completion_cot_retriever import GraphCompletionCotRetriever -from cognee.modules.retrieval.graph_completion_context_extension_retriever import ( - GraphCompletionContextExtensionRetriever, -) -from cognee.shared.logging_utils import get_logger - - -logger = get_logger("feedback_utils") - - -def create_retriever( - retriever_name: str = "graph_completion_cot", - top_k: int = 20, - user_prompt_path: str = "graph_context_for_question.txt", - system_prompt_path: str = "answer_simple_question.txt", -): - """Factory for retriever instances with configurable top_k and prompt paths.""" - if retriever_name == "graph_completion": - return GraphCompletionRetriever( - top_k=top_k, - save_interaction=False, - user_prompt_path=user_prompt_path, - system_prompt_path=system_prompt_path, - ) - if retriever_name == "graph_completion_cot": - return GraphCompletionCotRetriever( - top_k=top_k, - save_interaction=False, - user_prompt_path=user_prompt_path, - system_prompt_path=system_prompt_path, - ) - if retriever_name == "graph_completion_context_extension": - return GraphCompletionContextExtensionRetriever( - top_k=top_k, - save_interaction=False, - user_prompt_path=user_prompt_path, - system_prompt_path=system_prompt_path, - ) - logger.warning( - "Unknown retriever, defaulting to graph_completion_cot", retriever=retriever_name - ) - return GraphCompletionCotRetriever( - top_k=top_k, - save_interaction=False, - user_prompt_path=user_prompt_path, - system_prompt_path=system_prompt_path, - ) - - -def filter_negative_feedback(feedback_nodes): - """Filter for negative sentiment feedback using precise sentiment classification.""" - return [ - (node_id, props) - for node_id, props in feedback_nodes - if (props.get("sentiment", "").casefold() == "negative" or props.get("score", 0) < 0) - ] diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py index 8e7f01c7d..3af838214 100644 --- a/examples/python/feedback_enrichment_minimal_example.py +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -57,7 +57,7 @@ async def run_feedback_enrichment_memify(last_n: int = 5): # Instantiate tasks with their own kwargs extraction_tasks = [Task(extract_feedback_interactions, last_n=last_n)] enrichment_tasks = [ - Task(generate_improved_answers, retriever_name="graph_completion_cot", top_k=20), + Task(generate_improved_answers, top_k=20), Task(create_enrichments), Task(extract_graph_from_data, graph_model=KnowledgeGraph, task_config={"batch_size": 10}), Task(add_data_points, task_config={"batch_size": 10}), From f4d038b385a47d28df427aaf163d674dcc193e74 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 11:31:11 +0200 Subject: [PATCH 10/25] chore: pre-align cot retriever with dev --- .../graph_completion_cot_retriever.py | 234 +++++++----------- 1 file changed, 83 insertions(+), 151 deletions(-) diff --git a/cognee/modules/retrieval/graph_completion_cot_retriever.py b/cognee/modules/retrieval/graph_completion_cot_retriever.py index 55cbcfce5..3f6ca81be 100644 --- 
a/cognee/modules/retrieval/graph_completion_cot_retriever.py +++ b/cognee/modules/retrieval/graph_completion_cot_retriever.py @@ -1,31 +1,22 @@ -import json +import asyncio from typing import Optional, List, Type, Any -from pydantic import BaseModel from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever -from cognee.modules.retrieval.utils.completion import generate_completion +from cognee.modules.retrieval.utils.completion import generate_completion, summarize_text +from cognee.modules.retrieval.utils.session_cache import ( + save_conversation_history, + get_conversation_history, +) from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt +from cognee.context_global_variables import session_user +from cognee.infrastructure.databases.cache.config import CacheConfig logger = get_logger() -def _as_answer_text(completion: Any) -> str: - """Convert completion to human-readable text for validation and follow-up prompts.""" - if isinstance(completion, str): - return completion - if isinstance(completion, BaseModel): - # Add notice that this is a structured response - json_str = completion.model_dump_json(indent=2) - return f"[Structured Response]\n{json_str}" - try: - return json.dumps(completion, indent=2) - except TypeError: - return str(completion) - - class GraphCompletionCotRetriever(GraphCompletionRetriever): """ Handles graph completion by generating responses based on a series of interactions with @@ -70,138 +61,11 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): self.followup_system_prompt_path = followup_system_prompt_path self.followup_user_prompt_path = followup_user_prompt_path - async def _run_cot_completion( - self, - query: str, - context: Optional[List[Edge]] = None, - max_iter: int = 4, - response_model: Type = str, - ) -> tuple[Any, str, List[Edge]]: - """ - Run chain-of-thought completion with optional structured output. 
- - Returns: - -------- - - completion_result: The generated completion (string or structured model) - - context_text: The resolved context text - - triplets: The list of triplets used - """ - followup_question = "" - triplets = [] - completion = "" - - for round_idx in range(max_iter + 1): - if round_idx == 0: - if context is None: - triplets = await self.get_context(query) - context_text = await self.resolve_edges_to_text(triplets) - else: - context_text = await self.resolve_edges_to_text(context) - else: - triplets += await self.get_context(followup_question) - context_text = await self.resolve_edges_to_text(list(set(triplets))) - - if response_model is str: - completion = await generate_completion( - query=query, - context=context_text, - user_prompt_path=self.user_prompt_path, - system_prompt_path=self.system_prompt_path, - system_prompt=self.system_prompt, - ) - else: - args = {"question": query, "context": context_text} - user_prompt = render_prompt(self.user_prompt_path, args) - system_prompt = ( - self.system_prompt - if self.system_prompt - else read_query_prompt(self.system_prompt_path) - ) - - completion = await LLMGateway.acreate_structured_output( - text_input=user_prompt, - system_prompt=system_prompt, - response_model=response_model, - ) - - logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") - - if round_idx < max_iter: - answer_text = _as_answer_text(completion) - valid_args = {"query": query, "answer": answer_text, "context": context_text} - valid_user_prompt = render_prompt( - filename=self.validation_user_prompt_path, context=valid_args - ) - valid_system_prompt = read_query_prompt( - prompt_file_name=self.validation_system_prompt_path - ) - - reasoning = await LLMGateway.acreate_structured_output( - text_input=valid_user_prompt, - system_prompt=valid_system_prompt, - response_model=str, - ) - followup_args = {"query": query, "answer": answer_text, "reasoning": reasoning} - followup_prompt = render_prompt( - filename=self.followup_user_prompt_path, context=followup_args - ) - followup_system = read_query_prompt( - prompt_file_name=self.followup_system_prompt_path - ) - - followup_question = await LLMGateway.acreate_structured_output( - text_input=followup_prompt, system_prompt=followup_system, response_model=str - ) - logger.info( - f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" - ) - - return completion, context_text, triplets - - async def get_structured_completion( - self, - query: str, - context: Optional[List[Edge]] = None, - max_iter: int = 4, - response_model: Type = str, - ) -> Any: - """ - Generate structured completion responses based on a user query and contextual information. - - This method applies the same chain-of-thought logic as get_completion but returns - structured output using the provided response model. - - Parameters: - ----------- - - query (str): The user's query to be processed and answered. - - context (Optional[List[Edge]]): Optional context that may assist in answering the query. - If not provided, it will be fetched based on the query. (default None) - - max_iter: The maximum number of iterations to refine the answer and generate - follow-up questions. (default 4) - - response_model (Type): The Pydantic model type for structured output. (default str) - - Returns: - -------- - - Any: The generated structured completion based on the response model. 
- """ - completion, context_text, triplets = await self._run_cot_completion( - query=query, - context=context, - max_iter=max_iter, - response_model=response_model, - ) - - if self.save_interaction and context and triplets and completion: - await self.save_qa( - question=query, answer=str(completion), context=context_text, triplets=triplets - ) - - return completion - async def get_completion( self, query: str, context: Optional[List[Edge]] = None, + session_id: Optional[str] = None, max_iter=4, ) -> List[str]: """ @@ -218,6 +82,8 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): - query (str): The user's query to be processed and answered. - context (Optional[Any]): Optional context that may assist in answering the query. If not provided, it will be fetched based on the query. (default None) + - session_id (Optional[str]): Optional session identifier for caching. If None, + defaults to 'default_session'. (default None) - max_iter: The maximum number of iterations to refine the answer and generate follow-up questions. (default 4) @@ -226,16 +92,82 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): - List[str]: A list containing the generated answer to the user's query. """ - completion, context_text, triplets = await self._run_cot_completion( - query=query, - context=context, - max_iter=max_iter, - response_model=str, - ) + followup_question = "" + triplets = [] + completion = "" + + # Retrieve conversation history if session saving is enabled + cache_config = CacheConfig() + user = session_user.get() + user_id = getattr(user, "id", None) + session_save = user_id and cache_config.caching + + conversation_history = "" + if session_save: + conversation_history = await get_conversation_history(session_id=session_id) + + for round_idx in range(max_iter + 1): + if round_idx == 0: + if context is None: + triplets = await self.get_context(query) + context_text = await self.resolve_edges_to_text(triplets) + else: + context_text = await self.resolve_edges_to_text(context) + else: + triplets += await self.get_context(followup_question) + context_text = await self.resolve_edges_to_text(list(set(triplets))) + + completion = await generate_completion( + query=query, + context=context_text, + user_prompt_path=self.user_prompt_path, + system_prompt_path=self.system_prompt_path, + system_prompt=self.system_prompt, + conversation_history=conversation_history if session_save else None, + ) + logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") + if round_idx < max_iter: + valid_args = {"query": query, "answer": completion, "context": context_text} + valid_user_prompt = render_prompt( + filename=self.validation_user_prompt_path, context=valid_args + ) + valid_system_prompt = read_query_prompt( + prompt_file_name=self.validation_system_prompt_path + ) + + reasoning = await LLMGateway.acreate_structured_output( + text_input=valid_user_prompt, + system_prompt=valid_system_prompt, + response_model=str, + ) + followup_args = {"query": query, "answer": completion, "reasoning": reasoning} + followup_prompt = render_prompt( + filename=self.followup_user_prompt_path, context=followup_args + ) + followup_system = read_query_prompt( + prompt_file_name=self.followup_system_prompt_path + ) + + followup_question = await LLMGateway.acreate_structured_output( + text_input=followup_prompt, system_prompt=followup_system, response_model=str + ) + logger.info( + f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" + ) if self.save_interaction and 
context and triplets and completion: await self.save_qa( question=query, answer=completion, context=context_text, triplets=triplets ) + # Save to session cache + if session_save: + context_summary = await summarize_text(context_text) + await save_conversation_history( + query=query, + context_summary=context_summary, + answer=completion, + session_id=session_id, + ) + return [completion] From 66a8242cec17cbba417dfb0dec06e3c907e3119d Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 12:07:31 +0200 Subject: [PATCH 11/25] chore: restore the feedback enrichment cot retriever functionality --- .../graph_completion_cot_retriever.py | 250 +++++++++++++----- 1 file changed, 178 insertions(+), 72 deletions(-) diff --git a/cognee/modules/retrieval/graph_completion_cot_retriever.py b/cognee/modules/retrieval/graph_completion_cot_retriever.py index 3f6ca81be..d785a1494 100644 --- a/cognee/modules/retrieval/graph_completion_cot_retriever.py +++ b/cognee/modules/retrieval/graph_completion_cot_retriever.py @@ -1,5 +1,7 @@ import asyncio +import json from typing import Optional, List, Type, Any +from pydantic import BaseModel from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.shared.logging_utils import get_logger @@ -17,6 +19,20 @@ from cognee.infrastructure.databases.cache.config import CacheConfig logger = get_logger() +def _as_answer_text(completion: Any) -> str: + """Convert completion to human-readable text for validation and follow-up prompts.""" + if isinstance(completion, str): + return completion + if isinstance(completion, BaseModel): + # Add notice that this is a structured response + json_str = completion.model_dump_json(indent=2) + return f"[Structured Response]\n{json_str}" + try: + return json.dumps(completion, indent=2) + except TypeError: + return str(completion) + + class GraphCompletionCotRetriever(GraphCompletionRetriever): """ Handles graph completion by generating responses based on a series of interactions with @@ -25,6 +41,7 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): questions based on reasoning. The public methods are: - get_completion + - get_structured_completion Instance variables include: - validation_system_prompt_path @@ -61,6 +78,160 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): self.followup_system_prompt_path = followup_system_prompt_path self.followup_user_prompt_path = followup_user_prompt_path + async def _run_cot_completion( + self, + query: str, + context: Optional[List[Edge]] = None, + session_id: Optional[str] = None, + max_iter: int = 4, + response_model: Type = str, + ) -> tuple[Any, str, List[Edge]]: + """ + Run chain-of-thought completion with optional structured output and session caching. 
+ + Returns: + -------- + - completion_result: The generated completion (string or structured model) + - context_text: The resolved context text + - triplets: The list of triplets used + """ + followup_question = "" + triplets = [] + completion = "" + + # Retrieve conversation history if session saving is enabled + cache_config = CacheConfig() + user = session_user.get() + user_id = getattr(user, "id", None) + session_save = user_id and cache_config.caching + + conversation_history = "" + if session_save: + conversation_history = await get_conversation_history(session_id=session_id) + + for round_idx in range(max_iter + 1): + if round_idx == 0: + if context is None: + triplets = await self.get_context(query) + context_text = await self.resolve_edges_to_text(triplets) + else: + context_text = await self.resolve_edges_to_text(context) + else: + triplets += await self.get_context(followup_question) + context_text = await self.resolve_edges_to_text(list(set(triplets))) + + if response_model is str: + completion = await generate_completion( + query=query, + context=context_text, + user_prompt_path=self.user_prompt_path, + system_prompt_path=self.system_prompt_path, + system_prompt=self.system_prompt, + conversation_history=conversation_history if session_save else None, + ) + else: + args = {"question": query, "context": context_text} + user_prompt = render_prompt(self.user_prompt_path, args) + system_prompt = ( + self.system_prompt + if self.system_prompt + else read_query_prompt(self.system_prompt_path) + ) + + completion = await LLMGateway.acreate_structured_output( + text_input=user_prompt, + system_prompt=system_prompt, + response_model=response_model, + ) + + logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") + + if round_idx < max_iter: + answer_text = _as_answer_text(completion) + valid_args = {"query": query, "answer": answer_text, "context": context_text} + valid_user_prompt = render_prompt( + filename=self.validation_user_prompt_path, context=valid_args + ) + valid_system_prompt = read_query_prompt( + prompt_file_name=self.validation_system_prompt_path + ) + + reasoning = await LLMGateway.acreate_structured_output( + text_input=valid_user_prompt, + system_prompt=valid_system_prompt, + response_model=str, + ) + followup_args = {"query": query, "answer": answer_text, "reasoning": reasoning} + followup_prompt = render_prompt( + filename=self.followup_user_prompt_path, context=followup_args + ) + followup_system = read_query_prompt( + prompt_file_name=self.followup_system_prompt_path + ) + + followup_question = await LLMGateway.acreate_structured_output( + text_input=followup_prompt, system_prompt=followup_system, response_model=str + ) + logger.info( + f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" + ) + + # Save to session cache + if session_save: + context_summary = await summarize_text(context_text) + await save_conversation_history( + query=query, + context_summary=context_summary, + answer=str(completion), + session_id=session_id, + ) + + return completion, context_text, triplets + + async def get_structured_completion( + self, + query: str, + context: Optional[List[Edge]] = None, + session_id: Optional[str] = None, + max_iter: int = 4, + response_model: Type = str, + ) -> Any: + """ + Generate structured completion responses based on a user query and contextual information. + + This method applies the same chain-of-thought logic as get_completion but returns + structured output using the provided response model. 
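+
+        Illustrative usage, mirroring the call made in generate_improved_answers
+        elsewhere in this series (names assumed from that module):
+
+            completion = await retriever.get_structured_completion(
+                query=query_text,
+                context=retrieved_context,
+                response_model=ImprovedAnswerResponse,
+                max_iter=4,
+            )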
+ + Parameters: + ----------- + - query (str): The user's query to be processed and answered. + - context (Optional[List[Edge]]): Optional context that may assist in answering the query. + If not provided, it will be fetched based on the query. (default None) + - session_id (Optional[str]): Optional session identifier for caching. If None, + defaults to 'default_session'. (default None) + - max_iter: The maximum number of iterations to refine the answer and generate + follow-up questions. (default 4) + - response_model (Type): The Pydantic model type for structured output. (default str) + + Returns: + -------- + - Any: The generated structured completion based on the response model. + """ + completion, context_text, triplets = await self._run_cot_completion( + query=query, + context=context, + session_id=session_id, + max_iter=max_iter, + response_model=response_model, + ) + + if self.save_interaction and context and triplets and completion: + await self.save_qa( + question=query, answer=str(completion), context=context_text, triplets=triplets + ) + + return completion + async def get_completion( self, query: str, @@ -92,82 +263,17 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): - List[str]: A list containing the generated answer to the user's query. """ - followup_question = "" - triplets = [] - completion = "" - - # Retrieve conversation history if session saving is enabled - cache_config = CacheConfig() - user = session_user.get() - user_id = getattr(user, "id", None) - session_save = user_id and cache_config.caching - - conversation_history = "" - if session_save: - conversation_history = await get_conversation_history(session_id=session_id) - - for round_idx in range(max_iter + 1): - if round_idx == 0: - if context is None: - triplets = await self.get_context(query) - context_text = await self.resolve_edges_to_text(triplets) - else: - context_text = await self.resolve_edges_to_text(context) - else: - triplets += await self.get_context(followup_question) - context_text = await self.resolve_edges_to_text(list(set(triplets))) - - completion = await generate_completion( - query=query, - context=context_text, - user_prompt_path=self.user_prompt_path, - system_prompt_path=self.system_prompt_path, - system_prompt=self.system_prompt, - conversation_history=conversation_history if session_save else None, - ) - logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") - if round_idx < max_iter: - valid_args = {"query": query, "answer": completion, "context": context_text} - valid_user_prompt = render_prompt( - filename=self.validation_user_prompt_path, context=valid_args - ) - valid_system_prompt = read_query_prompt( - prompt_file_name=self.validation_system_prompt_path - ) - - reasoning = await LLMGateway.acreate_structured_output( - text_input=valid_user_prompt, - system_prompt=valid_system_prompt, - response_model=str, - ) - followup_args = {"query": query, "answer": completion, "reasoning": reasoning} - followup_prompt = render_prompt( - filename=self.followup_user_prompt_path, context=followup_args - ) - followup_system = read_query_prompt( - prompt_file_name=self.followup_system_prompt_path - ) - - followup_question = await LLMGateway.acreate_structured_output( - text_input=followup_prompt, system_prompt=followup_system, response_model=str - ) - logger.info( - f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" - ) + completion, context_text, triplets = await self._run_cot_completion( + query=query, + context=context, + 
session_id=session_id, + max_iter=max_iter, + response_model=str, + ) if self.save_interaction and context and triplets and completion: await self.save_qa( question=query, answer=completion, context=context_text, triplets=triplets ) - # Save to session cache - if session_save: - context_summary = await summarize_text(context_text) - await save_conversation_history( - query=query, - context_summary=context_summary, - answer=completion, - session_id=session_id, - ) - return [completion] From ecae650a2873cb94b78198ac10bf216ac523d179 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 12:30:55 +0200 Subject: [PATCH 12/25] refactor: unify structured and str completion --- .../graph_completion_cot_retriever.py | 99 +++++++++---------- cognee/modules/retrieval/utils/completion.py | 29 +++++- 2 files changed, 71 insertions(+), 57 deletions(-) diff --git a/cognee/modules/retrieval/graph_completion_cot_retriever.py b/cognee/modules/retrieval/graph_completion_cot_retriever.py index d785a1494..299db6855 100644 --- a/cognee/modules/retrieval/graph_completion_cot_retriever.py +++ b/cognee/modules/retrieval/graph_completion_cot_retriever.py @@ -6,7 +6,10 @@ from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever -from cognee.modules.retrieval.utils.completion import generate_completion, summarize_text +from cognee.modules.retrieval.utils.completion import ( + generate_structured_completion, + summarize_text, +) from cognee.modules.retrieval.utils.session_cache import ( save_conversation_history, get_conversation_history, @@ -82,12 +85,20 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): self, query: str, context: Optional[List[Edge]] = None, - session_id: Optional[str] = None, + conversation_history: str = "", max_iter: int = 4, response_model: Type = str, ) -> tuple[Any, str, List[Edge]]: """ - Run chain-of-thought completion with optional structured output and session caching. + Run chain-of-thought completion with optional structured output. 
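+
+        Round 0 answers from the provided (or freshly fetched) context; each later
+        round retrieves additional triplets for the generated follow-up question
+        and re-answers against the deduplicated triplet set.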
+ + Parameters: + ----------- + - query: User query + - context: Optional pre-fetched context edges + - conversation_history: Optional conversation history string + - max_iter: Maximum CoT iterations + - response_model: Type for structured output (str for plain text) Returns: -------- @@ -99,16 +110,6 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): triplets = [] completion = "" - # Retrieve conversation history if session saving is enabled - cache_config = CacheConfig() - user = session_user.get() - user_id = getattr(user, "id", None) - session_save = user_id and cache_config.caching - - conversation_history = "" - if session_save: - conversation_history = await get_conversation_history(session_id=session_id) - for round_idx in range(max_iter + 1): if round_idx == 0: if context is None: @@ -120,29 +121,15 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): triplets += await self.get_context(followup_question) context_text = await self.resolve_edges_to_text(list(set(triplets))) - if response_model is str: - completion = await generate_completion( - query=query, - context=context_text, - user_prompt_path=self.user_prompt_path, - system_prompt_path=self.system_prompt_path, - system_prompt=self.system_prompt, - conversation_history=conversation_history if session_save else None, - ) - else: - args = {"question": query, "context": context_text} - user_prompt = render_prompt(self.user_prompt_path, args) - system_prompt = ( - self.system_prompt - if self.system_prompt - else read_query_prompt(self.system_prompt_path) - ) - - completion = await LLMGateway.acreate_structured_output( - text_input=user_prompt, - system_prompt=system_prompt, - response_model=response_model, - ) + completion = await generate_structured_completion( + query=query, + context=context_text, + user_prompt_path=self.user_prompt_path, + system_prompt_path=self.system_prompt_path, + system_prompt=self.system_prompt, + conversation_history=conversation_history if conversation_history else None, + response_model=response_model, + ) logger.info(f"Chain-of-thought: round {round_idx} - answer: {completion}") @@ -176,16 +163,6 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): f"Chain-of-thought: round {round_idx} - follow-up question: {followup_question}" ) - # Save to session cache - if session_save: - context_summary = await summarize_text(context_text) - await save_conversation_history( - query=query, - context_summary=context_summary, - answer=str(completion), - session_id=session_id, - ) - return completion, context_text, triplets async def get_structured_completion( @@ -217,10 +194,21 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): -------- - Any: The generated structured completion based on the response model. 
""" + # Check if session saving is enabled + cache_config = CacheConfig() + user = session_user.get() + user_id = getattr(user, "id", None) + session_save = user_id and cache_config.caching + + # Load conversation history if enabled + conversation_history = "" + if session_save: + conversation_history = await get_conversation_history(session_id=session_id) + completion, context_text, triplets = await self._run_cot_completion( query=query, context=context, - session_id=session_id, + conversation_history=conversation_history, max_iter=max_iter, response_model=response_model, ) @@ -230,6 +218,16 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): question=query, answer=str(completion), context=context_text, triplets=triplets ) + # Save to session cache if enabled + if session_save: + context_summary = await summarize_text(context_text) + await save_conversation_history( + query=query, + context_summary=context_summary, + answer=str(completion), + session_id=session_id, + ) + return completion async def get_completion( @@ -263,7 +261,7 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): - List[str]: A list containing the generated answer to the user's query. """ - completion, context_text, triplets = await self._run_cot_completion( + completion = await self.get_structured_completion( query=query, context=context, session_id=session_id, @@ -271,9 +269,4 @@ class GraphCompletionCotRetriever(GraphCompletionRetriever): response_model=str, ) - if self.save_interaction and context and triplets and completion: - await self.save_qa( - question=query, answer=completion, context=context_text, triplets=triplets - ) - return [completion] diff --git a/cognee/modules/retrieval/utils/completion.py b/cognee/modules/retrieval/utils/completion.py index 6b6b6190e..db7a10252 100644 --- a/cognee/modules/retrieval/utils/completion.py +++ b/cognee/modules/retrieval/utils/completion.py @@ -1,17 +1,18 @@ -from typing import Optional +from typing import Optional, Type, Any from cognee.infrastructure.llm.LLMGateway import LLMGateway from cognee.infrastructure.llm.prompts import render_prompt, read_query_prompt -async def generate_completion( +async def generate_structured_completion( query: str, context: str, user_prompt_path: str, system_prompt_path: str, system_prompt: Optional[str] = None, conversation_history: Optional[str] = None, -) -> str: - """Generates a completion using LLM with given context and prompts.""" + response_model: Type = str, +) -> Any: + """Generates a structured completion using LLM with given context and prompts.""" args = {"question": query, "context": context} user_prompt = render_prompt(user_prompt_path, args) system_prompt = system_prompt if system_prompt else read_query_prompt(system_prompt_path) @@ -23,6 +24,26 @@ async def generate_completion( return await LLMGateway.acreate_structured_output( text_input=user_prompt, system_prompt=system_prompt, + response_model=response_model, + ) + + +async def generate_completion( + query: str, + context: str, + user_prompt_path: str, + system_prompt_path: str, + system_prompt: Optional[str] = None, + conversation_history: Optional[str] = None, +) -> str: + """Generates a completion using LLM with given context and prompts.""" + return await generate_structured_completion( + query=query, + context=context, + user_prompt_path=user_prompt_path, + system_prompt_path=system_prompt_path, + system_prompt=system_prompt, + conversation_history=conversation_history, response_model=str, ) From aba5f9ba55a829f54da2e806ecd7f4a1bebc8837 Mon 
Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 17:09:15 +0200 Subject: [PATCH 13/25] test: add e2e feedback enrichment test --- .github/workflows/e2e_tests.yml | 28 ++++ cognee/tests/test_feedback_enrichment.py | 162 +++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 cognee/tests/test_feedback_enrichment.py diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 775eb2912..70a4b56e6 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -358,6 +358,34 @@ jobs: EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./cognee/tests/tasks/entity_extraction/entity_extraction_test.py + test-feedback-enrichment: + name: Test Feedback Enrichment + runs-on: ubuntu-22.04 + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' + + - name: Dependencies already installed + run: echo "Dependencies already installed in setup" + + - name: Run Feedback Enrichment Test + env: + ENV: 'dev' + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + run: uv run python ./cognee/tests/test_feedback_enrichment.py + run_conversation_sessions_test: name: Conversation sessions test runs-on: ubuntu-latest diff --git a/cognee/tests/test_feedback_enrichment.py b/cognee/tests/test_feedback_enrichment.py new file mode 100644 index 000000000..37bda95d5 --- /dev/null +++ b/cognee/tests/test_feedback_enrichment.py @@ -0,0 +1,162 @@ +""" +End-to-end integration test for feedback enrichment feature. + +Tests the complete feedback enrichment pipeline: +1. Add data and cognify +2. Run search with save_interaction=True to create CogneeUserInteraction nodes +3. Submit feedback to create CogneeUserFeedback nodes +4. Run memify with feedback enrichment tasks to create FeedbackEnrichment nodes +5. 
Verify all nodes and edges are properly created and linked in the graph +""" + +import os +import pathlib +from collections import Counter + +import cognee +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.modules.pipelines.tasks.task import Task +from cognee.modules.search.types import SearchType +from cognee.shared.data_models import KnowledgeGraph +from cognee.shared.logging_utils import get_logger +from cognee.tasks.feedback.create_enrichments import create_enrichments +from cognee.tasks.feedback.extract_feedback_interactions import ( + extract_feedback_interactions, +) +from cognee.tasks.feedback.generate_improved_answers import generate_improved_answers +from cognee.tasks.feedback.link_enrichments_to_feedback import ( + link_enrichments_to_feedback, +) +from cognee.tasks.graph import extract_graph_from_data +from cognee.tasks.storage import add_data_points + +logger = get_logger() + + +async def main(): + data_directory_path = str( + pathlib.Path( + os.path.join( + pathlib.Path(__file__).parent, + ".data_storage/test_feedback_enrichment", + ) + ).resolve() + ) + cognee_directory_path = str( + pathlib.Path( + os.path.join( + pathlib.Path(__file__).parent, + ".cognee_system/test_feedback_enrichment", + ) + ).resolve() + ) + + cognee.config.data_root_directory(data_directory_path) + cognee.config.system_root_directory(cognee_directory_path) + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + dataset_name = "feedback_enrichment_test" + + await cognee.add("Cognee turns documents into AI memory.", dataset_name) + await cognee.cognify([dataset_name]) + + question_text = "Say something." + result = await cognee.search( + query_type=SearchType.GRAPH_COMPLETION, + query_text=question_text, + save_interaction=True, + ) + + assert len(result) > 0, "Search should return non-empty results" + + feedback_text = "Negative feedback." 
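+    # Note: the FEEDBACK search below attaches this text to the most recent
+    # saved interaction(s); last_k bounds how many interactions receive it,
+    # and the stored sentiment/score are derived from the wording, so the
+    # text should read clearly negative.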
+ await cognee.search( + query_type=SearchType.FEEDBACK, + query_text=feedback_text, + last_k=2, + ) + + graph_engine = await get_graph_engine() + nodes_before, edges_before = await graph_engine.get_graph_data() + + interaction_nodes_before = [ + (node_id, props) + for node_id, props in nodes_before + if props.get("type") == "CogneeUserInteraction" + ] + feedback_nodes_before = [ + (node_id, props) + for node_id, props in nodes_before + if props.get("type") == "CogneeUserFeedback" + ] + + edge_types_before = Counter(edge[2] for edge in edges_before) + + assert len(interaction_nodes_before) >= 1, ( + f"Expected at least 1 CogneeUserInteraction node, found {len(interaction_nodes_before)}" + ) + assert len(feedback_nodes_before) >= 1, ( + f"Expected at least 1 CogneeUserFeedback node, found {len(feedback_nodes_before)}" + ) + assert edge_types_before.get("gives_feedback_to", 0) >= 1, ( + f"Expected at least 1 'gives_feedback_to' edge, found {edge_types_before.get('gives_feedback_to', 0)}" + ) + + extraction_tasks = [Task(extract_feedback_interactions, last_n=5)] + enrichment_tasks = [ + Task(generate_improved_answers, top_k=20), + Task(create_enrichments), + Task( + extract_graph_from_data, + graph_model=KnowledgeGraph, + task_config={"batch_size": 10}, + ), + Task(add_data_points, task_config={"batch_size": 10}), + Task(link_enrichments_to_feedback), + ] + + await cognee.memify( + extraction_tasks=extraction_tasks, + enrichment_tasks=enrichment_tasks, + data=[{}], + dataset="feedback_enrichment_test_memify", + ) + + nodes_after, edges_after = await graph_engine.get_graph_data() + + enrichment_nodes = [ + (node_id, props) + for node_id, props in nodes_after + if props.get("type") == "FeedbackEnrichment" + ] + + assert len(enrichment_nodes) >= 1, ( + f"Expected at least 1 FeedbackEnrichment node, found {len(enrichment_nodes)}" + ) + + for node_id, props in enrichment_nodes: + assert "text" in props, f"FeedbackEnrichment node {node_id} missing 'text' property" + + enrichment_node_ids = {node_id for node_id, _ in enrichment_nodes} + edges_with_enrichments = [ + edge + for edge in edges_after + if edge[0] in enrichment_node_ids or edge[1] in enrichment_node_ids + ] + + assert len(edges_with_enrichments) >= 1, ( + f"Expected enrichment nodes to have at least 1 edge, found {len(edges_with_enrichments)}" + ) + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + logger.info("All feedback enrichment tests passed successfully") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) From 2d6188523ab0cfa31aae99f078f075735af4921a Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 17:11:01 +0200 Subject: [PATCH 14/25] chore: minor improvements --- cognee/tasks/feedback/extract_feedback_interactions.py | 4 +--- examples/python/feedback_enrichment_minimal_example.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py index c80173188..521d35472 100644 --- a/cognee/tasks/feedback/extract_feedback_interactions.py +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -177,9 +177,7 @@ async def _build_feedback_interaction_records( return feedback_interaction_records -async def extract_feedback_interactions( - subgraphs: List, last_n: Optional[int] = None -) -> List[FeedbackEnrichment]: +async def extract_feedback_interactions(last_n: Optional[int] = None) -> 
List[FeedbackEnrichment]: """Extract negative feedback-interaction pairs and create FeedbackEnrichment DataPoints.""" graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data() if not graph_nodes: diff --git a/examples/python/feedback_enrichment_minimal_example.py b/examples/python/feedback_enrichment_minimal_example.py index 3af838214..11ef20830 100644 --- a/examples/python/feedback_enrichment_minimal_example.py +++ b/examples/python/feedback_enrichment_minimal_example.py @@ -47,7 +47,7 @@ async def run_question_and_submit_feedback(question_text: str) -> bool: await cognee.search( query_type=SearchType.FEEDBACK, query_text=feedback_text, - last_k=2, + last_k=1, ) return mentions_mallory From b09e4b7cc430221abd499be49179da28609e8f42 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 17:48:21 +0200 Subject: [PATCH 15/25] chore: adhere to memify input convention --- .../tasks/feedback/extract_feedback_interactions.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py index 521d35472..03dbbf784 100644 --- a/cognee/tasks/feedback/extract_feedback_interactions.py +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from uuid import UUID, uuid5, NAMESPACE_OID from cognee.infrastructure.llm import LLMGateway @@ -177,8 +177,16 @@ async def _build_feedback_interaction_records( return feedback_interaction_records -async def extract_feedback_interactions(last_n: Optional[int] = None) -> List[FeedbackEnrichment]: +async def extract_feedback_interactions( + data: Any, last_n: Optional[int] = None +) -> List[FeedbackEnrichment]: """Extract negative feedback-interaction pairs and create FeedbackEnrichment DataPoints.""" + if not data or data == [{}]: + logger.info( + "No data passed to the extraction task (extraction task fetches data from graph directly)", + data=data, + ) + graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data() if not graph_nodes: return [] From f49b17133721ffb50233e8ed74b4394256e3d2c4 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 18:18:38 +0200 Subject: [PATCH 16/25] fix: emphasize negative feedback language --- cognee/tests/test_feedback_enrichment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/tests/test_feedback_enrichment.py b/cognee/tests/test_feedback_enrichment.py index 37bda95d5..9febca109 100644 --- a/cognee/tests/test_feedback_enrichment.py +++ b/cognee/tests/test_feedback_enrichment.py @@ -71,7 +71,7 @@ async def main(): assert len(result) > 0, "Search should return non-empty results" - feedback_text = "Negative feedback." + feedback_text = "This answer was completely useless, my feedback is definitely negative." 
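+    # Wording matters here: the terse "Negative feedback." replaced above
+    # risked being classified as neutral, leaving _filter_negative_feedback
+    # with nothing to enrich.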
await cognee.search( query_type=SearchType.FEEDBACK, query_text=feedback_text, From 23e66a63758102c212412d8ee04d464459a9b09e Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 18:30:17 +0200 Subject: [PATCH 17/25] chore: expand logging --- cognee/tests/test_feedback_enrichment.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cognee/tests/test_feedback_enrichment.py b/cognee/tests/test_feedback_enrichment.py index 9febca109..02d90db32 100644 --- a/cognee/tests/test_feedback_enrichment.py +++ b/cognee/tests/test_feedback_enrichment.py @@ -75,7 +75,7 @@ async def main(): await cognee.search( query_type=SearchType.FEEDBACK, query_text=feedback_text, - last_k=2, + last_k=1, ) graph_engine = await get_graph_engine() @@ -100,6 +100,18 @@ async def main(): assert len(feedback_nodes_before) >= 1, ( f"Expected at least 1 CogneeUserFeedback node, found {len(feedback_nodes_before)}" ) + + for node_id, props in feedback_nodes_before: + sentiment = props.get("sentiment", "") + score = props.get("score", 0) + feedback_text = props.get("feedback", "") + logger.info( + "Feedback node created", + feedback=feedback_text, + sentiment=sentiment, + score=score, + ) + assert edge_types_before.get("gives_feedback_to", 0) >= 1, ( f"Expected at least 1 'gives_feedback_to' edge, found {edge_types_before.get('gives_feedback_to', 0)}" ) From 7a08e13a208a0018652b8228bd2ebdcbc540e71d Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 18:36:51 +0200 Subject: [PATCH 18/25] chore: further expand logging --- .../feedback/extract_feedback_interactions.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cognee/tasks/feedback/extract_feedback_interactions.py b/cognee/tasks/feedback/extract_feedback_interactions.py index 03dbbf784..851983dc0 100644 --- a/cognee/tasks/feedback/extract_feedback_interactions.py +++ b/cognee/tasks/feedback/extract_feedback_interactions.py @@ -189,10 +189,24 @@ async def extract_feedback_interactions( graph_nodes, graph_edges = await _fetch_feedback_and_interaction_graph_data() if not graph_nodes: + logger.warning("No graph nodes retrieved from database") return [] feedback_nodes, interaction_nodes = _separate_feedback_and_interaction_nodes(graph_nodes) + logger.info( + "Retrieved nodes from graph", + total_nodes=len(graph_nodes), + feedback_nodes=len(feedback_nodes), + interaction_nodes=len(interaction_nodes), + ) + negative_feedback_nodes = _filter_negative_feedback(feedback_nodes) + logger.info( + "Filtered feedback nodes", + total_feedback=len(feedback_nodes), + negative_feedback=len(negative_feedback_nodes), + ) + if not negative_feedback_nodes: logger.info("No negative feedback found; returning empty list") return [] From 6dea23b743ff745ab34e01450355c6c11894e077 Mon Sep 17 00:00:00 2001 From: lxobr <122801072+lxobr@users.noreply.github.com> Date: Thu, 23 Oct 2025 18:47:49 +0200 Subject: [PATCH 19/25] fix: update kuzu get_filtered_graph_data --- .../infrastructure/databases/graph/kuzu/adapter.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 2d3866888..8dd160665 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -1366,9 +1366,15 @@ class KuzuAdapter(GraphDBInterface): params[param_name] = values where_clause = " AND 
".join(where_clauses) - nodes_query = ( - f"MATCH (n:Node) WHERE {where_clause} RETURN n.id, {{properties: n.properties}}" - ) + nodes_query = f""" + MATCH (n:Node) + WHERE {where_clause} + RETURN n.id, {{ + name: n.name, + type: n.type, + properties: n.properties + }} + """ edges_query = f""" MATCH (n1:Node)-[r:EDGE]->(n2:Node) WHERE {where_clause.replace("n.", "n1.")} AND {where_clause.replace("n.", "n2.")} From 828a1096de1de9f0a9e98181367540a4b39b1f5f Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 24 Oct 2025 12:52:21 +0200 Subject: [PATCH 20/25] feat: adds config as additional properties to Pipeline_Run_Finished event --- .../modules/pipelines/operations/run_tasks_with_telemetry.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py index a2af18be6..bcee4cec6 100644 --- a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py +++ b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py @@ -39,7 +39,8 @@ async def run_tasks_with_telemetry( user.id, additional_properties={ "pipeline_name": str(pipeline_name), - }, + } + | config, ) except Exception as error: logger.error( From d9f3be4d629c15c6a82c9df14a3e8c01e5100e7d Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 24 Oct 2025 12:53:11 +0200 Subject: [PATCH 21/25] feat: Adds nested property sanitizer function to telemetry --- cognee/shared/utils.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/cognee/shared/utils.py b/cognee/shared/utils.py index 90fbb9cd4..5a3b039a9 100644 --- a/cognee/shared/utils.py +++ b/cognee/shared/utils.py @@ -8,7 +8,7 @@ import http.server import socketserver from threading import Thread import pathlib -from uuid import uuid4 +from uuid import uuid4, uuid5, NAMESPACE_OID from cognee.base_config import get_base_config from cognee.infrastructure.databases.graph import get_graph_engine @@ -50,6 +50,25 @@ def get_anonymous_id(): anonymous_id = f.read() return anonymous_id +def _sanitize_nested_properties(obj, property_names: list[str]): + """ + Recursively replaces any property whose key matches one of `property_names` + (e.g., ['url', 'path']) in a nested dict or list with a uuid5 hash + of its string value. Returns a new sanitized copy. 
+ """ + if isinstance(obj, dict): + new_obj = {} + for k, v in obj.items(): + if k in property_names and isinstance(v, str): + new_obj[k] = str(uuid5(NAMESPACE_OID, v)) + else: + new_obj[k] = _sanitize_nested_properties(v, property_names) + return new_obj + elif isinstance(obj, list): + return [_sanitize_nested_properties(item, property_names) for item in obj] + else: + return obj + def send_telemetry(event_name: str, user_id, additional_properties: dict = {}): if os.getenv("TELEMETRY_DISABLED"): @@ -58,7 +77,7 @@ def send_telemetry(event_name: str, user_id, additional_properties: dict = {}): env = os.getenv("ENV") if env in ["test", "dev"]: return - + additional_properties = _sanitize_nested_properties(obj=additional_properties, property_names=['url']) current_time = datetime.now(timezone.utc) payload = { "anonymous_id": str(get_anonymous_id()), From 5ce79fb361934078b2277c3f086e016d295f96cc Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:12:35 +0200 Subject: [PATCH 22/25] feat: adds cognee version to task and pipeline based telemetry info --- cognee/modules/pipelines/operations/run_tasks_base.py | 4 ++++ .../modules/pipelines/operations/run_tasks_with_telemetry.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/cognee/modules/pipelines/operations/run_tasks_base.py b/cognee/modules/pipelines/operations/run_tasks_base.py index e5f577848..ee2ccfd8c 100644 --- a/cognee/modules/pipelines/operations/run_tasks_base.py +++ b/cognee/modules/pipelines/operations/run_tasks_base.py @@ -2,6 +2,7 @@ import inspect from cognee.shared.logging_utils import get_logger from cognee.modules.users.models import User from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version from ..tasks.task import Task @@ -25,6 +26,7 @@ async def handle_task( user_id=user.id, additional_properties={ "task_name": running_task.executable.__name__, + "cognee_version": cognee_version, }, ) @@ -46,6 +48,7 @@ async def handle_task( user_id=user.id, additional_properties={ "task_name": running_task.executable.__name__, + "cognee_version": cognee_version, }, ) except Exception as error: @@ -58,6 +61,7 @@ async def handle_task( user_id=user.id, additional_properties={ "task_name": running_task.executable.__name__, + "cognee_version": cognee_version, }, ) raise error diff --git a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py index bcee4cec6..9a52bf854 100644 --- a/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py +++ b/cognee/modules/pipelines/operations/run_tasks_with_telemetry.py @@ -4,6 +4,7 @@ from cognee.modules.settings import get_current_settings from cognee.modules.users.models import User from cognee.shared.logging_utils import get_logger from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version from .run_tasks_base import run_tasks_base from ..tasks.task import Task @@ -26,6 +27,7 @@ async def run_tasks_with_telemetry( user.id, additional_properties={ "pipeline_name": str(pipeline_name), + "cognee_version": cognee_version, } | config, ) @@ -39,6 +41,7 @@ async def run_tasks_with_telemetry( user.id, additional_properties={ "pipeline_name": str(pipeline_name), + "cognee_version": cognee_version, } | config, ) @@ -54,6 +57,7 @@ async def run_tasks_with_telemetry( user.id, additional_properties={ "pipeline_name": str(pipeline_name), + "cognee_version": cognee_version, } | config, ) From 
2ecace0ce20f8d16e32409bc170c0802e2c8cc42 Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Fri, 24 Oct 2025 14:12:59 +0100 Subject: [PATCH 23/25] fix: update `cognee-cli -ui` MCP docker image --- cognee/api/v1/ui/ui.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cognee/api/v1/ui/ui.py b/cognee/api/v1/ui/ui.py index 51088c3e1..344acf87b 100644 --- a/cognee/api/v1/ui/ui.py +++ b/cognee/api/v1/ui/ui.py @@ -503,7 +503,7 @@ def start_ui( if start_mcp: logger.info("Starting Cognee MCP server with Docker...") try: - image = "cognee/cognee-mcp:feature-standalone-mcp" # TODO: change to "cognee/cognee-mcp:main" right before merging into main + image = "cognee/cognee-mcp:main" subprocess.run(["docker", "pull", image], check=True) import uuid @@ -538,9 +538,7 @@ def start_ui( env_file = os.path.join(cwd, ".env") docker_cmd.extend(["--env-file", env_file]) - docker_cmd.append( - image - ) # TODO: change to "cognee/cognee-mcp:main" right before merging into main + docker_cmd.append(image) mcp_process = subprocess.Popen( docker_cmd, From cd33740d3d6677f36ebef22bbff7d7802d35d627 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:32:36 +0200 Subject: [PATCH 24/25] feat: adds cognee version to logging --- cognee/api/v1/add/routers/get_add_router.py | 4 +++- cognee/api/v1/cognify/routers/get_cognify_router.py | 3 ++- cognee/api/v1/datasets/routers/get_datasets_router.py | 8 ++++++++ cognee/api/v1/delete/routers/get_delete_router.py | 2 ++ cognee/api/v1/memify/routers/get_memify_router.py | 4 ++-- .../api/v1/permissions/routers/get_permissions_router.py | 7 ++++++- cognee/api/v1/search/routers/get_search_router.py | 4 +++- cognee/api/v1/sync/routers/get_sync_router.py | 3 +++ cognee/api/v1/update/routers/get_update_router.py | 2 ++ cognee/api/v1/users/routers/get_visualize_router.py | 2 ++ cognee/modules/search/methods/search.py | 6 +++--- 11 files changed, 36 insertions(+), 9 deletions(-) diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index 4d0063cc9..62cb96a54 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -10,6 +10,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger +from cognee import __version__ as cognee_version logger = get_logger() @@ -63,7 +64,8 @@ def get_add_router() -> APIRouter: send_telemetry( "Add API Endpoint Invoked", user.id, - additional_properties={"endpoint": "POST /v1/add", "node_set": node_set}, + additional_properties={"endpoint": "POST /v1/add", "node_set": node_set, "cognee_version": cognee_version}, + ) from cognee.api.v1.add import add as cognee_add diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py index 9e4bdbbfd..231acce16 100644 --- a/cognee/api/v1/cognify/routers/get_cognify_router.py +++ b/cognee/api/v1/cognify/routers/get_cognify_router.py @@ -29,7 +29,7 @@ from cognee.modules.pipelines.queues.pipeline_run_info_queues import ( ) from cognee.shared.logging_utils import get_logger from cognee.shared.utils import send_telemetry - +from cognee import __version__ as cognee_version logger = get_logger("api.cognify") @@ -98,6 +98,7 @@ def get_cognify_router() -> APIRouter: user.id, additional_properties={ "endpoint": "POST 
/v1/cognify", + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/datasets/routers/get_datasets_router.py b/cognee/api/v1/datasets/routers/get_datasets_router.py index be8b5af8d..3a5d7d3b2 100644 --- a/cognee/api/v1/datasets/routers/get_datasets_router.py +++ b/cognee/api/v1/datasets/routers/get_datasets_router.py @@ -24,6 +24,7 @@ from cognee.modules.users.permissions.methods import ( from cognee.modules.graph.methods import get_formatted_graph_data from cognee.modules.pipelines.models import PipelineRunStatus from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version logger = get_logger() @@ -100,6 +101,7 @@ def get_datasets_router() -> APIRouter: user.id, additional_properties={ "endpoint": "GET /v1/datasets", + "cognee_version": cognee_version }, ) @@ -147,6 +149,7 @@ def get_datasets_router() -> APIRouter: user.id, additional_properties={ "endpoint": "POST /v1/datasets", + "cognee_version": cognee_version }, ) @@ -201,6 +204,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": f"DELETE /v1/datasets/{str(dataset_id)}", "dataset_id": str(dataset_id), + "cognee_version": cognee_version }, ) @@ -246,6 +250,7 @@ def get_datasets_router() -> APIRouter: "endpoint": f"DELETE /v1/datasets/{str(dataset_id)}/data/{str(data_id)}", "dataset_id": str(dataset_id), "data_id": str(data_id), + "cognee_version": cognee_version }, ) @@ -327,6 +332,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": f"GET /v1/datasets/{str(dataset_id)}/data", "dataset_id": str(dataset_id), + "cognee_version": cognee_version }, ) @@ -387,6 +393,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": "GET /v1/datasets/status", "datasets": [str(dataset_id) for dataset_id in datasets], + "cognee_version": cognee_version }, ) @@ -433,6 +440,7 @@ def get_datasets_router() -> APIRouter: "endpoint": f"GET /v1/datasets/{str(dataset_id)}/data/{str(data_id)}/raw", "dataset_id": str(dataset_id), "data_id": str(data_id), + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/delete/routers/get_delete_router.py b/cognee/api/v1/delete/routers/get_delete_router.py index 9e6aa5799..977b042c9 100644 --- a/cognee/api/v1/delete/routers/get_delete_router.py +++ b/cognee/api/v1/delete/routers/get_delete_router.py @@ -6,6 +6,7 @@ from cognee.shared.logging_utils import get_logger from cognee.modules.users.models import User from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version logger = get_logger() @@ -39,6 +40,7 @@ def get_delete_router() -> APIRouter: "endpoint": "DELETE /v1/delete", "dataset_id": str(dataset_id), "data_id": str(data_id), + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py index 1976d7414..706e3fae5 100644 --- a/cognee/api/v1/memify/routers/get_memify_router.py +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -12,7 +12,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger - +from cognee import __version__ as cognee_version logger = get_logger() @@ -73,7 +73,7 @@ def get_memify_router() -> APIRouter: send_telemetry( "Memify API Endpoint Invoked", user.id, - additional_properties={"endpoint": 
"POST /v1/memify"}, + additional_properties={"endpoint": "POST /v1/memify", "cognee_version": cognee_version}, ) if not payload.dataset_id and not payload.dataset_name: diff --git a/cognee/api/v1/permissions/routers/get_permissions_router.py b/cognee/api/v1/permissions/routers/get_permissions_router.py index 637293268..ee3e9d1a0 100644 --- a/cognee/api/v1/permissions/routers/get_permissions_router.py +++ b/cognee/api/v1/permissions/routers/get_permissions_router.py @@ -7,7 +7,7 @@ from fastapi.responses import JSONResponse from cognee.modules.users.models import User from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry - +from cognee import __version__ as cognee_version def get_permissions_router() -> APIRouter: permissions_router = APIRouter() @@ -48,6 +48,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/datasets/{str(principal_id)}", "dataset_ids": str(dataset_ids), "principal_id": str(principal_id), + "cognee_version": cognee_version }, ) @@ -89,6 +90,7 @@ def get_permissions_router() -> APIRouter: additional_properties={ "endpoint": "POST /v1/permissions/roles", "role_name": role_name, + "cognee_version": cognee_version }, ) @@ -133,6 +135,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/users/{str(user_id)}/roles", "user_id": str(user_id), "role_id": str(role_id), + "cognee_version": cognee_version }, ) @@ -175,6 +178,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/users/{str(user_id)}/tenants", "user_id": str(user_id), "tenant_id": str(tenant_id), + "cognee_version": cognee_version }, ) @@ -209,6 +213,7 @@ def get_permissions_router() -> APIRouter: additional_properties={ "endpoint": "POST /v1/permissions/tenants", "tenant_name": tenant_name, + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/search/routers/get_search_router.py b/cognee/api/v1/search/routers/get_search_router.py index 36d1c567e..38c037121 100644 --- a/cognee/api/v1/search/routers/get_search_router.py +++ b/cognee/api/v1/search/routers/get_search_router.py @@ -13,7 +13,7 @@ from cognee.modules.users.models import User from cognee.modules.search.operations import get_history from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry - +from cognee import __version__ as cognee_version # Note: Datasets sent by name will only map to datasets owned by the request sender # To search for datasets not owned by the request sender dataset UUID is needed @@ -63,6 +63,7 @@ def get_search_router() -> APIRouter: user.id, additional_properties={ "endpoint": "GET /v1/search", + "cognee_version": cognee_version }, ) @@ -118,6 +119,7 @@ def get_search_router() -> APIRouter: "top_k": payload.top_k, "only_context": payload.only_context, "use_combined_context": payload.use_combined_context, + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/sync/routers/get_sync_router.py b/cognee/api/v1/sync/routers/get_sync_router.py index d74ae4e7d..dbb2a5cd1 100644 --- a/cognee/api/v1/sync/routers/get_sync_router.py +++ b/cognee/api/v1/sync/routers/get_sync_router.py @@ -12,6 +12,7 @@ from cognee.modules.sync.methods import get_running_sync_operations_for_user, ge from cognee.shared.utils import send_telemetry from cognee.shared.logging_utils import get_logger from cognee.api.v1.sync import SyncResponse +from cognee import __version__ as cognee_version from cognee.context_global_variables import 
set_database_global_context_variables logger = get_logger() @@ -99,6 +100,7 @@ def get_sync_router() -> APIRouter: user.id, additional_properties={ "endpoint": "POST /v1/sync", + "cognee_version": cognee_version, "dataset_ids": [str(id) for id in request.dataset_ids] if request.dataset_ids else "*", @@ -205,6 +207,7 @@ def get_sync_router() -> APIRouter: user.id, additional_properties={ "endpoint": "GET /v1/sync/status", + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/update/routers/get_update_router.py b/cognee/api/v1/update/routers/get_update_router.py index 4101e1e31..1befdd16b 100644 --- a/cognee/api/v1/update/routers/get_update_router.py +++ b/cognee/api/v1/update/routers/get_update_router.py @@ -9,6 +9,7 @@ from cognee.shared.logging_utils import get_logger from cognee.modules.users.models import User from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version from cognee.modules.pipelines.models.PipelineRunInfo import ( PipelineRunErrored, ) @@ -64,6 +65,7 @@ def get_update_router() -> APIRouter: "dataset_id": str(dataset_id), "data_id": str(data_id), "node_set": str(node_set), + "cognee_version": cognee_version }, ) diff --git a/cognee/api/v1/users/routers/get_visualize_router.py b/cognee/api/v1/users/routers/get_visualize_router.py index 95e79d3d5..4c840c9e1 100644 --- a/cognee/api/v1/users/routers/get_visualize_router.py +++ b/cognee/api/v1/users/routers/get_visualize_router.py @@ -8,6 +8,7 @@ from cognee.modules.users.models import User from cognee.context_global_variables import set_database_global_context_variables from cognee.shared.utils import send_telemetry +from cognee import __version__ as cognee_version logger = get_logger() @@ -46,6 +47,7 @@ def get_visualize_router() -> APIRouter: additional_properties={ "endpoint": "GET /v1/visualize", "dataset_id": str(dataset_id), + "cognee_version": cognee_version }, ) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index 29f50119c..db6a78133 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -24,7 +24,7 @@ from cognee.modules.data.models import Dataset from cognee.modules.data.methods.get_authorized_existing_datasets import ( get_authorized_existing_datasets, ) - +from cognee import __version__ as cognee_version from .get_search_type_tools import get_search_type_tools from .no_access_control_search import no_access_control_search from ..utils.prepare_search_result import prepare_search_result @@ -64,7 +64,7 @@ async def search( Searching by dataset is only available in ENABLE_BACKEND_ACCESS_CONTROL mode """ query = await log_query(query_text, query_type.value, user.id) - send_telemetry("cognee.search EXECUTION STARTED", user.id) + send_telemetry("cognee.search EXECUTION STARTED", user.id, additional_properties={"cognee_version": cognee_version}) # Use search function filtered by permissions if access control is enabled if os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true": @@ -101,7 +101,7 @@ async def search( ) ] - send_telemetry("cognee.search EXECUTION COMPLETED", user.id) + send_telemetry("cognee.search EXECUTION COMPLETED", user.id, additional_properties={"cognee_version": cognee_version}) await log_result( query.id, From 7c921c22dcb3b85bef16a542816ef9df19cc879d Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:37:31 +0200 Subject: 
[PATCH 25/25] ruff fix --- cognee/api/v1/add/routers/get_add_router.py | 7 +++++-- .../api/v1/cognify/routers/get_cognify_router.py | 2 +- .../api/v1/datasets/routers/get_datasets_router.py | 14 +++++++------- cognee/api/v1/delete/routers/get_delete_router.py | 2 +- cognee/api/v1/memify/routers/get_memify_router.py | 1 + .../permissions/routers/get_permissions_router.py | 11 ++++++----- cognee/api/v1/search/routers/get_search_router.py | 8 +++----- cognee/api/v1/sync/routers/get_sync_router.py | 2 +- cognee/api/v1/update/routers/get_update_router.py | 2 +- .../api/v1/users/routers/get_visualize_router.py | 2 +- cognee/modules/search/methods/search.py | 12 ++++++++++-- cognee/shared/utils.py | 5 ++++- 12 files changed, 41 insertions(+), 27 deletions(-) diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index 62cb96a54..b2e7068b0 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -64,8 +64,11 @@ def get_add_router() -> APIRouter: send_telemetry( "Add API Endpoint Invoked", user.id, - additional_properties={"endpoint": "POST /v1/add", "node_set": node_set, "cognee_version": cognee_version}, - + additional_properties={ + "endpoint": "POST /v1/add", + "node_set": node_set, + "cognee_version": cognee_version, + }, ) from cognee.api.v1.add import add as cognee_add diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py index 231acce16..231bbcd11 100644 --- a/cognee/api/v1/cognify/routers/get_cognify_router.py +++ b/cognee/api/v1/cognify/routers/get_cognify_router.py @@ -98,7 +98,7 @@ def get_cognify_router() -> APIRouter: user.id, additional_properties={ "endpoint": "POST /v1/cognify", - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/datasets/routers/get_datasets_router.py b/cognee/api/v1/datasets/routers/get_datasets_router.py index 3a5d7d3b2..eff87b3af 100644 --- a/cognee/api/v1/datasets/routers/get_datasets_router.py +++ b/cognee/api/v1/datasets/routers/get_datasets_router.py @@ -101,7 +101,7 @@ def get_datasets_router() -> APIRouter: user.id, additional_properties={ "endpoint": "GET /v1/datasets", - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -149,7 +149,7 @@ def get_datasets_router() -> APIRouter: user.id, additional_properties={ "endpoint": "POST /v1/datasets", - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -204,7 +204,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": f"DELETE /v1/datasets/{str(dataset_id)}", "dataset_id": str(dataset_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -250,7 +250,7 @@ def get_datasets_router() -> APIRouter: "endpoint": f"DELETE /v1/datasets/{str(dataset_id)}/data/{str(data_id)}", "dataset_id": str(dataset_id), "data_id": str(data_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -332,7 +332,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": f"GET /v1/datasets/{str(dataset_id)}/data", "dataset_id": str(dataset_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -393,7 +393,7 @@ def get_datasets_router() -> APIRouter: additional_properties={ "endpoint": "GET /v1/datasets/status", "datasets": [str(dataset_id) for dataset_id in datasets], - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ 
-440,7 +440,7 @@ def get_datasets_router() -> APIRouter: "endpoint": f"GET /v1/datasets/{str(dataset_id)}/data/{str(data_id)}/raw", "dataset_id": str(dataset_id), "data_id": str(data_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/delete/routers/get_delete_router.py b/cognee/api/v1/delete/routers/get_delete_router.py index 977b042c9..3ff97681d 100644 --- a/cognee/api/v1/delete/routers/get_delete_router.py +++ b/cognee/api/v1/delete/routers/get_delete_router.py @@ -40,7 +40,7 @@ def get_delete_router() -> APIRouter: "endpoint": "DELETE /v1/delete", "dataset_id": str(dataset_id), "data_id": str(data_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py index 706e3fae5..cc07a3a0c 100644 --- a/cognee/api/v1/memify/routers/get_memify_router.py +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -13,6 +13,7 @@ from cognee.shared.utils import send_telemetry from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger from cognee import __version__ as cognee_version + logger = get_logger() diff --git a/cognee/api/v1/permissions/routers/get_permissions_router.py b/cognee/api/v1/permissions/routers/get_permissions_router.py index ee3e9d1a0..565e95732 100644 --- a/cognee/api/v1/permissions/routers/get_permissions_router.py +++ b/cognee/api/v1/permissions/routers/get_permissions_router.py @@ -9,6 +9,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee import __version__ as cognee_version + def get_permissions_router() -> APIRouter: permissions_router = APIRouter() @@ -48,7 +49,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/datasets/{str(principal_id)}", "dataset_ids": str(dataset_ids), "principal_id": str(principal_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -90,7 +91,7 @@ def get_permissions_router() -> APIRouter: additional_properties={ "endpoint": "POST /v1/permissions/roles", "role_name": role_name, - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -135,7 +136,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/users/{str(user_id)}/roles", "user_id": str(user_id), "role_id": str(role_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -178,7 +179,7 @@ def get_permissions_router() -> APIRouter: "endpoint": f"POST /v1/permissions/users/{str(user_id)}/tenants", "user_id": str(user_id), "tenant_id": str(tenant_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) @@ -213,7 +214,7 @@ def get_permissions_router() -> APIRouter: additional_properties={ "endpoint": "POST /v1/permissions/tenants", "tenant_name": tenant_name, - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/search/routers/get_search_router.py b/cognee/api/v1/search/routers/get_search_router.py index 38c037121..171c03e49 100644 --- a/cognee/api/v1/search/routers/get_search_router.py +++ b/cognee/api/v1/search/routers/get_search_router.py @@ -15,6 +15,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee import __version__ as cognee_version + # Note: Datasets sent by name will only map to 
datasets owned by the request sender # To search for datasets not owned by the request sender dataset UUID is needed class SearchPayloadDTO(InDTO): @@ -61,10 +62,7 @@ def get_search_router() -> APIRouter: send_telemetry( "Search API Endpoint Invoked", user.id, - additional_properties={ - "endpoint": "GET /v1/search", - "cognee_version": cognee_version - }, + additional_properties={"endpoint": "GET /v1/search", "cognee_version": cognee_version}, ) try: @@ -119,7 +117,7 @@ def get_search_router() -> APIRouter: "top_k": payload.top_k, "only_context": payload.only_context, "use_combined_context": payload.use_combined_context, - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/sync/routers/get_sync_router.py b/cognee/api/v1/sync/routers/get_sync_router.py index dbb2a5cd1..a7d466c10 100644 --- a/cognee/api/v1/sync/routers/get_sync_router.py +++ b/cognee/api/v1/sync/routers/get_sync_router.py @@ -207,7 +207,7 @@ def get_sync_router() -> APIRouter: user.id, additional_properties={ "endpoint": "GET /v1/sync/status", - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/update/routers/get_update_router.py b/cognee/api/v1/update/routers/get_update_router.py index 1befdd16b..95e43b94f 100644 --- a/cognee/api/v1/update/routers/get_update_router.py +++ b/cognee/api/v1/update/routers/get_update_router.py @@ -65,7 +65,7 @@ def get_update_router() -> APIRouter: "dataset_id": str(dataset_id), "data_id": str(data_id), "node_set": str(node_set), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/api/v1/users/routers/get_visualize_router.py b/cognee/api/v1/users/routers/get_visualize_router.py index 4c840c9e1..5dc3868a6 100644 --- a/cognee/api/v1/users/routers/get_visualize_router.py +++ b/cognee/api/v1/users/routers/get_visualize_router.py @@ -47,7 +47,7 @@ def get_visualize_router() -> APIRouter: additional_properties={ "endpoint": "GET /v1/visualize", "dataset_id": str(dataset_id), - "cognee_version": cognee_version + "cognee_version": cognee_version, }, ) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index db6a78133..93c0ef5c8 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -64,7 +64,11 @@ async def search( Searching by dataset is only available in ENABLE_BACKEND_ACCESS_CONTROL mode """ query = await log_query(query_text, query_type.value, user.id) - send_telemetry("cognee.search EXECUTION STARTED", user.id, additional_properties={"cognee_version": cognee_version}) + send_telemetry( + "cognee.search EXECUTION STARTED", + user.id, + additional_properties={"cognee_version": cognee_version}, + ) # Use search function filtered by permissions if access control is enabled if os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true": @@ -101,7 +105,11 @@ async def search( ) ] - send_telemetry("cognee.search EXECUTION COMPLETED", user.id, additional_properties={"cognee_version": cognee_version}) + send_telemetry( + "cognee.search EXECUTION COMPLETED", + user.id, + additional_properties={"cognee_version": cognee_version}, + ) await log_result( query.id, diff --git a/cognee/shared/utils.py b/cognee/shared/utils.py index 5a3b039a9..08b478adf 100644 --- a/cognee/shared/utils.py +++ b/cognee/shared/utils.py @@ -50,6 +50,7 @@ def get_anonymous_id(): anonymous_id = f.read() return anonymous_id + def _sanitize_nested_properties(obj, property_names: list[str]): """ 
Recursively replaces any property whose key matches one of `property_names` @@ -77,7 +78,9 @@ def send_telemetry(event_name: str, user_id, additional_properties: dict = {}): env = os.getenv("ENV") if env in ["test", "dev"]: return - additional_properties = _sanitize_nested_properties(obj=additional_properties, property_names=['url']) + additional_properties = _sanitize_nested_properties( + obj=additional_properties, property_names=["url"] + ) current_time = datetime.now(timezone.utc) payload = { "anonymous_id": str(get_anonymous_id()),