chore: deletes unused old version of the codegraph
This commit is contained in:
parent
2481575768
commit
696a22447b
1 changed files with 0 additions and 89 deletions
|
|
@ -1,7 +1,3 @@
|
||||||
# NOTICE: This module contains deprecated functions.
|
|
||||||
# Use only the run_code_graph_pipeline function; all other functions are deprecated.
|
|
||||||
# Related issue: COG-906
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -39,95 +35,10 @@ if monitoring == MonitoringTool.LANGFUSE:
|
||||||
|
|
||||||
from cognee.tasks.summarization import summarize_code
|
from cognee.tasks.summarization import summarize_code
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger("code_graph_pipeline")
|
logger = logging.getLogger("code_graph_pipeline")
|
||||||
|
|
||||||
update_status_lock = asyncio.Lock()
|
update_status_lock = asyncio.Lock()
|
||||||
|
|
||||||
async def code_graph_pipeline(datasets: Union[str, list[str]] = None, user: User = None):
|
|
||||||
if user is None:
|
|
||||||
user = await get_default_user()
|
|
||||||
|
|
||||||
existing_datasets = await get_datasets(user.id)
|
|
||||||
|
|
||||||
if datasets is None or len(datasets) == 0:
|
|
||||||
# If no datasets are provided, cognify all existing datasets.
|
|
||||||
datasets = existing_datasets
|
|
||||||
|
|
||||||
if type(datasets[0]) == str:
|
|
||||||
datasets = await get_datasets_by_name(datasets, user.id)
|
|
||||||
|
|
||||||
existing_datasets_map = {
|
|
||||||
generate_dataset_name(dataset.name): True for dataset in existing_datasets
|
|
||||||
}
|
|
||||||
|
|
||||||
awaitables = []
|
|
||||||
|
|
||||||
for dataset in datasets:
|
|
||||||
dataset_name = generate_dataset_name(dataset.name)
|
|
||||||
|
|
||||||
if dataset_name in existing_datasets_map:
|
|
||||||
awaitables.append(run_pipeline(dataset, user))
|
|
||||||
|
|
||||||
return await asyncio.gather(*awaitables)
|
|
||||||
|
|
||||||
@observe
|
@observe
|
||||||
async def run_pipeline(dataset: Dataset, user: User):
|
|
||||||
'''DEPRECATED: Use `run_code_graph_pipeline` instead. This function will be removed.'''
|
|
||||||
data_documents: list[Data] = await get_dataset_data(dataset_id = dataset.id)
|
|
||||||
|
|
||||||
document_ids_str = [str(document.id) for document in data_documents]
|
|
||||||
|
|
||||||
dataset_id = dataset.id
|
|
||||||
dataset_name = generate_dataset_name(dataset.name)
|
|
||||||
|
|
||||||
send_telemetry("code_graph_pipeline EXECUTION STARTED", user.id)
|
|
||||||
|
|
||||||
async with update_status_lock:
|
|
||||||
task_status = await get_pipeline_status([dataset_id])
|
|
||||||
|
|
||||||
if dataset_id in task_status and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED:
|
|
||||||
logger.info("Dataset %s is already being processed.", dataset_name)
|
|
||||||
return
|
|
||||||
|
|
||||||
await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_STARTED, {
|
|
||||||
"dataset_name": dataset_name,
|
|
||||||
"files": document_ids_str,
|
|
||||||
})
|
|
||||||
try:
|
|
||||||
tasks = [
|
|
||||||
Task(classify_documents),
|
|
||||||
Task(check_permissions_on_documents, user = user, permissions = ["write"]),
|
|
||||||
Task(extract_chunks_from_documents), # Extract text chunks based on the document type.
|
|
||||||
Task(add_data_points, task_config = { "batch_size": 10 }),
|
|
||||||
Task(extract_graph_from_code, graph_model = SourceCodeGraph, task_config = { "batch_size": 10 }), # Generate knowledge graphs from the document chunks.
|
|
||||||
]
|
|
||||||
|
|
||||||
pipeline = run_tasks(tasks, data_documents, "code_graph_pipeline")
|
|
||||||
|
|
||||||
async for result in pipeline:
|
|
||||||
print(result)
|
|
||||||
|
|
||||||
send_telemetry("code_graph_pipeline EXECUTION COMPLETED", user.id)
|
|
||||||
|
|
||||||
await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_COMPLETED, {
|
|
||||||
"dataset_name": dataset_name,
|
|
||||||
"files": document_ids_str,
|
|
||||||
})
|
|
||||||
except Exception as error:
|
|
||||||
send_telemetry("code_graph_pipeline EXECUTION ERRORED", user.id)
|
|
||||||
|
|
||||||
await log_pipeline_status(dataset_id, PipelineRunStatus.DATASET_PROCESSING_ERRORED, {
|
|
||||||
"dataset_name": dataset_name,
|
|
||||||
"files": document_ids_str,
|
|
||||||
})
|
|
||||||
raise error
|
|
||||||
|
|
||||||
|
|
||||||
def generate_dataset_name(dataset_name: str) -> str:
|
|
||||||
return dataset_name.replace(".", "_").replace(" ", "_")
|
|
||||||
|
|
||||||
|
|
||||||
async def run_code_graph_pipeline(repo_path):
|
async def run_code_graph_pipeline(repo_path):
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue