chore: Remove old eval files [cog-1567] (#649)

<!-- .github/pull_request_template.md -->

## Description
Removed old, unused eval files. 
- swe-bench eval files are kept here as swe-bench eval is not handled by
the new eval framework
- EC2_readme and cloud/setup_ubuntu_instance.sh will be removed (and
moved to the docs website) as part of another task

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin
This commit is contained in:
alekszievr 2025-03-17 19:19:39 +01:00 committed by GitHub
parent 9b9fe48843
commit 219b68c6b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 4 additions and 1366 deletions

View file

@ -1,111 +0,0 @@
from deepeval.metrics import BaseMetric, GEval
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from evals.official_hotpot_metrics import exact_match_score, f1_score
from cognee.infrastructure.llm.prompts.llm_judge_prompts import llm_judge_prompts
correctness_metric = GEval(
name="Correctness",
model="gpt-4o-mini",
evaluation_params=[LLMTestCaseParams.ACTUAL_OUTPUT, LLMTestCaseParams.EXPECTED_OUTPUT],
evaluation_steps=[llm_judge_prompts["correctness"]],
)
comprehensiveness_metric = GEval(
name="Comprehensiveness",
model="gpt-4o-mini",
evaluation_params=[
LLMTestCaseParams.INPUT,
LLMTestCaseParams.ACTUAL_OUTPUT,
LLMTestCaseParams.EXPECTED_OUTPUT,
],
evaluation_steps=[llm_judge_prompts["comprehensiveness"]],
)
diversity_metric = GEval(
name="Diversity",
model="gpt-4o-mini",
evaluation_params=[
LLMTestCaseParams.INPUT,
LLMTestCaseParams.ACTUAL_OUTPUT,
LLMTestCaseParams.EXPECTED_OUTPUT,
],
evaluation_steps=[llm_judge_prompts["diversity"]],
)
empowerment_metric = GEval(
name="Empowerment",
model="gpt-4o-mini",
evaluation_params=[
LLMTestCaseParams.INPUT,
LLMTestCaseParams.ACTUAL_OUTPUT,
LLMTestCaseParams.EXPECTED_OUTPUT,
],
evaluation_steps=[llm_judge_prompts["empowerment"]],
)
directness_metric = GEval(
name="Directness",
model="gpt-4o-mini",
evaluation_params=[
LLMTestCaseParams.INPUT,
LLMTestCaseParams.ACTUAL_OUTPUT,
LLMTestCaseParams.EXPECTED_OUTPUT,
],
evaluation_steps=[llm_judge_prompts["directness"]],
)
class f1_score_metric(BaseMetric):
"""F1 score taken directly from the official hotpot benchmark
implementation and wrapped into a deepeval metric."""
def __init__(self, threshold: float = 0.5):
self.threshold = threshold
def measure(self, test_case: LLMTestCase):
f1, precision, recall = f1_score(
prediction=test_case.actual_output,
ground_truth=test_case.expected_output,
)
self.score = f1
self.success = self.score >= self.threshold
return self.score
# Reusing regular measure as async F1 score is not implemented
async def a_measure(self, test_case: LLMTestCase):
return self.measure(test_case)
def is_successful(self):
return self.success
@property
def __name__(self):
return "Official hotpot F1 score"
class em_score_metric(BaseMetric):
"""Exact Match score taken directly from the official hotpot benchmark
implementation and wrapped into a deepeval metric."""
def __init__(self, threshold: float = 0.5):
self.threshold = threshold
def measure(self, test_case: LLMTestCase):
self.score = exact_match_score(
prediction=test_case.actual_output,
ground_truth=test_case.expected_output,
)
self.success = self.score >= self.threshold
return self.score
# Reusing regular measure as async F1 score is not implemented
async def a_measure(self, test_case: LLMTestCase):
return self.measure(test_case)
def is_successful(self):
return self.success
@property
def __name__(self):
return "Official hotpot EM score"

View file

@ -1,192 +0,0 @@
import argparse
import asyncio
import statistics
from deepeval.dataset import EvaluationDataset
from deepeval.test_case import LLMTestCase
from tqdm import tqdm
import logging
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import read_query_prompt, render_prompt
from evals.qa_dataset_utils import load_qa_dataset
from evals.qa_metrics_utils import get_metrics
from evals.qa_context_provider_utils import qa_context_providers, valid_pipeline_slices
import random
import os
import json
from pathlib import Path
logger = logging.getLogger(__name__)
async def answer_qa_instance(instance, context_provider, contexts_filename):
if os.path.exists(contexts_filename):
with open(contexts_filename, "r") as file:
preloaded_contexts = json.load(file)
else:
preloaded_contexts = {}
if instance["_id"] in preloaded_contexts:
context = preloaded_contexts[instance["_id"]]
else:
context = await context_provider(instance)
preloaded_contexts[instance["_id"]] = context
with open(contexts_filename, "w") as file:
json.dump(preloaded_contexts, file)
args = {
"question": instance["question"],
"context": context,
}
user_prompt = render_prompt("context_for_question.txt", args)
system_prompt = read_query_prompt("answer_hotpot_using_cognee_search.txt")
llm_client = get_llm_client()
answer_prediction = await llm_client.acreate_structured_output(
text_input=user_prompt,
system_prompt=system_prompt,
response_model=str,
)
return answer_prediction
async def deepeval_answers(instances, answers, eval_metrics):
test_cases = []
for instance, answer in zip(instances, answers):
test_case = LLMTestCase(
input=instance["question"], actual_output=answer, expected_output=instance["answer"]
)
test_cases.append(test_case)
eval_set = EvaluationDataset(test_cases)
eval_results = eval_set.evaluate(eval_metrics)
return eval_results
async def deepeval_on_instances(
instances, context_provider, eval_metrics, answers_filename, contexts_filename
):
if os.path.exists(answers_filename):
with open(answers_filename, "r") as file:
preloaded_answers = json.load(file)
else:
preloaded_answers = {}
answers = []
for instance in tqdm(instances, desc="Getting answers"):
if instance["_id"] in preloaded_answers:
answer = preloaded_answers[instance["_id"]]
else:
answer = await answer_qa_instance(instance, context_provider, contexts_filename)
preloaded_answers[instance["_id"]] = answer
answers.append(answer)
with open(answers_filename, "w") as file:
json.dump(preloaded_answers, file)
eval_results = await deepeval_answers(instances, answers, eval_metrics)
score_lists_dict = {}
for instance_result in eval_results.test_results:
for metric_result in instance_result.metrics_data:
if metric_result.name not in score_lists_dict:
score_lists_dict[metric_result.name] = []
score_lists_dict[metric_result.name].append(metric_result.score)
avg_scores = {
metric_name: statistics.mean(scorelist)
for metric_name, scorelist in score_lists_dict.items()
}
return avg_scores
async def eval_on_QA_dataset(
dataset_name_or_filename: str, context_provider_name, num_samples, metric_name_list, out_path
):
dataset = load_qa_dataset(dataset_name_or_filename)
context_provider = qa_context_providers[context_provider_name]
eval_metrics = get_metrics(metric_name_list)
out_path = Path(out_path)
if not out_path.exists():
out_path.mkdir(parents=True, exist_ok=True)
random.seed(43)
instances = dataset if not num_samples else random.sample(dataset, num_samples)
contexts_filename = out_path / Path(
f"contexts_{dataset_name_or_filename.split('.')[0]}_{context_provider_name}.json"
)
if "promptfoo_metrics" in eval_metrics:
promptfoo_results = await eval_metrics["promptfoo_metrics"].measure(
instances, context_provider, contexts_filename
)
else:
promptfoo_results = {}
answers_filename = out_path / Path(
f"answers_{dataset_name_or_filename.split('.')[0]}_{context_provider_name}.json"
)
deepeval_results = await deepeval_on_instances(
instances,
context_provider,
eval_metrics["deepeval_metrics"],
answers_filename,
contexts_filename,
)
results = promptfoo_results | deepeval_results
return results
async def incremental_eval_on_QA_dataset(
dataset_name_or_filename: str, num_samples, metric_name_list, out_path
):
pipeline_slice_names = valid_pipeline_slices.keys()
incremental_results = {}
for pipeline_slice_name in pipeline_slice_names:
results = await eval_on_QA_dataset(
dataset_name_or_filename, pipeline_slice_name, num_samples, metric_name_list, out_path
)
incremental_results[pipeline_slice_name] = results
return incremental_results
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dataset", type=str, required=True, help="Which dataset to evaluate on")
parser.add_argument(
"--rag_option",
type=str,
choices=list(qa_context_providers.keys()) + ["cognee_incremental"],
required=True,
help="RAG option to use for providing context",
)
parser.add_argument("--num_samples", type=int, default=500)
parser.add_argument("--metrics", type=str, nargs="+", default=["Correctness"])
parser.add_argument("--out_dir", type=str, help="Dir to save eval results")
args = parser.parse_args()
if args.rag_option == "cognee_incremental":
avg_scores = await incremental_eval_on_QA_dataset(
args.dataset, args.num_samples, args.metrics, args.out_dir
)
else:
avg_scores = await eval_on_QA_dataset(
args.dataset, args.rag_option, args.num_samples, args.metrics, args.out_dir
)
logger.info(f"{avg_scores}")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -82,9 +82,11 @@ async def generate_patch_with_cognee(instance):
return answer_prediction
async def generate_patch_without_cognee(instance, llm_client):
async def generate_patch_without_cognee(instance):
instructions = read_query_prompt("patch_gen_instructions.txt")
llm_client = get_llm_client()
answer_prediction = await llm_client.acreate_structured_output(
text_input=instance["text"],
system_prompt=instructions,
@ -128,7 +130,7 @@ async def main():
if args.cognee_off:
dataset_name = "princeton-nlp/SWE-bench_Lite_bm25_13K"
dataset = load_swebench_dataset(dataset_name, split="test")
dataset = load_swebench_dataset(dataset_name, split="test")[:2]
predictions_path = "preds_nocognee.json"
if not Path(predictions_path).exists():
preds = await get_preds(dataset, with_cognee=False)

View file

@ -1,45 +0,0 @@
from deepeval.dataset import EvaluationDataset
from deepeval.synthesizer import Synthesizer
import dotenv
from deepeval.test_case import LLMTestCase
# import pytest
# from deepeval import assert_test
from deepeval.metrics import AnswerRelevancyMetric
dotenv.load_dotenv()
# synthesizer = Synthesizer()
# synthesizer.generate_goldens_from_docs(
# document_paths=['natural_language_processing.txt', 'soldiers_home.pdf', 'trump.txt'],
# max_goldens_per_document=5,
# num_evolutions=5,
# include_expected_output=True,
# enable_breadth_evolve=True,
# )
#
# synthesizer.save_as(
# file_type='json', # or 'csv'
# directory="./synthetic_data"
# )
dataset = EvaluationDataset()
dataset.generate_goldens_from_docs(
document_paths=["natural_language_processing.txt", "soldiers_home.pdf", "trump.txt"],
max_goldens_per_document=10,
num_evolutions=5,
enable_breadth_evolve=True,
)
print(dataset.goldens)
print(dataset)
answer_relevancy_metric = AnswerRelevancyMetric(threshold=0.5)
# from deepeval import evaluate
# evaluate(dataset, [answer_relevancy_metric])

View file

@ -1,75 +0,0 @@
import subprocess
import json
import argparse
import os
from typing import List
import sys
def run_command(command: List[str]):
try:
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1
)
while True:
stdout_line = process.stdout.readline()
stderr_line = process.stderr.readline()
if stdout_line == "" and stderr_line == "" and process.poll() is not None:
break
if stdout_line:
print(stdout_line.rstrip())
if stderr_line:
print(f"Error: {stderr_line.rstrip()}", file=sys.stderr)
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command)
finally:
process.stdout.close()
process.stderr.close()
def run_evals_for_paramsfile(params_file, out_dir):
with open(params_file, "r") as file:
parameters = json.load(file)
for metric in parameters["metric_names"]:
params = parameters
params["metric_names"] = [metric]
temp_paramfile = params_file.replace(".json", f"_{metric}.json")
with open(temp_paramfile, "w") as file:
json.dump(params, file)
command = [
"python",
"evals/run_qa_eval.py",
"--params_file",
temp_paramfile,
"--out_dir",
out_dir,
]
run_command(command)
if os.path.exists(temp_paramfile):
os.remove(temp_paramfile)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--params_file", type=str, required=True, help="Which dataset to evaluate on"
)
parser.add_argument("--out_dir", type=str, help="Dir to save eval results")
args = parser.parse_args()
run_evals_for_paramsfile(args.params_file, args.out_dir)
if __name__ == "__main__":
main()

