diff --git a/.github/workflows/basic_tests.yml b/.github/workflows/basic_tests.yml index b7f324310..a93e8dffe 100644 --- a/.github/workflows/basic_tests.yml +++ b/.github/workflows/basic_tests.yml @@ -193,32 +193,3 @@ jobs: - name: Run Simple Examples run: uv run python ./examples/python/simple_example.py - - graph-tests: - name: Run Basic Graph Tests - runs-on: ubuntu-22.04 - env: - LLM_PROVIDER: openai - LLM_MODEL: ${{ secrets.LLM_MODEL }} - LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} - LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} - - EMBEDDING_PROVIDER: openai - EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} - EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} - EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} - EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} - steps: - - name: Check out repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Cognee Setup - uses: ./.github/actions/cognee_setup - with: - python-version: ${{ inputs.python-version }} - - - name: Run Graph Tests - run: uv run python ./examples/python/code_graph_example.py --repo_path ./cognee/tasks/graph diff --git a/cognee-mcp/src/server.py b/cognee-mcp/src/server.py index ce6dad88a..3a64ba65a 100755 --- a/cognee-mcp/src/server.py +++ b/cognee-mcp/src/server.py @@ -407,75 +407,6 @@ async def save_interaction(data: str) -> list: ] -@mcp.tool() -async def codify(repo_path: str) -> list: - """ - Analyze and generate a code-specific knowledge graph from a software repository. - - This function launches a background task that processes the provided repository - and builds a code knowledge graph. The function returns immediately while - the processing continues in the background due to MCP timeout constraints. - - Parameters - ---------- - repo_path : str - Path to the code repository to analyze. This can be a local file path or a - relative path to a repository. The path should point to the root of the - repository or a specific directory within it. - - Returns - ------- - list - A list containing a single TextContent object with information about the - background task launch and how to check its status. - - Notes - ----- - - The function launches a background task and returns immediately - - The code graph generation may take significant time for larger repositories - - Use the codify_status tool to check the progress of the operation - - Process results are logged to the standard Cognee log file - - All stdout is redirected to stderr to maintain MCP communication integrity - """ - - if cognee_client.use_api: - error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for code graph pipeline." - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - async def codify_task(repo_path: str): - # NOTE: MCP uses stdout to communicate, we must redirect all output - # going to stdout ( like the print function ) to stderr. - with redirect_stdout(sys.stderr): - logger.info("Codify process starting.") - from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline - - results = [] - async for result in run_code_graph_pipeline(repo_path, False): - results.append(result) - logger.info(result) - if all(results): - logger.info("Codify process finished succesfully.") - else: - logger.info("Codify process failed.") - - asyncio.create_task(codify_task(repo_path)) - - log_file = get_log_file_location() - text = ( - f"Background process launched due to MCP timeout limitations.\n" - f"To check current codify status use the codify_status tool\n" - f"or you can check the log file at: {log_file}" - ) - - return [ - types.TextContent( - type="text", - text=text, - ) - ] - - @mcp.tool() async def search(search_query: str, search_type: str) -> list: """ @@ -954,48 +885,6 @@ async def cognify_status(): return [types.TextContent(type="text", text=error_msg)] -@mcp.tool() -async def codify_status(): - """ - Get the current status of the codify pipeline. - - This function retrieves information about current and recently completed codify operations - in the codebase dataset. It provides details on progress, success/failure status, and statistics - about the processed code repositories. - - Returns - ------- - list - A list containing a single TextContent object with the status information as a string. - The status includes information about active and completed jobs for the cognify_code_pipeline. - - Notes - ----- - - The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset - - Status information includes job progress, execution time, and completion status - - The status is returned in string format for easy reading - - This operation is not available in API mode - """ - with redirect_stdout(sys.stderr): - try: - from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id - from cognee.modules.users.methods import get_default_user - - user = await get_default_user() - status = await cognee_client.get_pipeline_status( - [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline" - ) - return [types.TextContent(type="text", text=str(status))] - except NotImplementedError: - error_msg = "❌ Pipeline status is not available in API mode" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - except Exception as e: - error_msg = f"❌ Failed to get codify status: {str(e)}" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - def node_to_string(node): node_data = ", ".join( [f'{key}: "{value}"' for key, value in node.items() if key in ["id", "name"]] diff --git a/cognee/api/client.py b/cognee/api/client.py index 6766c12de..ab64f3489 100644 --- a/cognee/api/client.py +++ b/cognee/api/client.py @@ -21,7 +21,7 @@ from cognee.api.v1.notebooks.routers import get_notebooks_router from cognee.api.v1.permissions.routers import get_permissions_router from cognee.api.v1.settings.routers import get_settings_router from cognee.api.v1.datasets.routers import get_datasets_router -from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router +from cognee.api.v1.cognify.routers import get_cognify_router from cognee.api.v1.search.routers import get_search_router from cognee.api.v1.memify.routers import get_memify_router from cognee.api.v1.add.routers import get_add_router @@ -270,10 +270,6 @@ app.include_router(get_responses_router(), prefix="/api/v1/responses", tags=["re app.include_router(get_sync_router(), prefix="/api/v1/sync", tags=["sync"]) -codegraph_routes = get_code_pipeline_router() -if codegraph_routes: - app.include_router(codegraph_routes, prefix="/api/v1/code-pipeline", tags=["code-pipeline"]) - app.include_router( get_users_router(), prefix="/api/v1/users", diff --git a/cognee/api/v1/cognify/code_graph_pipeline.py b/cognee/api/v1/cognify/code_graph_pipeline.py deleted file mode 100644 index fb3612857..000000000 --- a/cognee/api/v1/cognify/code_graph_pipeline.py +++ /dev/null @@ -1,119 +0,0 @@ -import os -import pathlib -import asyncio -from typing import Optional -from cognee.shared.logging_utils import get_logger, setup_logging -from cognee.modules.observability.get_observe import get_observe - -from cognee.api.v1.search import SearchType, search -from cognee.api.v1.visualize.visualize import visualize_graph -from cognee.modules.cognify.config import get_cognify_config -from cognee.modules.pipelines import run_tasks -from cognee.modules.pipelines.tasks.task import Task -from cognee.modules.users.methods import get_default_user -from cognee.shared.data_models import KnowledgeGraph -from cognee.modules.data.methods import create_dataset -from cognee.tasks.documents import classify_documents, extract_chunks_from_documents -from cognee.tasks.graph import extract_graph_from_data -from cognee.tasks.ingestion import ingest_data -from cognee.tasks.repo_processor import get_non_py_files, get_repo_file_dependencies - -from cognee.tasks.storage import add_data_points -from cognee.tasks.summarization import summarize_text -from cognee.infrastructure.llm import get_max_chunk_tokens -from cognee.infrastructure.databases.relational import get_relational_engine - -observe = get_observe() - -logger = get_logger("code_graph_pipeline") - - -@observe -async def run_code_graph_pipeline( - repo_path, - include_docs=False, - excluded_paths: Optional[list[str]] = None, - supported_languages: Optional[list[str]] = None, -): - import cognee - from cognee.low_level import setup - - await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata=True) - await setup() - - cognee_config = get_cognify_config() - user = await get_default_user() - detailed_extraction = True - - tasks = [ - Task( - get_repo_file_dependencies, - detailed_extraction=detailed_extraction, - supported_languages=supported_languages, - excluded_paths=excluded_paths, - ), - # Task(summarize_code, task_config={"batch_size": 500}), # This task takes a long time to complete - Task(add_data_points, task_config={"batch_size": 30}), - ] - - if include_docs: - # This tasks take a long time to complete - non_code_tasks = [ - Task(get_non_py_files, task_config={"batch_size": 50}), - Task(ingest_data, dataset_name="repo_docs", user=user), - Task(classify_documents), - Task(extract_chunks_from_documents, max_chunk_size=get_max_chunk_tokens()), - Task( - extract_graph_from_data, - graph_model=KnowledgeGraph, - task_config={"batch_size": 50}, - ), - Task( - summarize_text, - summarization_model=cognee_config.summarization_model, - task_config={"batch_size": 50}, - ), - ] - - dataset_name = "codebase" - - # Save dataset to database - db_engine = get_relational_engine() - async with db_engine.get_async_session() as session: - dataset = await create_dataset(dataset_name, user, session) - - if include_docs: - non_code_pipeline_run = run_tasks( - non_code_tasks, dataset.id, repo_path, user, "cognify_pipeline" - ) - async for run_status in non_code_pipeline_run: - yield run_status - - async for run_status in run_tasks( - tasks, dataset.id, repo_path, user, "cognify_code_pipeline", incremental_loading=False - ): - yield run_status - - -if __name__ == "__main__": - - async def main(): - async for run_status in run_code_graph_pipeline("REPO_PATH"): - print(f"{run_status.pipeline_run_id}: {run_status.status}") - - file_path = os.path.join( - pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html" - ) - await visualize_graph(file_path) - - search_results = await search( - query_type=SearchType.CODE, - query_text="How is Relationship weight calculated?", - ) - - for file in search_results: - print(file["name"]) - - logger = setup_logging(name="code_graph_pipeline") - asyncio.run(main()) diff --git a/cognee/api/v1/cognify/routers/__init__.py b/cognee/api/v1/cognify/routers/__init__.py index a6da4a179..6e5f9cc9d 100644 --- a/cognee/api/v1/cognify/routers/__init__.py +++ b/cognee/api/v1/cognify/routers/__init__.py @@ -1,2 +1 @@ from .get_cognify_router import get_cognify_router -from .get_code_pipeline_router import get_code_pipeline_router diff --git a/cognee/api/v1/cognify/routers/get_code_pipeline_router.py b/cognee/api/v1/cognify/routers/get_code_pipeline_router.py deleted file mode 100644 index e016c60f9..000000000 --- a/cognee/api/v1/cognify/routers/get_code_pipeline_router.py +++ /dev/null @@ -1,90 +0,0 @@ -import json -from cognee.shared.logging_utils import get_logger -from fastapi import APIRouter -from fastapi.responses import JSONResponse -from cognee.api.DTO import InDTO -from cognee.modules.retrieval.code_retriever import CodeRetriever -from cognee.modules.storage.utils import JSONEncoder - - -logger = get_logger() - - -class CodePipelineIndexPayloadDTO(InDTO): - repo_path: str - include_docs: bool = False - - -class CodePipelineRetrievePayloadDTO(InDTO): - query: str - full_input: str - - -def get_code_pipeline_router() -> APIRouter: - try: - import cognee.api.v1.cognify.code_graph_pipeline - except ModuleNotFoundError: - logger.error("codegraph dependencies not found. Skipping codegraph API routes.") - return None - - router = APIRouter() - - @router.post("/index", response_model=None) - async def code_pipeline_index(payload: CodePipelineIndexPayloadDTO): - """ - Run indexation on a code repository. - - This endpoint processes a code repository to create a knowledge graph - of the codebase structure, dependencies, and relationships. - - ## Request Parameters - - **repo_path** (str): Path to the code repository - - **include_docs** (bool): Whether to include documentation files (default: false) - - ## Response - No content returned. Processing results are logged. - - ## Error Codes - - **409 Conflict**: Error during indexation process - """ - from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline - - try: - async for result in run_code_graph_pipeline(payload.repo_path, payload.include_docs): - logger.info(result) - except Exception as error: - return JSONResponse(status_code=409, content={"error": str(error)}) - - @router.post("/retrieve", response_model=list[dict]) - async def code_pipeline_retrieve(payload: CodePipelineRetrievePayloadDTO): - """ - Retrieve context from the code knowledge graph. - - This endpoint searches the indexed code repository to find relevant - context based on the provided query. - - ## Request Parameters - - **query** (str): Search query for code context - - **full_input** (str): Full input text for processing - - ## Response - Returns a list of relevant code files and context as JSON. - - ## Error Codes - - **409 Conflict**: Error during retrieval process - """ - try: - query = ( - payload.full_input.replace("cognee ", "") - if payload.full_input.startswith("cognee ") - else payload.full_input - ) - - retriever = CodeRetriever() - retrieved_files = await retriever.get_context(query) - - return json.dumps(retrieved_files, cls=JSONEncoder) - except Exception as error: - return JSONResponse(status_code=409, content={"error": str(error)}) - - return router diff --git a/cognee/modules/pipelines/__init__.py b/cognee/modules/pipelines/__init__.py index 6fca237ca..a0accaeed 100644 --- a/cognee/modules/pipelines/__init__.py +++ b/cognee/modules/pipelines/__init__.py @@ -2,3 +2,4 @@ from .tasks.task import Task from .operations.run_tasks import run_tasks from .operations.run_parallel import run_tasks_parallel from .operations.pipeline import run_pipeline +from .custom_pipeline_interface import CustomPipelineInterface diff --git a/cognee/modules/pipelines/custom_pipeline_interface.py b/cognee/modules/pipelines/custom_pipeline_interface.py new file mode 100644 index 000000000..04c3d113a --- /dev/null +++ b/cognee/modules/pipelines/custom_pipeline_interface.py @@ -0,0 +1,12 @@ +from typing import Protocol, Any +from abc import abstractmethod + + +class CustomPipelineInterface(Protocol): + """ + Defines an interface for creating and running a custom pipeline. + """ + + @abstractmethod + async def run_pipeline(self) -> Any: + raise NotImplementedError diff --git a/examples/python/code_graph_example.py b/examples/python/code_graph_example.py deleted file mode 100644 index 431069050..000000000 --- a/examples/python/code_graph_example.py +++ /dev/null @@ -1,58 +0,0 @@ -import argparse -import asyncio -import cognee -from cognee import SearchType -from cognee.shared.logging_utils import setup_logging, ERROR - -from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline - - -async def main(repo_path, include_docs): - run_status = False - async for run_status in run_code_graph_pipeline(repo_path, include_docs=include_docs): - run_status = run_status - - # Test CODE search - search_results = await cognee.search(query_type=SearchType.CODE, query_text="test") - assert len(search_results) != 0, "The search results list is empty." - print("\n\nSearch results are:\n") - for result in search_results: - print(f"{result}\n") - - return run_status - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("--repo_path", type=str, required=True, help="Path to the repository") - parser.add_argument( - "--include_docs", - type=lambda x: x.lower() in ("true", "1"), - default=False, - help="Whether or not to process non-code files", - ) - parser.add_argument( - "--time", - type=lambda x: x.lower() in ("true", "1"), - default=True, - help="Whether or not to time the pipeline run", - ) - return parser.parse_args() - - -if __name__ == "__main__": - logger = setup_logging(log_level=ERROR) - - args = parse_args() - - if args.time: - import time - - start_time = time.time() - asyncio.run(main(args.repo_path, args.include_docs)) - end_time = time.time() - print("\n" + "=" * 50) - print(f"Pipeline Execution Time: {end_time - start_time:.2f} seconds") - print("=" * 50 + "\n") - else: - asyncio.run(main(args.repo_path, args.include_docs))