feat: add ontology endpoint in REST API

- Add POST /api/v1/ontologies endpoint for file upload
  - Add GET /api/v1/ontologies endpoint for listing ontologies
  - Implement OntologyService for file management and metadata
  - Integrate ontology_key parameter in cognify endpoint
  - Update RDFLibOntologyResolver to support file-like objects
  - Add essential test suite for ontology endpoints
This commit is contained in:
Fahad Shoaib 2025-11-02 17:05:03 +05:00
parent abb1aba517
commit a4a9e76246
8 changed files with 356 additions and 30 deletions

View file

@ -23,6 +23,7 @@ from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.ontologies.routers.get_ontology_router import get_ontology_router
from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
@ -258,6 +259,8 @@ app.include_router(
app.include_router(get_datasets_router(), prefix="/api/v1/datasets", tags=["datasets"])
app.include_router(get_ontology_router(), prefix="/api/v1/ontologies", tags=["ontologies"])
app.include_router(get_settings_router(), prefix="/api/v1/settings", tags=["settings"])
app.include_router(get_visualize_router(), prefix="/api/v1/visualize", tags=["visualize"])

View file

@ -41,6 +41,10 @@ class CognifyPayloadDTO(InDTO):
custom_prompt: Optional[str] = Field(
default="", description="Custom prompt for entity extraction and graph generation"
)
ontology_key: Optional[str] = Field(
default=None,
description="Reference to previously uploaded ontology"
)
def get_cognify_router() -> APIRouter:
@ -68,6 +72,7 @@ def get_cognify_router() -> APIRouter:
- **dataset_ids** (Optional[List[UUID]]): List of existing dataset UUIDs to process. UUIDs allow processing of datasets not owned by the user (if permitted).
- **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).
- **custom_prompt** (Optional[str]): Custom prompt for entity extraction and graph generation. If provided, this prompt will be used instead of the default prompts for knowledge graph extraction.
- **ontology_key** (Optional[str]): Reference to a previously uploaded ontology file to use for knowledge graph construction.
## Response
- **Blocking execution**: Complete pipeline run information with entity counts, processing duration, and success/failure status
@ -82,7 +87,8 @@ def get_cognify_router() -> APIRouter:
{
"datasets": ["research_papers", "documentation"],
"run_in_background": false,
"custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections."
"custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections.",
"ontology_key": "medical_ontology_v1"
}
```
@ -108,13 +114,36 @@ def get_cognify_router() -> APIRouter:
)
from cognee.api.v1.cognify import cognify as cognee_cognify
from cognee.api.v1.ontologies.ontologies import OntologyService
try:
datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets
config_to_use = None
if payload.ontology_key:
ontology_service = OntologyService()
try:
ontology_content = ontology_service.get_ontology_content(payload.ontology_key, user)
from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from io import StringIO
ontology_stream = StringIO(ontology_content)
config_to_use: Config = {
"ontology_config": {
"ontology_resolver": RDFLibOntologyResolver(ontology_file=ontology_stream)
}
}
except ValueError as e:
return JSONResponse(
status_code=400, content={"error": f"Ontology error: {str(e)}"}
)
cognify_run = await cognee_cognify(
datasets,
user,
config=config_to_use,
run_in_background=payload.run_in_background,
custom_prompt=payload.custom_prompt,
)

View file

@ -0,0 +1,4 @@
from .ontologies import OntologyService
from .routers.get_ontology_router import get_ontology_router
__all__ = ["OntologyService", "get_ontology_router"]

View file

@ -0,0 +1,101 @@
import os
import json
import tempfile
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
from dataclasses import dataclass
@dataclass
class OntologyMetadata:
ontology_key: str
filename: str
size_bytes: int
uploaded_at: str
description: Optional[str] = None
class OntologyService:
def __init__(self):
pass
@property
def base_dir(self) -> Path:
return Path(tempfile.gettempdir()) / "ontologies"
def _get_user_dir(self, user_id: str) -> Path:
user_dir = self.base_dir / str(user_id)
user_dir.mkdir(parents=True, exist_ok=True)
return user_dir
def _get_metadata_path(self, user_dir: Path) -> Path:
return user_dir / "metadata.json"
def _load_metadata(self, user_dir: Path) -> dict:
metadata_path = self._get_metadata_path(user_dir)
if metadata_path.exists():
with open(metadata_path, 'r') as f:
return json.load(f)
return {}
def _save_metadata(self, user_dir: Path, metadata: dict):
metadata_path = self._get_metadata_path(user_dir)
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
async def upload_ontology(self, ontology_key: str, file, user, description: Optional[str] = None) -> OntologyMetadata:
# Validate file format
if not file.filename.lower().endswith('.owl'):
raise ValueError("File must be in .owl format")
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
# Check for duplicate key
if ontology_key in metadata:
raise ValueError(f"Ontology key '{ontology_key}' already exists")
# Read file content
content = await file.read()
if len(content) > 10 * 1024 * 1024: # 10MB limit
raise ValueError("File size exceeds 10MB limit")
# Save file
file_path = user_dir / f"{ontology_key}.owl"
with open(file_path, 'wb') as f:
f.write(content)
# Update metadata
ontology_metadata = {
"filename": file.filename,
"size_bytes": len(content),
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"description": description
}
metadata[ontology_key] = ontology_metadata
self._save_metadata(user_dir, metadata)
return OntologyMetadata(
ontology_key=ontology_key,
filename=file.filename,
size_bytes=len(content),
uploaded_at=ontology_metadata["uploaded_at"],
description=description
)
def get_ontology_content(self, ontology_key: str, user) -> str:
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
if ontology_key not in metadata:
raise ValueError(f"Ontology key '{ontology_key}' not found")
file_path = user_dir / f"{ontology_key}.owl"
if not file_path.exists():
raise ValueError(f"Ontology file for key '{ontology_key}' not found")
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def list_ontologies(self, user) -> dict:
user_dir = self._get_user_dir(str(user.id))
return self._load_metadata(user_dir)

View file

@ -0,0 +1,89 @@
from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry
from cognee import __version__ as cognee_version
from ..ontologies import OntologyService
def get_ontology_router() -> APIRouter:
router = APIRouter()
ontology_service = OntologyService()
@router.post("", response_model=dict)
async def upload_ontology(
ontology_key: str = Form(...),
ontology_file: UploadFile = File(...),
description: Optional[str] = Form(None),
user: User = Depends(get_authenticated_user)
):
"""
Upload an ontology file with a named key for later use in cognify operations.
## Request Parameters
- **ontology_key** (str): User-defined identifier for the ontology
- **ontology_file** (UploadFile): OWL format ontology file
- **description** (Optional[str]): Optional description of the ontology
## Response
Returns metadata about the uploaded ontology including key, filename, size, and upload timestamp.
## Error Codes
- **400 Bad Request**: Invalid file format, duplicate key, file size exceeded
- **500 Internal Server Error**: File system or processing errors
"""
send_telemetry(
"Ontology Upload API Endpoint Invoked",
user.id,
additional_properties={
"endpoint": "POST /api/v1/ontologies",
"cognee_version": cognee_version,
},
)
try:
result = await ontology_service.upload_ontology(
ontology_key, ontology_file, user, description
)
return {
"ontology_key": result.ontology_key,
"filename": result.filename,
"size_bytes": result.size_bytes,
"uploaded_at": result.uploaded_at
}
except ValueError as e:
return JSONResponse(status_code=400, content={"error": str(e)})
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@router.get("", response_model=dict)
async def list_ontologies(
user: User = Depends(get_authenticated_user)
):
"""
List all uploaded ontologies for the authenticated user.
## Response
Returns a dictionary mapping ontology keys to their metadata including filename, size, and upload timestamp.
## Error Codes
- **500 Internal Server Error**: File system or processing errors
"""
send_telemetry(
"Ontology List API Endpoint Invoked",
user.id,
additional_properties={
"endpoint": "GET /api/v1/ontologies",
"cognee_version": cognee_version,
},
)
try:
metadata = ontology_service.list_ontologies(user)
return metadata
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
return router

View file

@ -2,7 +2,7 @@ import os
import difflib
from cognee.shared.logging_utils import get_logger
from collections import deque
from typing import List, Tuple, Dict, Optional, Any, Union
from typing import List, Tuple, Dict, Optional, Any, Union, IO
from rdflib import Graph, URIRef, RDF, RDFS, OWL
from cognee.modules.ontology.exceptions import (
@ -26,44 +26,55 @@ class RDFLibOntologyResolver(BaseOntologyResolver):
def __init__(
self,
ontology_file: Optional[Union[str, List[str]]] = None,
ontology_file: Optional[Union[str, List[str], IO]] = None,
matching_strategy: Optional[MatchingStrategy] = None,
) -> None:
super().__init__(matching_strategy)
self.ontology_file = ontology_file
try:
files_to_load = []
self.graph = None
if ontology_file is not None:
if isinstance(ontology_file, str):
files_to_load = [ontology_file]
elif isinstance(ontology_file, list):
files_to_load = ontology_file
if hasattr(ontology_file, "read"):
self.graph = Graph()
content = ontology_file.read()
self.graph.parse(data=content, format="xml")
logger.info("Ontology loaded successfully from file object")
else:
raise ValueError(
f"ontology_file must be a string, list of strings, or None. Got: {type(ontology_file)}"
)
if files_to_load:
self.graph = Graph()
loaded_files = []
for file_path in files_to_load:
if os.path.exists(file_path):
self.graph.parse(file_path)
loaded_files.append(file_path)
logger.info("Ontology loaded successfully from file: %s", file_path)
files_to_load = []
if isinstance(ontology_file, str):
files_to_load = [ontology_file]
elif isinstance(ontology_file, list):
files_to_load = ontology_file
else:
logger.warning(
"Ontology file '%s' not found. Skipping this file.",
file_path,
raise ValueError(
f"ontology_file must be a string, list of strings, file-like object, or None. Got: {type(ontology_file)}"
)
if not loaded_files:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
if files_to_load:
self.graph = Graph()
loaded_files = []
for file_path in files_to_load:
if os.path.exists(file_path):
self.graph.parse(file_path)
loaded_files.append(file_path)
logger.info("Ontology loaded successfully from file: %s", file_path)
else:
logger.warning(
"Ontology file '%s' not found. Skipping this file.",
file_path,
)
if not loaded_files:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
else:
logger.info(
"No ontology file provided. No owl ontology will be attached to the graph."
)
else:
logger.info(
"No ontology file provided. No owl ontology will be attached to the graph."

View file

@ -0,0 +1,89 @@
import pytest
import uuid
from fastapi.testclient import TestClient
from unittest.mock import patch, Mock, AsyncMock
from types import SimpleNamespace
import importlib
from cognee.api.client import app
gau_mod = importlib.import_module("cognee.modules.users.methods.get_authenticated_user")
@pytest.fixture
def client():
return TestClient(app)
@pytest.fixture
def mock_user():
user = Mock()
user.id = "test-user-123"
return user
@pytest.fixture
def mock_default_user():
"""Mock default user for testing."""
return SimpleNamespace(
id=uuid.uuid4(),
email="default@example.com",
is_active=True,
tenant_id=uuid.uuid4()
)
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_success(mock_get_default_user, client, mock_default_user):
"""Test successful ontology upload"""
mock_get_default_user.return_value = mock_default_user
ontology_content = b"<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#'></rdf:RDF>"
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
response = client.post(
"/api/v1/ontologies",
files={"ontology_file": ("test.owl", ontology_content)},
data={"ontology_key": unique_key, "description": "Test"}
)
assert response.status_code == 200
data = response.json()
assert data["ontology_key"] == unique_key
assert "uploaded_at" in data
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_invalid_file(mock_get_default_user, client, mock_default_user):
"""Test 400 response for non-.owl files"""
mock_get_default_user.return_value = mock_default_user
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
response = client.post(
"/api/v1/ontologies",
files={"ontology_file": ("test.txt", b"not xml")},
data={"ontology_key": unique_key}
)
assert response.status_code == 400
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_missing_data(mock_get_default_user, client, mock_default_user):
"""Test 400 response for missing file or key"""
mock_get_default_user.return_value = mock_default_user
# Missing file
response = client.post("/api/v1/ontologies", data={"ontology_key": "test"})
assert response.status_code == 400
# Missing key
response = client.post("/api/v1/ontologies", files={"ontology_file": ("test.owl", b"xml")})
assert response.status_code == 400
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_unauthorized(mock_get_default_user, client, mock_default_user):
"""Test behavior when default user is provided (no explicit authentication)"""
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
mock_get_default_user.return_value = mock_default_user
response = client.post(
"/api/v1/ontologies",
files={"ontology_file": ("test.owl", b"<rdf></rdf>")},
data={"ontology_key": unique_key}
)
# The current system provides a default user when no explicit authentication is given
# This test verifies the system works with conditional authentication
assert response.status_code == 200
data = response.json()
assert data["ontology_key"] == unique_key
assert "uploaded_at" in data