View file

@ -1,90 +0,0 @@
"""
These are the official evaluation metrics for HotpotQA taken from https://hotpotqa.github.io/
"""
import re
import string
from collections import Counter
def normalize_answer(s):
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def f1_score(prediction, ground_truth):
normalized_prediction = normalize_answer(prediction)
normalized_ground_truth = normalize_answer(ground_truth)
ZERO_METRIC = (0, 0, 0)
if (
normalized_prediction in ["yes", "no", "noanswer"]
and normalized_prediction != normalized_ground_truth
):
return ZERO_METRIC
if (
normalized_ground_truth in ["yes", "no", "noanswer"]
and normalized_prediction != normalized_ground_truth
):
return ZERO_METRIC
prediction_tokens = normalized_prediction.split()
ground_truth_tokens = normalized_ground_truth.split()
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
num_same = sum(common.values())
if num_same == 0:
return ZERO_METRIC
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1, precision, recall
def exact_match_score(prediction, ground_truth):
return normalize_answer(prediction) == normalize_answer(ground_truth)
def update_answer(metrics, prediction, gold):
em = exact_match_score(prediction, gold)
f1, prec, recall = f1_score(prediction, gold)
metrics["em"] += float(em)
metrics["f1"] += f1
metrics["prec"] += prec
metrics["recall"] += recall
return em, prec, recall
def update_sp(metrics, prediction, gold):
cur_sp_pred = set(map(tuple, prediction))
gold_sp_pred = set(map(tuple, gold))
tp, fp, fn = 0, 0, 0
for e in cur_sp_pred:
if e in gold_sp_pred:
tp += 1
else:
fp += 1
for e in gold_sp_pred:
if e not in cur_sp_pred:
fn += 1
prec = 1.0 * tp / (tp + fp) if tp + fp > 0 else 0.0
recall = 1.0 * tp / (tp + fn) if tp + fn > 0 else 0.0
f1 = 2 * prec * recall / (prec + recall) if prec + recall > 0 else 0.0
em = 1.0 if fp + fn == 0 else 0.0
metrics["sp_em"] += em
metrics["sp_f1"] += f1
metrics["sp_prec"] += prec
metrics["sp_recall"] += recall
return em, prec, recall

