refactor: Remove codify and code_graph pipeline from main repo

This commit is contained in:
Andrej Milicevic 2025-11-05 12:56:17 +01:00
parent 8d7c4d5384
commit c481b87d58
9 changed files with 14 additions and 413 deletions

View file

@ -193,32 +193,3 @@ jobs:
- name: Run Simple Examples
run: uv run python ./examples/python/simple_example.py
graph-tests:
name: Run Basic Graph Tests
runs-on: ubuntu-22.04
env:
LLM_PROVIDER: openai
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
EMBEDDING_PROVIDER: openai
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
steps:
- name: Check out repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cognee Setup
uses: ./.github/actions/cognee_setup
with:
python-version: ${{ inputs.python-version }}
- name: Run Graph Tests
run: uv run python ./examples/python/code_graph_example.py --repo_path ./cognee/tasks/graph

View file

@ -407,75 +407,6 @@ async def save_interaction(data: str) -> list:
]
@mcp.tool()
async def codify(repo_path: str) -> list:
"""
Analyze and generate a code-specific knowledge graph from a software repository.
This function launches a background task that processes the provided repository
and builds a code knowledge graph. The function returns immediately while
the processing continues in the background due to MCP timeout constraints.
Parameters
----------
repo_path : str
Path to the code repository to analyze. This can be a local file path or a
relative path to a repository. The path should point to the root of the
repository or a specific directory within it.
Returns
-------
list
A list containing a single TextContent object with information about the
background task launch and how to check its status.
Notes
-----
- The function launches a background task and returns immediately
- The code graph generation may take significant time for larger repositories
- Use the codify_status tool to check the progress of the operation
- Process results are logged to the standard Cognee log file
- All stdout is redirected to stderr to maintain MCP communication integrity
"""
if cognee_client.use_api:
error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for code graph pipeline."
logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
async def codify_task(repo_path: str):
# NOTE: MCP uses stdout to communicate, we must redirect all output
# going to stdout ( like the print function ) to stderr.
with redirect_stdout(sys.stderr):
logger.info("Codify process starting.")
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
results = []
async for result in run_code_graph_pipeline(repo_path, False):
results.append(result)
logger.info(result)
if all(results):
logger.info("Codify process finished succesfully.")
else:
logger.info("Codify process failed.")
asyncio.create_task(codify_task(repo_path))
log_file = get_log_file_location()
text = (
f"Background process launched due to MCP timeout limitations.\n"
f"To check current codify status use the codify_status tool\n"
f"or you can check the log file at: {log_file}"
)
return [
types.TextContent(
type="text",
text=text,
)
]
@mcp.tool()
async def search(search_query: str, search_type: str) -> list:
"""
@ -954,48 +885,6 @@ async def cognify_status():
return [types.TextContent(type="text", text=error_msg)]
@mcp.tool()
async def codify_status():
"""
Get the current status of the codify pipeline.
This function retrieves information about current and recently completed codify operations
in the codebase dataset. It provides details on progress, success/failure status, and statistics
about the processed code repositories.
Returns
-------
list
A list containing a single TextContent object with the status information as a string.
The status includes information about active and completed jobs for the cognify_code_pipeline.
Notes
-----
- The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset
- Status information includes job progress, execution time, and completion status
- The status is returned in string format for easy reading
- This operation is not available in API mode
"""
with redirect_stdout(sys.stderr):
try:
from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
from cognee.modules.users.methods import get_default_user
user = await get_default_user()
status = await cognee_client.get_pipeline_status(
[await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline"
)
return [types.TextContent(type="text", text=str(status))]
except NotImplementedError:
error_msg = "❌ Pipeline status is not available in API mode"
logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_msg = f"❌ Failed to get codify status: {str(e)}"
logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
def node_to_string(node):
node_data = ", ".join(
[f'{key}: "{value}"' for key, value in node.items() if key in ["id", "name"]]

View file

@ -21,7 +21,7 @@ from cognee.api.v1.notebooks.routers import get_notebooks_router
from cognee.api.v1.permissions.routers import get_permissions_router
from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
from cognee.api.v1.cognify.routers import get_cognify_router
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
@ -270,10 +270,6 @@ app.include_router(get_responses_router(), prefix="/api/v1/responses", tags=["re
app.include_router(get_sync_router(), prefix="/api/v1/sync", tags=["sync"])
codegraph_routes = get_code_pipeline_router()
if codegraph_routes:
app.include_router(codegraph_routes, prefix="/api/v1/code-pipeline", tags=["code-pipeline"])
app.include_router(
get_users_router(),
prefix="/api/v1/users",

View file

@ -1,119 +0,0 @@
import os
import pathlib
import asyncio
from typing import Optional
from cognee.shared.logging_utils import get_logger, setup_logging
from cognee.modules.observability.get_observe import get_observe
from cognee.api.v1.search import SearchType, search
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.modules.cognify.config import get_cognify_config
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.shared.data_models import KnowledgeGraph
from cognee.modules.data.methods import create_dataset
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.ingestion import ingest_data
from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text
from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.infrastructure.databases.relational import get_relational_engine
observe = get_observe()
logger = get_logger("code_graph_pipeline")
@observe
async def run_code_graph_pipeline(
repo_path,
include_docs=False,
excluded_paths: Optional[list[str]] = None,
supported_languages: Optional[list[str]] = None,
):
import cognee
from cognee.low_level import setup
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await setup()
cognee_config = get_cognify_config()
user = await get_default_user()
detailed_extraction = True
tasks = [
Task(
get_repo_file_dependencies,
detailed_extraction=detailed_extraction,
supported_languages=supported_languages,
excluded_paths=excluded_paths,
),
# Task(summarize_code, task_config={"batch_size": 500}), # This task takes a long time to complete
Task(add_data_points, task_config={"batch_size": 30}),
]
if include_docs:
# This tasks take a long time to complete
non_code_tasks = [
Task(get_non_py_files, task_config={"batch_size": 50}),
Task(ingest_data, dataset_name="repo_docs", user=user),
Task(classify_documents),
Task(extract_chunks_from_documents, max_chunk_size=get_max_chunk_tokens()),
Task(
extract_graph_from_data,
graph_model=KnowledgeGraph,
task_config={"batch_size": 50},
),
Task(
summarize_text,
summarization_model=cognee_config.summarization_model,
task_config={"batch_size": 50},
),
]
dataset_name = "codebase"
# Save dataset to database
db_engine = get_relational_engine()
async with db_engine.get_async_session() as session:
dataset = await create_dataset(dataset_name, user, session)
if include_docs:
non_code_pipeline_run = run_tasks(
non_code_tasks, dataset.id, repo_path, user, "cognify_pipeline"
)
async for run_status in non_code_pipeline_run:
yield run_status
async for run_status in run_tasks(
tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False
):
yield run_status
if __name__ == "__main__":
async def main():
async for run_status in run_code_graph_pipeline("REPO_PATH"):
print(f"{run_status.pipeline_run_id}: {run_status.status}")
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"
)
await visualize_graph(file_path)
search_results = await search(
query_type=SearchType.CODE,
query_text="How is Relationship weight calculated?",
)
for file in search_results:
print(file["name"])
logger = setup_logging(name="code_graph_pipeline")
asyncio.run(main())

View file

@ -1,2 +1 @@
from .get_cognify_router import get_cognify_router
from .get_code_pipeline_router import get_code_pipeline_router

View file

@ -1,90 +0,0 @@
import json
from cognee.shared.logging_utils import get_logger
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from cognee.api.DTO import InDTO
from cognee.modules.retrieval.code_retriever import CodeRetriever
from cognee.modules.storage.utils import JSONEncoder
logger = get_logger()
class CodePipelineIndexPayloadDTO(InDTO):
repo_path: str
include_docs: bool = False
class CodePipelineRetrievePayloadDTO(InDTO):
query: str
full_input: str
def get_code_pipeline_router() -> APIRouter:
try:
import cognee.api.v1.cognify.code_graph_pipeline
except ModuleNotFoundError:
logger.error("codegraph dependencies not found. Skipping codegraph API routes.")
return None
router = APIRouter()
@router.post("/index", response_model=None)
async def code_pipeline_index(payload: CodePipelineIndexPayloadDTO):
"""
Run indexation on a code repository.
This endpoint processes a code repository to create a knowledge graph
of the codebase structure, dependencies, and relationships.
## Request Parameters
- **repo_path** (str): Path to the code repository
- **include_docs** (bool): Whether to include documentation files (default: false)
## Response
No content returned. Processing results are logged.
## Error Codes
- **409 Conflict**: Error during indexation process
"""
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
try:
async for result in run_code_graph_pipeline(payload.repo_path, payload.include_docs):
logger.info(result)
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})
@router.post("/retrieve", response_model=list[dict])
async def code_pipeline_retrieve(payload: CodePipelineRetrievePayloadDTO):
"""
Retrieve context from the code knowledge graph.
This endpoint searches the indexed code repository to find relevant
context based on the provided query.
## Request Parameters
- **query** (str): Search query for code context
- **full_input** (str): Full input text for processing
## Response
Returns a list of relevant code files and context as JSON.
## Error Codes
- **409 Conflict**: Error during retrieval process
"""
try:
query = (
payload.full_input.replace("cognee ", "")
if payload.full_input.startswith("cognee ")
else payload.full_input
)
retriever = CodeRetriever()
retrieved_files = await retriever.get_context(query)
return json.dumps(retrieved_files, cls=JSONEncoder)
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})
return router

View file

@ -2,3 +2,4 @@ from .tasks.task import Task
from .operations.run_tasks import run_tasks
from .operations.run_parallel import run_tasks_parallel
from .operations.pipeline import run_pipeline
from .custom_pipeline_interface import CustomPipelineInterface

View file

@ -0,0 +1,12 @@
from typing import Protocol, Any
from abc import abstractmethod
class CustomPipelineInterface(Protocol):
"""
Defines an interface for creating and running a custom pipeline.
"""
@abstractmethod
async def run_pipeline(self) -> Any:
raise NotImplementedError

View file

@ -1,58 +0,0 @@
import argparse
import asyncio
import cognee
from cognee import SearchType
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline
async def main(repo_path, include_docs):
run_status = False
async for run_status in run_code_graph_pipeline(repo_path, include_docs=include_docs):
run_status = run_status
# Test CODE search
search_results = await cognee.search(query_type=SearchType.CODE, query_text="test")
assert len(search_results) != 0, "The search results list is empty."
print("\n\nSearch results are:\n")
for result in search_results:
print(f"{result}\n")
return run_status
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository")
parser.add_argument(
"--include_docs",
type=lambda x: x.lower() in ("true", "1"),
default=False,
help="Whether or not to process non-code files",
)
parser.add_argument(
"--time",
type=lambda x: x.lower() in ("true", "1"),
default=True,
help="Whether or not to time the pipeline run",
)
return parser.parse_args()
if __name__ == "__main__":
logger = setup_logging(log_level=ERROR)
args = parse_args()
if args.time:
import time
start_time = time.time()
asyncio.run(main(args.repo_path, args.include_docs))
end_time = time.time()
print("\n" + "=" * 50)
print(f"Pipeline Execution Time: {end_time - start_time:.2f} seconds")
print("=" * 50 + "\n")
else:
asyncio.run(main(args.repo_path, args.include_docs))