Connect pipeline to benchmark (#42)

evals/eval_swe_bench runs the code graph pipeline, adds retrieval to the
end, then connects the whole thing with swe-bench

Some unnecessary utility functions were removed.

Note: the pipeline is called for a "graphrag" folder as an example, due
to bugs in the pipeline.
This commit is contained in:
Vasilije 2024-11-29 17:05:37 +01:00 committed by GitHub
commit 57754b3ca0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 57 additions and 153 deletions

2
.gitignore vendored
View file

@ -14,7 +14,7 @@ __pycache__/
*$py.class
full_run.ipynb
evals/
logs/
# C extensions
*.so

View file

@ -1,3 +1,3 @@
I need you to solve this issue by looking at the provided knowledge graph and
generating a single patch file that I can apply directly to this repository using git apply.
I need you to solve this issue by looking at the provided edges retrieved from a knowledge graph and
generate a single patch file that I can apply directly to this repository using git apply.
Please respond with a single patch file in the following format.

View file

@ -1,13 +1,15 @@
import asyncio
import logging
from typing import List
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.modules.users.methods import get_default_user
from cognee.modules.users.models import User
from cognee.shared.utils import send_telemetry
def format_triplets(edges):
print("\n\n\n")
def filter_attributes(obj, attributes):
@ -48,16 +50,14 @@ def format_triplets(edges):
return "".join(triplets)
async def brute_force_triplet_search(query: str, user: User = None, top_k = 5) -> list:
async def brute_force_triplet_search(query: str, user: User = None, top_k = 5, collections = None) -> list:
if user is None:
user = await get_default_user()
if user is None:
raise PermissionError("No user found in the system. Please create a user.")
retrieved_results = await brute_force_search(query, user, top_k)
retrieved_results = await brute_force_search(query, user, top_k, collections=collections)
return retrieved_results

View file

@ -4,28 +4,24 @@ import subprocess
import sys
from pathlib import Path
from datasets import Dataset
from swebench.harness.utils import load_swebench_dataset
from swebench.inference.make_datasets.create_instance import PATCH_EXAMPLE
import cognee
from cognee.shared.data_models import SummarizedContent
from cognee.shared.utils import render_graph
from cognee.tasks.repo_processor import (
enrich_dependency_graph,
expand_dependency_graph,
get_repo_file_dependencies,
)
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_code
from cognee.modules.pipelines import Task, run_tasks
from cognee.api.v1.cognify.code_graph_pipeline import code_graph_pipeline
from cognee.api.v1.search import SearchType
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.prompts import read_query_prompt
from evals.eval_utils import download_instances
from cognee.modules.pipelines import Task, run_tasks
from cognee.modules.retrieval.brute_force_triplet_search import \
brute_force_triplet_search
from cognee.shared.data_models import SummarizedContent
from cognee.shared.utils import render_graph
from cognee.tasks.repo_processor import (enrich_dependency_graph,
expand_dependency_graph,
get_repo_file_dependencies)
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_code
from evals.eval_utils import download_github_repo, retrieved_edges_to_string
def check_install_package(package_name):
@ -45,30 +41,27 @@ def check_install_package(package_name):
except subprocess.CalledProcessError:
return False
async def generate_patch_with_cognee(instance, llm_client, search_type=SearchType.CHUNKS):
await cognee.prune.prune_data()
await cognee.prune.prune_system()
#dataset_name = "SWE_test_data"
#await cognee.add('', dataset_name = dataset_name)
# repo_path = download_github_repo(instance, '../RAW_GIT_REPOS')
repo_path = '/Users/borisarzentar/Projects/graphrag'
tasks = [
Task(get_repo_file_dependencies),
Task(add_data_points, task_config = { "batch_size": 50 }),
Task(enrich_dependency_graph, task_config = { "batch_size": 50 }),
Task(expand_dependency_graph, task_config = { "batch_size": 50 }),
Task(add_data_points, task_config = { "batch_size": 50 }),
# Task(summarize_code, summarization_model = SummarizedContent),
Task(summarize_code, summarization_model = SummarizedContent),
]
pipeline = run_tasks(tasks, repo_path, "cognify_code_pipeline")
async for result in pipeline:
print(result)
@ -79,19 +72,20 @@ async def generate_patch_with_cognee(instance, llm_client, search_type=SearchTyp
problem_statement = instance['problem_statement']
instructions = read_query_prompt("patch_gen_kg_instructions.txt")
graph_str = 'HERE WE SHOULD PASS THE TRIPLETS FROM GRAPHRAG'
retrieved_edges = await brute_force_triplet_search(problem_statement, top_k = 3, collections = ["data_point_source_code", "data_point_text"])
retrieved_edges_str = retrieved_edges_to_string(retrieved_edges)
prompt = "\n".join(
[
problem_statement,
"<patch>",
PATCH_EXAMPLE,
"</patch>",
"This is the knowledge graph:",
graph_str,
]
)
prompt = "\n".join([
problem_statement,
"<patch>",
PATCH_EXAMPLE,
"</patch>",
"These are the retrieved edges:",
retrieved_edges_str
])
llm_client = get_llm_client()
answer_prediction = await llm_client.acreate_structured_output(
text_input=prompt,
system_prompt=instructions,
@ -162,13 +156,8 @@ async def main():
dataset_name = 'princeton-nlp/SWE-bench_Lite'
swe_dataset = load_swebench_dataset(
dataset_name, split='test')[:1]
filepath = Path("SWE-bench_testsample")
if filepath.exists():
dataset = Dataset.load_from_disk(filepath)
else:
dataset = download_instances(swe_dataset, filepath)
predictions_path = "preds.json"
preds = await get_preds(dataset, with_cognee=not args.cognee_off)
preds = await get_preds(swe_dataset, with_cognee=not args.cognee_off)
with open(predictions_path, "w") as file:
json.dump(preds, file)

View file

@ -1,107 +1,7 @@
import os
from copy import deepcopy
from pathlib import Path
from tempfile import TemporaryDirectory
from datasets import Dataset
from swebench.inference.make_datasets.create_instance import make_code_text
from swebench.inference.make_datasets.utils import (AutoContextManager,
ingest_directory_contents)
from tqdm.auto import tqdm
from git import Repo
import shutil
def ingest_files(filenames):
files_dict = dict()
for filename in filenames:
with open(filename) as f:
content = f.read()
files_dict[filename] = content
return files_dict
def ingest_repos(input_instances):
orig_dir = os.getcwd()
with TemporaryDirectory(
dir="/scratch" if os.path.exists("/scratch") else "/tmp"
) as root_dir:
for instance in tqdm(
input_instances.values(),
total=len(input_instances),
desc="Downloading repos on specific commits",
):
try:
with AutoContextManager(
instance, root_dir
) as cm:
readmes = cm.get_readme_files()
instance["readmes"] = ingest_files(readmes)
instance["file_contents"] = ingest_directory_contents(
cm.repo_path
)
finally:
# if AutoContextManager fails to exit properly future exits will return the wrong directory
os.chdir(orig_dir)
return input_instances
def extract_fields(instance):
readmes_text = make_code_text(instance["readmes"])
code_text = make_code_text(
instance["file_contents"], add_line_numbers=False)
text_inputs = "\n".join([readmes_text, code_text])
text_inputs = text_inputs.strip() + "\n\n"
# text_inputs = code_text
patch = "\n".join(["<patch>", instance["patch"], "</patch>"])
return {**instance, "text": text_inputs, "patch": patch}
def create_dataset(input_instances):
columns = [
"instance_id",
"text",
"repo",
"base_commit",
"problem_statement",
"hints_text",
"created_at",
"patch",
"test_patch",
"version",
"FAIL_TO_PASS",
"PASS_TO_PASS",
"environment_setup_commit",
]
data_table = {key: list() for key in columns}
for instance in input_instances.values():
datum = extract_fields(instance)
for key in columns:
data_table[key].append(datum[key] if key in datum else "")
dataset = Dataset.from_dict(data_table)
return dataset
def download_instances(
input_data,
path=Path("SWE-bench_testsample"),
verbose=False,
):
"""Downloads code from github.
Args:
- input_data: dictionary with unprocessed input instances.
- verbose: set ContextManager verbose to True
"""
input_instances = {x["instance_id"]: x for x in input_data}
input_instances_copy = deepcopy(input_instances)
input_instances_with_text = ingest_repos(input_instances_copy)
dataset = create_dataset(input_instances_with_text)
dataset.save_to_disk(path)
return dataset
from git import Repo
def download_github_repo(instance, output_dir):
@ -154,4 +54,19 @@ def delete_repo(repo_path):
else:
print(f"Repository path {repo_path} does not exist. Nothing to delete.")
except Exception as e:
print(f"Error deleting repository at {repo_path}: {e}")
print(f"Error deleting repository at {repo_path}: {e}")
def node_to_string(node):
text = node.attributes["text"]
type = node.attributes["type"]
return f"Node(id: {node.id}, type: {type}, description: {text})"
def retrieved_edges_to_string(retrieved_edges):
edge_strings = []
for edge in retrieved_edges:
relationship_type = edge.attributes["relationship_type"]
edge_str = f"{node_to_string(edge.node1)} {relationship_type} {node_to_string(edge.node2)}"
edge_strings.append(edge_str)
return "\n".join(edge_strings)