View file

@ -1,7 +0,0 @@
# yaml-language-server: $schema=https://promptfoo.dev/config-schema.json
# Learn more about building a configuration: https://promptfoo.dev/docs/configuration/guide
description: "My eval"
providers:
- id: openai:gpt-4o-mini

View file

@ -1,92 +0,0 @@
from evals.promptfoo_wrapper import PromptfooWrapper
import os
import yaml
import json
import shutil
from cognee.infrastructure.llm.prompts.llm_judge_prompts import llm_judge_prompts
def is_valid_promptfoo_metric(metric_name: str):
try:
prefix, suffix = metric_name.split(".")
except ValueError:
return False
if prefix != "promptfoo":
return False
if suffix not in llm_judge_prompts:
return False
return True
class PromptfooMetric:
def __init__(self, metric_name_list):
promptfoo_path = shutil.which("promptfoo")
self.wrapper = PromptfooWrapper(promptfoo_path=promptfoo_path)
self.prompts = {}
for metric_name in metric_name_list:
if is_valid_promptfoo_metric(metric_name):
self.prompts[metric_name] = llm_judge_prompts[metric_name.split(".")[1]]
else:
raise Exception(f"{metric_name} is not a valid promptfoo metric")
async def measure(self, instances, context_provider, contexts_filename):
with open(os.path.join(os.getcwd(), "evals/promptfoo_config_template.yaml"), "r") as file:
config = yaml.safe_load(file)
config["defaultTest"] = {
"assert": [
{"type": "llm-rubric", "value": prompt, "name": metric_name}
for metric_name, prompt in self.prompts.items()
]
}
tests = []
if os.path.exists(contexts_filename):
with open(contexts_filename, "r") as file:
preloaded_contexts = json.load(file)
else:
preloaded_contexts = {}
for instance in instances:
if instance["_id"] in preloaded_contexts:
context = preloaded_contexts[instance["_id"]]
else:
context = await context_provider(instance)
preloaded_contexts[instance["_id"]] = context
test = {
"vars": {
"name": instance["question"][:15],
"question": instance["question"],
"context": context,
}
}
tests.append(test)
config["tests"] = tests
with open(contexts_filename, "w") as file:
json.dump(preloaded_contexts, file)
# Write the updated YAML back, preserving formatting and structure
updated_yaml_file_path = os.path.join(os.getcwd(), "config_with_context.yaml")
with open(updated_yaml_file_path, "w") as file:
yaml.dump(config, file)
self.wrapper.run_eval(
prompt_file=os.path.join(os.getcwd(), "evals/promptfooprompt.json"),
config_file=os.path.join(os.getcwd(), "config_with_context.yaml"),
out_format="json",
)
file_path = os.path.join(os.getcwd(), "benchmark_results.json")
# Read and parse the JSON file
with open(file_path, "r") as file:
results = json.load(file)
scores = {}
for result in results["results"]["results"][0]["gradingResult"]["componentResults"]:
scores[result["assertion"]["name"]] = result["score"]
return scores

