feat: OpenAI compatible route /api/v1/responses (#792)

## Description

/api/v1/responses
This PR manages the following function calls:
- search
- cognify
- prune

Next steps
- codify


## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Signed-off-by: Diego B Theuerkauf <diego.theuerkauf@tuebingen.mpg.de>
Co-authored-by: Hande <159312713+hande-k@users.noreply.github.com>
Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Co-authored-by: Diego Baptista Theuerkauf <34717973+diegoabt@users.noreply.github.com>
Co-authored-by: Boris <boris@topoteretes.com>
Co-authored-by: Boris <borisarzentar@gmail.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
Dmitrii Galkin 2025-05-16 11:17:18 +04:00 committed by GitHub
parent 8178b72559
commit 1dd179b6dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 625 additions and 0 deletions

View file

@ -14,6 +14,7 @@ from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
from cognee.api.v1.responses.routers import get_responses_router
from fastapi import Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
@ -167,6 +168,8 @@ app.include_router(get_visualize_router(), prefix="/api/v1/visualize", tags=["vi
app.include_router(get_delete_router(), prefix="/api/v1/delete", tags=["delete"])
# OpenAI-compatible Responses API (function calling: search / cognify / prune)
app.include_router(get_responses_router(), prefix="/api/v1/responses", tags=["responses"])
codegraph_routes = get_code_pipeline_router()
# presumably get_code_pipeline_router() returns None when the code pipeline is
# unavailable, so registration is conditional — confirm against the factory
if codegraph_routes:
    app.include_router(codegraph_routes, prefix="/api/v1/code-pipeline", tags=["code-pipeline"])

View file

@ -0,0 +1,3 @@
from cognee.api.v1.responses.routers import get_responses_router
__all__ = ["get_responses_router"]

View file

@ -0,0 +1,63 @@
# Default tool (function) definitions advertised to the model by the
# /api/v1/responses endpoint, following the OpenAI function-tool schema.
# dispatch_function's handlers validate incoming arguments against these entries.
DEFAULT_TOOLS = [
    {
        "type": "function",
        "name": "search",
        "description": "Search for information within the knowledge graph",
        "parameters": {
            "type": "object",
            "properties": {
                "search_query": {
                    "type": "string",
                    "description": "The query to search for in the knowledge graph",
                },
                "search_type": {
                    "type": "string",
                    "description": "Type of search to perform",
                    # Must stay in sync with SearchType — handle_search indexes
                    # the enum with this string.
                    "enum": [
                        "INSIGHTS",
                        "CODE",
                        "GRAPH_COMPLETION",
                        "SEMANTIC",
                        "NATURAL_LANGUAGE",
                    ],
                },
                "top_k": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 10,
                },
                "datasets": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional list of dataset names to search within",
                },
            },
            "required": ["search_query"],
        },
    },
    {
        "type": "function",
        "name": "cognify",
        "description": "Convert text into a knowledge graph or process all added content",
        "parameters": {
            "type": "object",
            "properties": {
                "text": {
                    "type": "string",
                    "description": "Text content to be converted into a knowledge graph",
                },
                "ontology_file_path": {
                    "type": "string",
                    "description": "Path to a custom ontology file",
                },
            },
            "required": ["text"],
        },
    },
    # Commented out as dangerous: prune deletes memory, so it is deliberately
    # not exposed to the model by default.
    # {
    #     "type": "function",
    #     "name": "prune",
    #     "description": "Prune memory",
    # },
]

View file

@ -0,0 +1,107 @@
import json
import logging
from typing import Any, Dict, Union
from cognee.api.v1.responses.models import ToolCall
from cognee.modules.search.types import SearchType
from cognee.api.v1.add import add
from cognee.api.v1.search import search
from cognee.api.v1.cognify import cognify
from cognee.api.v1.prune import prune
from cognee.modules.users.methods import get_default_user
from cognee.api.v1.responses.default_tools import DEFAULT_TOOLS
logger = logging.getLogger(__name__)
async def dispatch_function(tool_call: Union[ToolCall, Dict[str, Any]]) -> Any:
    """
    Dispatch an OpenAI-style tool call to the matching Cognee handler.

    Accepts either a parsed ``ToolCall`` model or the raw dict shape produced
    by the Responses API (``{"function": {"name": ..., "arguments": ...}}``).

    Returns the handler's result (a string, or a list for search results),
    or an error string for unknown function names.

    Raises:
        ValueError: if the arguments payload is not valid JSON.
    """
    if isinstance(tool_call, dict):
        function_data = tool_call.get("function", {})
        function_name = function_data.get("name", "")
        arguments_str = function_data.get("arguments", "{}")
    else:
        function_name = tool_call.function.name
        arguments_str = tool_call.function.arguments

    # Arguments arrive as a JSON-encoded string produced by the model; surface
    # a clear error naming the function instead of a bare JSONDecodeError.
    # JSONDecodeError subclasses ValueError, so existing callers are unaffected.
    try:
        arguments = json.loads(arguments_str)
    except json.JSONDecodeError as error:
        raise ValueError(
            f"Invalid JSON arguments for function {function_name!r}: {error}"
        ) from error

    # %-style args defer formatting until the record is actually emitted
    logger.info("Dispatching function: %s with args: %s", function_name, arguments)

    user = await get_default_user()

    if function_name == "search":
        return await handle_search(arguments, user)
    elif function_name == "cognify":
        return await handle_cognify(arguments, user)
    elif function_name == "prune":
        return await handle_prune(arguments, user)
    else:
        return f"Error: Unknown function {function_name}"
async def handle_search(arguments: Dict[str, Any], user) -> Union[list, str]:
    """Handle the 'search' tool call.

    Returns the search results list on success, or an error string when the
    required 'search_query' argument is missing. (The original annotation
    claimed ``-> list`` but the error path returns ``str``.)
    """
    # Pull validation metadata (required params, allowed search types) from
    # the canonical tool schema so it stays in sync with DEFAULT_TOOLS.
    search_tool = next((tool for tool in DEFAULT_TOOLS if tool["name"] == "search"), None)
    required_params = (
        search_tool["parameters"].get("required", []) if search_tool else ["search_query"]
    )

    query = arguments.get("search_query")
    if not query and "search_query" in required_params:
        return "Error: Missing required 'search_query' parameter"

    search_type_str = arguments.get("search_type", "GRAPH_COMPLETION")
    valid_search_types = (
        search_tool["parameters"]["properties"]["search_type"]["enum"]
        if search_tool
        else ["INSIGHTS", "CODE", "GRAPH_COMPLETION", "SEMANTIC", "NATURAL_LANGUAGE"]
    )
    # Fall back to a safe default rather than failing on a bad model output
    if search_type_str not in valid_search_types:
        logger.warning("Invalid search_type: %s, defaulting to GRAPH_COMPLETION", search_type_str)
        search_type_str = "GRAPH_COMPLETION"
    query_type = SearchType[search_type_str]

    top_k = arguments.get("top_k")
    datasets = arguments.get("datasets")
    system_prompt_path = arguments.get("system_prompt_path", "answer_simple_question.txt")

    results = await search(
        query_text=query,
        query_type=query_type,
        datasets=datasets,
        user=user,
        system_prompt_path=system_prompt_path,
        # Guard against non-integer top_k values coming from the model
        top_k=top_k if isinstance(top_k, int) else 10,
    )
    return results
async def handle_cognify(arguments: Dict[str, Any], user) -> str:
    """Handle the 'cognify' tool call.

    When 'text' is supplied it is ingested via add() first; cognify then
    processes the pending content into the knowledge graph.
    """
    text = arguments.get("text")
    ontology_path = arguments.get("ontology_file_path")

    if text:
        await add(data=text, user=user)

    # Normalize a falsy path (missing or empty string) to None
    await cognify(user=user, ontology_file_path=ontology_path or None)

    if text:
        return "Text successfully converted into knowledge graph."
    return "Knowledge graph successfully updated with new information."
async def handle_prune(arguments: Dict[str, Any], user) -> str:
    """Handle the 'prune' tool call.

    Note: ``arguments`` and ``user`` are accepted for dispatcher signature
    uniformity but are not used — prune() takes no parameters here.
    """
    await prune()
    return "Memory has been pruned successfully."

View file

@ -0,0 +1,102 @@
import time
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
from enum import Enum
from cognee.api.DTO import InDTO, OutDTO
class CogneeModel(str, Enum):
    """Supported model identifiers accepted by the /responses endpoint."""

    # Single virtual model; NOTE(review): the router currently forwards every
    # request to "gpt-4o" regardless of this value — see get_responses_router.
    COGNEEV1 = "cognee-v1"
class FunctionParameters(BaseModel):
    """JSON Schema describing a function's parameters (OpenAI format)."""

    type: str = "object"  # JSON Schema root for function params is always an object
    properties: Dict[str, Dict[str, Any]]
    required: Optional[List[str]] = None


class Function(BaseModel):
    """Function definition compatible with OpenAI's function-calling format."""

    name: str
    description: str
    parameters: FunctionParameters


class ToolFunction(BaseModel):
    """Tool wrapper around a function definition (for OpenAI compatibility)."""

    type: str = "function"
    function: Function


class FunctionCall(BaseModel):
    """A concrete function invocation: name plus JSON-encoded arguments."""

    name: str
    arguments: str  # JSON string; decoded by dispatch_function


class ToolCall(BaseModel):
    """Tool call made by the assistant."""

    # OpenAI-style call id, generated when the upstream response lacks one
    id: str = Field(default_factory=lambda: f"call_{uuid.uuid4().hex}")
    type: str = "function"
    function: FunctionCall
class ChatUsage(BaseModel):
    """Token usage accounting, mirroring OpenAI's usage object."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
class ResponseRequest(InDTO):
    """Request body for the responses endpoint (OpenAI Responses API format)."""

    model: CogneeModel = CogneeModel.COGNEEV1
    input: str  # the user's prompt / instruction text
    tools: Optional[List[ToolFunction]] = None  # router falls back to DEFAULT_TOOLS
    tool_choice: Optional[Union[str, Dict[str, Any]]] = "auto"
    # NOTE(review): OpenAI-compat field; appears unused by the router, which
    # authenticates via get_authenticated_user instead — confirm.
    user: Optional[str] = None
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = None
class ToolCallOutput(BaseModel):
    """Result of executing a tool call server-side."""

    status: str = "success"  # "success" or "error"
    data: Optional[Dict[str, Any]] = None  # e.g. {"result": <handler output>}


class ResponseToolCall(BaseModel):
    """Tool call echoed back in a response, including its execution output."""

    id: str = Field(default_factory=lambda: f"call_{uuid.uuid4().hex}")
    type: str = "function"
    function: FunctionCall
    output: Optional[ToolCallOutput] = None
class ResponseBody(OutDTO):
    """Response body for the responses endpoint (OpenAI Responses format)."""

    id: str = Field(default_factory=lambda: f"resp_{uuid.uuid4().hex}")
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    object: str = "response"
    status: str = "completed"
    tool_calls: List[ResponseToolCall]
    usage: Optional[ChatUsage] = None
    # Was `Dict[str, Any] = None`: a None default on a non-Optional annotation
    # is invalid typing and rejected by pydantic v2's strict validation.
    metadata: Optional[Dict[str, Any]] = None

View file

@ -0,0 +1,3 @@
from cognee.api.v1.responses.routers.get_responses_router import get_responses_router
__all__ = ["get_responses_router"]

View file

@ -0,0 +1,86 @@
# Default tool definitions (routers-local copy).
# NOTE(review): this duplicates cognee/api/v1/responses/default_tools.py but the
# two schemas diverge (cognify/prune parameters differ) — confirm which module
# the router and dispatcher actually import, and deduplicate.
DEFAULT_TOOLS = [
    {
        "type": "function",
        "name": "search",
        "description": "Search for information within the knowledge graph",
        "parameters": {
            "type": "object",
            "properties": {
                "search_query": {
                    "type": "string",
                    "description": "The query to search for in the knowledge graph",
                },
                "search_type": {
                    "type": "string",
                    "description": "Type of search to perform",
                    "enum": [
                        "INSIGHTS",
                        "CODE",
                        "GRAPH_COMPLETION",
                        "SEMANTIC",
                        "NATURAL_LANGUAGE",
                    ],
                },
                "top_k": {
                    "type": "integer",
                    "description": "Maximum number of results to return",
                    "default": 10,
                },
                "datasets": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional list of dataset names to search within",
                },
            },
            "required": ["search_query"],
        },
    },
    {
        "type": "function",
        "name": "cognify",
        "description": "Convert text into a knowledge graph or process all added content",
        "parameters": {
            "type": "object",
            "properties": {
                "text": {
                    "type": "string",
                    "description": "Text content to be converted into a knowledge graph",
                },
                # NOTE(review): handle_cognify reads 'text' and 'ontology_file_path'
                # only; the graph_model_* parameters below are ignored by the
                # dispatcher — confirm intent.
                "graph_model_name": {
                    "type": "string",
                    "description": "Name of the graph model to use",
                },
                "graph_model_file": {
                    "type": "string",
                    "description": "Path to a custom graph model file",
                },
            },
        },
    },
    {
        "type": "function",
        "name": "prune",
        "description": "Remove unnecessary or outdated information from the knowledge graph",
        "parameters": {
            "type": "object",
            # NOTE(review): handle_prune takes no arguments and ignores all of
            # these parameters — confirm before advertising them to the model.
            "properties": {
                "prune_strategy": {
                    "type": "string",
                    "enum": ["light", "moderate", "aggressive"],
                    "description": "Strategy for pruning the knowledge graph",
                    "default": "moderate",
                },
                "min_confidence": {
                    "type": "number",
                    "description": "Minimum confidence score to retain (0-1)",
                    "minimum": 0,
                    "maximum": 1,
                },
                "older_than": {
                    "type": "string",
                    "description": "ISO date string - prune nodes older than this date",
                },
            },
        },
    },
]

View file

@ -0,0 +1,149 @@
"""
Get router for the OpenAI-compatible responses API.
"""
import logging
import uuid
from typing import Dict, List, Optional, Any
import openai
from fastapi import APIRouter, Depends
from cognee.api.v1.responses.models import (
ResponseRequest,
ResponseBody,
ResponseToolCall,
ChatUsage,
FunctionCall,
ToolCallOutput,
)
from cognee.api.v1.responses.dispatch_function import dispatch_function
from cognee.api.v1.responses.default_tools import DEFAULT_TOOLS
from cognee.infrastructure.llm.config import get_llm_config
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
def get_responses_router() -> APIRouter:
    """
    Build the FastAPI router for the OpenAI-compatible Responses API.

    Follows the OpenAI Responses API format:
    https://platform.openai.com/docs/api-reference/responses/create
    """
    router = APIRouter()
    logger = logging.getLogger(__name__)

    def _get_model_client():
        """Return an async OpenAI client configured from the Cognee LLM config."""
        llm_config = get_llm_config()
        return openai.AsyncOpenAI(api_key=llm_config.llm_api_key)

    async def call_openai_api_for_model(
        input_text: str,
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Any = "auto",
        temperature: float = 1.0,
    ) -> Dict[str, Any]:
        """
        Call the upstream model API and return the response as a plain dict.

        ``model`` is currently ignored: every request is served by gpt-4o.
        """
        # Resolve the default here instead of using the mutable module-level
        # constant as a default argument value.
        if tools is None:
            tools = DEFAULT_TOOLS

        # TODO: Support other models (e.g. cognee-v1-openai-gpt-3.5-turbo, etc.)
        model = "gpt-4o"
        client = _get_model_client()
        logger.debug("Using model: %s", model)

        response = await client.responses.create(
            model=model,
            input=input_text,
            temperature=temperature,
            tools=tools,
            tool_choice=tool_choice,
        )
        logger.info("Response: %s", response)
        return response.model_dump()

    @router.post("/", response_model=ResponseBody)
    async def create_response(
        request: ResponseRequest,
        user: User = Depends(get_authenticated_user),
    ) -> ResponseBody:
        """
        OpenAI-compatible responses endpoint with function-calling support.

        Forwards the request to the upstream model, executes any function
        tool calls from its output via the Cognee dispatcher, and reports
        each call's result in the response body.
        """
        # Use default tools if the client supplied none
        tools = request.tools or DEFAULT_TOOLS

        response = await call_openai_api_for_model(
            input_text=request.input,
            model=request.model,
            tools=tools,
            tool_choice=request.tool_choice,
            temperature=request.temperature,
        )

        # Use the upstream response ID when present, otherwise mint one
        response_id = response.get("id", f"resp_{uuid.uuid4().hex}")

        output = response.get("output", [])
        processed_tool_calls = []

        # Execute each function tool call found in the model output
        for item in output:
            if isinstance(item, dict) and item.get("type") == "function_call":
                function_name = item.get("name", "")
                arguments_str = item.get("arguments", "{}")
                call_id = item.get("call_id", f"call_{uuid.uuid4().hex}")

                # Re-shape into the dict format dispatch_function accepts
                tool_call = {
                    "id": call_id,
                    "function": {"name": function_name, "arguments": arguments_str},
                    "type": "function",
                }

                try:
                    function_result = await dispatch_function(tool_call)
                    output_status = "success"
                except Exception as e:
                    logger.exception("Error executing function %s: %s", function_name, e)
                    function_result = f"Error executing {function_name}: {str(e)}"
                    output_status = "error"

                processed_tool_calls.append(
                    ResponseToolCall(
                        id=call_id,
                        type="function",
                        function=FunctionCall(name=function_name, arguments=arguments_str),
                        output=ToolCallOutput(
                            status=output_status, data={"result": function_result}
                        ),
                    )
                )

        # model_dump() may yield usage=None (not just a missing key), which
        # would crash the .get() calls below — normalize to an empty dict.
        usage = response.get("usage") or {}

        return ResponseBody(
            id=response_id,
            model=request.model,
            tool_calls=processed_tool_calls,
            usage=ChatUsage(
                prompt_tokens=usage.get("input_tokens", 0),
                completion_tokens=usage.get("output_tokens", 0),
                total_tokens=usage.get("total_tokens", 0),
            ),
        )

    return router

View file

@ -0,0 +1,109 @@
{
"cells": [
{
"cell_type": "code",
"metadata": {
"ExecuteTime": {
"end_time": "2025-05-03T09:53:28.509066Z",
"start_time": "2025-05-03T09:52:53.464324Z"
}
},
"source": [
"from openai import OpenAI\n",
"\n",
"# Use /api/v1/auth/login to get JWT\n",
"\n",
"client = OpenAI(api_key=\"COGNEE_API_KEY\", base_url=\"http://localhost:8000/api/v1/\")\n",
"\n",
"client.responses.create(\n",
" model=\"cognee-v1\",\n",
" input=\"Cognify: Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval.\",\n",
")"
],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n",
"\u001B[1mHTTP Request: POST http://localhost:8000/api/v1/responses \"HTTP/1.1 307 Temporary Redirect\"\u001B[0m\n",
"\u001B[1mHTTP Request: POST http://localhost:8000/api/v1/responses/ \"HTTP/1.1 200 OK\"\u001B[0m"
]
},
{
"data": {
"text/plain": [
"Response(id='resp_6815e775ed1c8191a7c9e5cd7d18dd6d01678e9c98ad3fea', created_at=None, error=None, incomplete_details=None, instructions=None, metadata=None, model='cognee-v1', object='response', output=None, parallel_tool_calls=None, temperature=None, tool_choice=None, tools=None, top_p=None, max_output_tokens=None, previous_response_id=None, reasoning=None, status='completed', text=None, truncation=None, usage=ResponseUsage(input_tokens=None, output_tokens=None, output_tokens_details=None, total_tokens=242, prompt_tokens=207, completion_tokens=35), user=None, created=1746266008, toolCalls=[{'id': 'call_VPk6HLbJClOEeznVbLe5l5zJ', 'type': 'function', 'function': {'name': 'cognify', 'arguments': '{\"text\":\"Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval.\"}'}, 'output': {'status': 'success', 'data': {'result': 'Text successfully converted into knowledge graph.'}}}])"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"execution_count": 29
},
{
"cell_type": "code",
"metadata": {
"ExecuteTime": {
"end_time": "2025-05-03T09:53:35.039433Z",
"start_time": "2025-05-03T09:53:31.791117Z"
}
},
"source": [
"from openai import OpenAI\n",
"\n",
"client = OpenAI(api_key=\"COGNEE_API_KEY\", base_url=\"http://localhost:8000/api/v1/\")\n",
"\n",
"client.responses.create(\n",
" model=\"cognee-v1\",\n",
" input=\"Search insights about NLP\",\n",
")"
],
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n",
"\u001B[1mHTTP Request: POST http://localhost:8000/api/v1/responses \"HTTP/1.1 307 Temporary Redirect\"\u001B[0m\n",
"\u001B[1mHTTP Request: POST http://localhost:8000/api/v1/responses/ \"HTTP/1.1 200 OK\"\u001B[0m"
]
},
{
"data": {
"text/plain": [
"Response(id='resp_6815e79c2e5c8191a2b2279408544f560bbe226c4ebfeb1f', created_at=None, error=None, incomplete_details=None, instructions=None, metadata=None, model='cognee-v1', object='response', output=None, parallel_tool_calls=None, temperature=None, tool_choice=None, tools=None, top_p=None, max_output_tokens=None, previous_response_id=None, reasoning=None, status='completed', text=None, truncation=None, usage=ResponseUsage(input_tokens=None, output_tokens=None, output_tokens_details=None, total_tokens=215, prompt_tokens=188, completion_tokens=27), user=None, created=1746266015, toolCalls=[{'id': 'call_lP8Le7A76id2MIKqVYa3gvI8', 'type': 'function', 'function': {'name': 'search', 'arguments': '{\"search_query\":\"NLP\",\"search_type\":\"INSIGHTS\",\"top_k\":5}'}, 'output': {'status': 'success', 'data': {'result': [[{'created_at': 1746265986926, 'updated_at': '2025-05-03T09:53:06.926000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['text']}, 'type': 'DocumentChunk', 'belongs_to_set': None, 'text': 'Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval.', 'chunk_size': 34, 'chunk_index': 0, 'cut_type': 'sentence_end', 'id': '6b85cc38-be93-5ada-bed1-90cddad3c385'}, {'source_node_id': '6b85cc38-be93-5ada-bed1-90cddad3c385', 'target_node_id': 'bc338a39-64d6-549a-acec-da60846dd90d', 'relationship_name': 'contains', 'updated_at': '2025-05-03T09:53:24.988192Z'}, {'created_at': 1746265992781, 'updated_at': '2025-05-03T09:53:12.781000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 'belongs_to_set': None, 'name': 'natural language processing', 'description': 'An interdisciplinary subfield of computer science and information retrieval.', 'id': 'bc338a39-64d6-549a-acec-da60846dd90d'}], [{'created_at': 1746265992781, 'updated_at': '2025-05-03T09:53:12.781000Z', 'ontology_valid': False, 'version': 
1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 'belongs_to_set': None, 'name': 'natural language processing', 'description': 'An interdisciplinary subfield of computer science and information retrieval.', 'id': 'bc338a39-64d6-549a-acec-da60846dd90d'}, {'source_node_id': 'bc338a39-64d6-549a-acec-da60846dd90d', 'target_node_id': 'dd9713b7-dc20-5101-aad0-1c4216811147', 'relationship_name': 'is_a', 'updated_at': '2025-05-03T09:53:24.988195Z'}, {'created_at': 1746265992781, 'updated_at': '2025-05-03T09:53:12.781000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'EntityType', 'belongs_to_set': None, 'name': 'concept', 'description': 'concept', 'id': 'dd9713b7-dc20-5101-aad0-1c4216811147'}], [{'created_at': 1746265992781, 'updated_at': '2025-05-03T09:53:12.781000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 'belongs_to_set': None, 'name': 'natural language processing', 'description': 'An interdisciplinary subfield of computer science and information retrieval.', 'id': 'bc338a39-64d6-549a-acec-da60846dd90d'}, {'relationship_name': 'is_a_subfield_of', 'source_node_id': 'bc338a39-64d6-549a-acec-da60846dd90d', 'target_node_id': '6218dbab-eb6a-5759-a864-b3419755ffe0', 'ontology_valid': False, 'updated_at': '2025-05-03T09:53:18.000683Z'}, {'created_at': 1746265992782, 'updated_at': '2025-05-03T09:53:12.782000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 'belongs_to_set': None, 'name': 'computer science', 'description': 'The study of computers and computational systems.', 'id': '6218dbab-eb6a-5759-a864-b3419755ffe0'}], [{'created_at': 1746265992781, 'updated_at': '2025-05-03T09:53:12.781000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 
'belongs_to_set': None, 'name': 'natural language processing', 'description': 'An interdisciplinary subfield of computer science and information retrieval.', 'id': 'bc338a39-64d6-549a-acec-da60846dd90d'}, {'relationship_name': 'is_a_subfield_of', 'source_node_id': 'bc338a39-64d6-549a-acec-da60846dd90d', 'target_node_id': '02bdab9a-0981-518c-a0d4-1684e0329447', 'ontology_valid': False, 'updated_at': '2025-05-03T09:53:18.000691Z'}, {'created_at': 1746265992782, 'updated_at': '2025-05-03T09:53:12.782000Z', 'ontology_valid': False, 'version': 1, 'topological_rank': 0, 'metadata': {'index_fields': ['name']}, 'type': 'Entity', 'belongs_to_set': None, 'name': 'information retrieval', 'description': 'The activity of obtaining information system resources that are relevant to an information request.', 'id': '02bdab9a-0981-518c-a0d4-1684e0329447'}]]}}}])"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"execution_count": 30
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}