View file

@ -1,157 +0,0 @@
import subprocess
import json
import logging
import os
from typing import List, Optional, Dict, Generator
import shutil
import platform
from dotenv import load_dotenv
logger = logging.getLogger(__name__)
# Load environment variables from .env file
load_dotenv()
class PromptfooWrapper:
"""
A Python wrapper class around the promptfoo CLI tool, allowing you to:
- Evaluate prompts against different language models.
- Compare responses from multiple models.
- Pass configuration and prompt files.
- Retrieve the outputs in a structured format, including binary output if needed.
This class assumes you have the promptfoo CLI installed and accessible in your environment.
For more details on promptfoo, see: https://github.com/promptfoo/promptfoo
"""
def __init__(self, promptfoo_path: str = ""):
"""
Initialize the wrapper with the path to the promptfoo executable.
:param promptfoo_path: Path to the promptfoo binary (default: 'promptfoo')
"""
self.promptfoo_path = promptfoo_path
logger.debug(f"Initialized PromptfooWrapper with binary at: {self.promptfoo_path}")
def _validate_path(self, file_path: Optional[str]) -> None:
"""
Validate that a file path is accessible if provided.
Raise FileNotFoundError if it does not exist.
"""
if file_path and not os.path.isfile(file_path):
logger.error(f"File not found: {file_path}")
raise FileNotFoundError(f"File not found: {file_path}")
def _get_node_bin_dir(self) -> str:
"""
Determine the Node.js binary directory dynamically for macOS and Linux.
"""
node_executable = shutil.which("node")
if not node_executable:
logger.error("Node.js is not installed or not found in the system PATH.")
raise EnvironmentError("Node.js is not installed or not in PATH.")
# Determine the Node.js binary directory
node_bin_dir = os.path.dirname(node_executable)
# Special handling for macOS, where Homebrew installs Node in /usr/local or /opt/homebrew
if platform.system() == "Darwin": # macOS
logger.debug("Running on macOS")
brew_prefix = os.popen("brew --prefix node").read().strip()
if brew_prefix and os.path.exists(brew_prefix):
node_bin_dir = os.path.join(brew_prefix, "bin")
logger.debug(f"Detected Node.js binary directory using Homebrew: {node_bin_dir}")
# For Linux, Node.js installed via package managers should work out of the box
logger.debug(f"Detected Node.js binary directory: {node_bin_dir}")
return node_bin_dir
def _run_command(
self,
cmd: List[str],
filename,
) -> Generator[Dict, None, None]:
"""
Run a given command using subprocess and parse the output.
"""
logger.debug(f"Running command: {' '.join(cmd)}")
# Make a copy of the current environment
env = os.environ.copy()
try:
node_bin_dir = self._get_node_bin_dir()
print(node_bin_dir)
env["PATH"] = f"{node_bin_dir}:{env['PATH']}"
except EnvironmentError as e:
logger.error(f"Failed to set Node.js binary directory: {e}")
raise
# Add node's bin directory to the PATH
# node_bin_dir = "/Users/vasilije/Library/Application Support/JetBrains/PyCharm2024.2/node/versions/20.15.0/bin"
# # env["PATH"] = f"{node_bin_dir}:{env['PATH']}"
result = subprocess.run(cmd, capture_output=True, text=True, check=False, env=env)
print(result.stderr)
with open(filename, "r", encoding="utf-8") as file:
read_data = json.load(file)
print(f"{filename} created and written.")
# Log raw stdout for debugging
logger.debug(f"Raw command output:\n{result.stdout}")
# Use the parse_promptfoo_output function to yield parsed results
return read_data
def run_eval(
self,
prompt_file: Optional[str] = None,
config_file: Optional[str] = None,
eval_file: Optional[str] = None,
out_format: str = "json",
extra_args: Optional[List[str]] = None,
binary_output: bool = False,
) -> Dict:
"""
Run the `promptfoo eval` command with the provided parameters and return parsed results.
:param prompt_file: Path to a file containing one or more prompts.
:param config_file: Path to a config file specifying models, scoring methods, etc.
:param eval_file: Path to an eval file with test data.
:param out_format: Output format, e.g., 'json', 'yaml', or 'table'.
:param extra_args: Additional command-line arguments for fine-tuning evaluation.
:param binary_output: If True, interpret output as binary data instead of text.
:return: List of parsed results (each result is a dictionary).
"""
self._validate_path(prompt_file)
self._validate_path(config_file)
self._validate_path(eval_file)
filename = "benchmark_results"
filename = os.path.join(os.getcwd(), f"{filename}.json")
# Create an empty JSON file
with open(filename, "w") as file:
json.dump({}, file)
cmd = [self.promptfoo_path, "eval"]
if prompt_file:
cmd.extend(["--prompts", prompt_file])
if config_file:
cmd.extend(["--config", config_file])
if eval_file:
cmd.extend(["--eval", eval_file])
cmd.extend(["--output", filename])
if extra_args:
cmd.extend(extra_args)
# Log the constructed command for debugging
logger.debug(f"Constructed command: {' '.join(cmd)}")
# Collect results from the generator
results = self._run_command(cmd, filename=filename)
logger.debug(f"Parsed results: {json.dumps(results, indent=4)}")
return results

View file

@ -1,10 +0,0 @@
[
{
"role": "system",
"content": "Answer the question using the provided context. Be as brief as possible."
},
{
"role": "user",
"content": "The question is: `{{ question }}` \n And here is the context: `{{ context }}`"
}
]

View file

@ -1,152 +0,0 @@
import cognee
from cognee.modules.search.types import SearchType
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
from functools import partial
from cognee.api.v1.cognify.cognify_v2 import get_default_tasks
import logging
logger = logging.getLogger(__name__)
async def get_raw_context(instance: dict) -> str:
return instance["context"]
async def cognify_instance(instance: dict, task_indices: list[int] = None):
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
for title, sentences in instance["context"]:
await cognee.add("\n".join(sentences), dataset_name="QA")
all_cognify_tasks = await get_default_tasks()
if task_indices:
selected_tasks = [all_cognify_tasks[ind] for ind in task_indices]
else:
selected_tasks = all_cognify_tasks
await cognee.cognify("QA", tasks=selected_tasks)
def _insight_to_string(triplet: tuple) -> str:
if not (isinstance(triplet, tuple) and len(triplet) == 3):
logger.warning("Invalid input: Expected a tuple of length 3.")
return ""
node1, edge, node2 = triplet
if not (isinstance(node1, dict) and isinstance(edge, dict) and isinstance(node2, dict)):
logger.warning("Invalid input: Each element in the tuple must be a dictionary.")
return ""
node1_name = node1["name"] if "name" in node1 else "N/A"
node1_description = (
node1["description"]
if "description" in node1
else node1["text"]
if "text" in node1
else "N/A"
)
node1_string = f"name: {node1_name}, description: {node1_description}"
node2_name = node2["name"] if "name" in node2 else "N/A"
node2_description = (
node2["description"]
if "description" in node2
else node2["text"]
if "text" in node2
else "N/A"
)
node2_string = f"name: {node2_name}, description: {node2_description}"
edge_string = edge.get("relationship_name", "")
if not edge_string:
logger.warning("Missing required field: 'relationship_name' in edge dictionary.")
return ""
triplet_str = f"{node1_string} -- {edge_string} -- {node2_string}"
return triplet_str
async def get_context_with_cognee(
instance: dict,
task_indices: list[int] = None,
search_types: list[SearchType] = [SearchType.INSIGHTS, SearchType.SUMMARIES, SearchType.CHUNKS],
) -> str:
await cognify_instance(instance, task_indices)
search_results = []
for search_type in search_types:
raw_search_results = await cognee.search(
query_type=search_type, query_text=instance["question"]
)
if search_type == SearchType.INSIGHTS:
res_list = [_insight_to_string(edge) for edge in raw_search_results]
else:
res_list = [
context_item.get("text", "")
for context_item in raw_search_results
if isinstance(context_item, dict)
]
if all(not text for text in res_list):
logger.warning(
"res_list contains only empty strings: No valid 'text' entries found in raw_search_results."
)
search_results += res_list
search_results_str = "\n".join(search_results)
return search_results_str
def create_cognee_context_getter(
task_indices=None, search_types=[SearchType.SUMMARIES, SearchType.CHUNKS]
):
return partial(get_context_with_cognee, task_indices=task_indices, search_types=search_types)
async def get_context_with_simple_rag(instance: dict) -> str:
await cognify_instance(instance)
vector_engine = get_vector_engine()
found_chunks = await vector_engine.search("DocumentChunk_text", instance["question"], limit=5)
search_results_str = "\n".join([context_item.payload["text"] for context_item in found_chunks])
return search_results_str
async def get_context_with_brute_force_triplet_search(instance: dict) -> str:
await cognify_instance(instance)
found_triplets = await brute_force_triplet_search(instance["question"], top_k=5)
retriever = GraphCompletionRetriever()
search_results_str = await retriever.resolve_edges_to_text(found_triplets)
return search_results_str
valid_pipeline_slices = {
"extract_graph": {
"slice": [0, 1, 2, 3, 5],
"search_types": [SearchType.INSIGHTS, SearchType.CHUNKS],
},
"summarize": {
"slice": [0, 1, 2, 3, 4, 5],
"search_types": [SearchType.INSIGHTS, SearchType.SUMMARIES, SearchType.CHUNKS],
},
}
qa_context_providers = {
"no_rag": get_raw_context,
"cognee": get_context_with_cognee,
"simple_rag": get_context_with_simple_rag,
"brute_force": get_context_with_brute_force_triplet_search,
} | {
name: create_cognee_context_getter(
task_indices=value["slice"], search_types=value["search_types"]
)
for name, value in valid_pipeline_slices.items()
}

View file

@ -1,82 +0,0 @@
from cognee.root_dir import get_absolute_path
import json
import requests
from jsonschema import ValidationError, validate
from pathlib import Path
qa_datasets = {
"hotpotqa": {
"filename": "hotpot_dev_fullwiki_v1.json",
"URL": "http://curtis.ml.cmu.edu/datasets/hotpot/hotpot_dev_fullwiki_v1.json",
},
"2wikimultihop": {
"filename": "data/dev.json",
"URL": "https://www.dropbox.com/scl/fi/heid2pkiswhfaqr5g0piw/data.zip?rlkey=ira57daau8lxfj022xvk1irju&e=1",
},
}
qa_json_schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"answer": {"type": "string"},
"question": {"type": "string"},
"context": {"type": "array"},
},
"required": ["answer", "question", "context"],
"additionalProperties": True,
},
}
def download_qa_dataset(dataset_name: str, filepath: Path):
if dataset_name not in qa_datasets:
raise ValueError(f"{dataset_name} is not a supported dataset.")
url = qa_datasets[dataset_name]["URL"]
if dataset_name == "2wikimultihop":
raise Exception(
"Please download 2wikimultihop dataset (data.zip) manually from \
https://www.dropbox.com/scl/fi/heid2pkiswhfaqr5g0piw/data.zip?rlkey=ira57daau8lxfj022xvk1irju&e=1 \
and unzip it."
)
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(filepath, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
print(f"Dataset {dataset_name} downloaded and saved to {filepath}")
else:
print(f"Failed to download {dataset_name}. Status code: {response.status_code}")
def load_qa_dataset(dataset_name_or_filename: str) -> list[dict]:
if dataset_name_or_filename in qa_datasets:
dataset_name = dataset_name_or_filename
filename = qa_datasets[dataset_name]["filename"]
data_root_dir = get_absolute_path("../.data")
if not Path(data_root_dir).exists():
Path(data_root_dir).mkdir()
filepath = data_root_dir / Path(filename)
if not filepath.exists():
download_qa_dataset(dataset_name, filepath)
else:
filename = dataset_name_or_filename
filepath = Path(filename)
with open(filepath, "r") as file:
dataset = json.load(file)
try:
validate(instance=dataset, schema=qa_json_schema)
except ValidationError as e:
raise ValidationError(f"Invalid QA dataset: {e.message}")
return dataset

View file

@ -1,18 +0,0 @@
{
"dataset": [
"hotpotqa"
],
"rag_option": [
"cognee_incremental",
"no_rag",
"simple_rag",
"brute_force"
],
"num_samples": [
2
],
"metric_names": [
"Correctness",
"Comprehensiveness"
]
}

View file

@ -1,65 +0,0 @@
import itertools
import matplotlib.pyplot as plt
from jsonschema import ValidationError, validate
import pandas as pd
from pathlib import Path
paramset_json_schema = {
"type": "object",
"properties": {
"dataset": {
"type": "array",
"items": {"type": "string"},
},
"rag_option": {
"type": "array",
"items": {"type": "string"},
},
"num_samples": {
"type": "array",
"items": {"type": "integer", "minimum": 1},
},
"metric_names": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["dataset", "rag_option", "num_samples", "metric_names"],
"additionalProperties": False,
}
def save_table_as_image(df, image_path):
plt.figure(figsize=(10, 6))
plt.axis("tight")
plt.axis("off")
plt.table(cellText=df.values, colLabels=df.columns, rowLabels=df.index, loc="center")
plt.title(f"{df.index.name}")
plt.savefig(image_path, bbox_inches="tight")
plt.close()
def save_results_as_image(results, out_path):
for dataset, num_samples_data in results.items():
for num_samples, table_data in num_samples_data.items():
for rag_option, metric_data in table_data.items():
for name, value in metric_data.items():
metric_name = name
break
df = pd.DataFrame.from_dict(table_data, orient="index")
df.index.name = f"Dataset: {dataset}, Num Samples: {num_samples}"
image_path = out_path / Path(f"table_{dataset}_{num_samples}_{metric_name}.png")
save_table_as_image(df, image_path)
def get_combinations(parameters):
try:
validate(instance=parameters, schema=paramset_json_schema)
except ValidationError as e:
raise ValidationError(f"Invalid parameter set: {e.message}")
# params_for_combos = {k: v for k, v in parameters.items() if k != "metric_name"}
params_for_combos = {k: v for k, v in parameters.items()}
keys, values = zip(*params_for_combos.items())
combinations = [dict(zip(keys, combo)) for combo in itertools.product(*values)]
return combinations

View file

@ -1,66 +0,0 @@
from evals.deepeval_metrics import (
correctness_metric,
comprehensiveness_metric,
diversity_metric,
empowerment_metric,
directness_metric,
f1_score_metric,
em_score_metric,
)
from deepeval.metrics import AnswerRelevancyMetric
import deepeval.metrics
from evals.promptfoo_metrics import is_valid_promptfoo_metric, PromptfooMetric
native_deepeval_metrics = {"AnswerRelevancy": AnswerRelevancyMetric}
custom_deepeval_metrics = {
"Correctness": correctness_metric,
"Comprehensiveness": comprehensiveness_metric,
"Diversity": diversity_metric,
"Empowerment": empowerment_metric,
"Directness": directness_metric,
"F1": f1_score_metric,
"EM": em_score_metric,
}
qa_metrics = native_deepeval_metrics | custom_deepeval_metrics
def get_deepeval_metric(metric_name: str):
if metric_name in qa_metrics:
metric = qa_metrics[metric_name]
else:
try:
metric_cls = getattr(deepeval.metrics, metric_name)
metric = metric_cls()
except AttributeError:
raise Exception(f"Metric {metric_name} not supported")
if isinstance(metric, type):
metric = metric()
return metric
def get_metrics(metric_name_list: list[str]):
metrics = {
"deepeval_metrics": [],
}
promptfoo_metric_names = []
for metric_name in metric_name_list:
if (
(metric_name in native_deepeval_metrics)
or (metric_name in custom_deepeval_metrics)
or hasattr(deepeval.metrics, metric_name)
):
metric = get_deepeval_metric(metric_name)
metrics["deepeval_metrics"].append(metric)
elif is_valid_promptfoo_metric(metric_name):
promptfoo_metric_names.append(metric_name)
if len(promptfoo_metric_names) > 0:
metrics["promptfoo_metrics"] = PromptfooMetric(promptfoo_metric_names)
return metrics

View file

@ -1,59 +0,0 @@
import asyncio
from evals.eval_on_hotpot import eval_on_QA_dataset, incremental_eval_on_QA_dataset
from evals.qa_eval_utils import get_combinations, save_results_as_image
import argparse
from pathlib import Path
import json
async def run_evals_on_paramset(paramset: dict, out_path: str):
combinations = get_combinations(paramset)
json_path = Path(out_path) / Path("results.json")
results = {}
for params in combinations:
dataset = params["dataset"]
num_samples = params["num_samples"]
rag_option = params["rag_option"]
if dataset not in results:
results[dataset] = {}
if num_samples not in results[dataset]:
results[dataset][num_samples] = {}
if rag_option == "cognee_incremental":
result = await incremental_eval_on_QA_dataset(
dataset, num_samples, paramset["metric_names"], out_path
)
results[dataset][num_samples] |= result
else:
result = await eval_on_QA_dataset(
dataset, rag_option, num_samples, paramset["metric_names"], out_path
)
results[dataset][num_samples][rag_option] = result
with open(json_path, "w") as file:
json.dump(results, file, indent=1)
save_results_as_image(results, out_path)
return results
async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--params_file", type=str, required=True, help="Which dataset to evaluate on"
)
parser.add_argument("--out_dir", type=str, help="Dir to save eval results")
args = parser.parse_args()
with open(args.params_file, "r") as file:
parameters = json.load(file)
await run_evals_on_paramset(parameters, args.out_dir)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,143 +0,0 @@
from deepeval.dataset import EvaluationDataset
from pydantic import BaseModel
import os
from typing import List, Type
from deepeval.test_case import LLMTestCase
import dotenv
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.base_config import get_base_config
import logging
logger = logging.getLogger(__name__)
dotenv.load_dotenv()
dataset = EvaluationDataset()
dataset.add_test_cases_from_json_file(
# file_path is the absolute path to you .json file
file_path="./synthetic_data/20240519_185842.json",
input_key_name="input",
actual_output_key_name="actual_output",
expected_output_key_name="expected_output",
context_key_name="context",
)
print(dataset)
# from deepeval.synthesizer import Synthesizer
#
# synthesizer = Synthesizer(model="gpt-3.5-turbo")
#
# dataset = EvaluationDataset()
# dataset.generate_goldens_from_docs(
# synthesizer=synthesizer,
# document_paths=['natural_language_processing.txt', 'soldiers_home.pdf', 'trump.txt'],
# max_goldens_per_document=10,
# num_evolutions=5,
# enable_breadth_evolve=True,
# )
print(dataset.goldens)
print(dataset)
class AnswerModel(BaseModel):
response: str
def get_answer_base(content: str, context: str, response_model: Type[BaseModel]):
llm_client = get_llm_client()
system_prompt = "THIS IS YOUR CONTEXT:" + str(context)
return llm_client.create_structured_output(content, system_prompt, response_model)
def get_answer(content: str, context, model: Type[BaseModel] = AnswerModel):
try:
return get_answer_base(content, context, model)
except Exception as error:
logger.error("Error extracting cognitive layers from content: %s", error, exc_info=True)
raise error
async def run_cognify_base_rag():
from cognee.api.v1.add import add
from cognee.api.v1.prune import prune
from cognee.api.v1.cognify.cognify import cognify
await prune.prune_system()
await add("data://test_datasets", "initial_test")
graph = await cognify("initial_test")
return graph
async def cognify_search_base_rag(content: str, context: str):
base_config = get_base_config()
cognee_directory_path = os.path.abspath(".cognee_system")
base_config.system_root_directory = cognee_directory_path
vector_engine = get_vector_engine()
return_ = await vector_engine.search(collection_name="basic_rag", query_text=content, limit=10)
print("results", return_)
return return_
async def cognify_search_graph(content: str, context: str):
from cognee.api.v1.search import search, SearchType
results = await search(query_type=SearchType.INSIGHTS, query_text="Donald Trump")
print("results", results)
return results
def convert_goldens_to_test_cases(test_cases_raw: List[LLMTestCase]) -> List[LLMTestCase]:
test_cases = []
for case in test_cases_raw:
test_case = LLMTestCase(
input=case.input,
# Generate actual output using the 'input' and 'additional_metadata'
actual_output=str(get_answer(case.input, case.context).model_dump()["response"]),
expected_output=case.expected_output,
context=case.context,
retrieval_context=["retrieval_context"],
)
test_cases.append(test_case)
return test_cases
# # Data preprocessing before setting the dataset test cases
# dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
#
#
# from deepeval.metrics import HallucinationMetric
#
#
# metric = HallucinationMetric()
# dataset.evaluate([metric])
if __name__ == "__main__":
import asyncio
async def main():
# await run_cognify_base_rag()
# await cognify_search_base_rag("show_all_processes", "context")
await cognify_search_graph("show_all_processes", "context")
asyncio.run(main())
# run_cognify_base_rag_and_search()
# # Data preprocessing before setting the dataset test cases
# dataset.test_cases = convert_goldens_to_test_cases(dataset.test_cases)
# from deepeval.metrics import HallucinationMetric
# metric = HallucinationMetric()
# dataset.evaluate([metric])
pass