feat: add ontology endpoint in REST API (#1724)

## Description

This PR resolves #1446 by adding support to upload ontology files and
refer to them in the cognee POST request.

  Implementation Details:
- New endpoint: POST /api/v1/ontologies for ontology file upload with a
simple key parameter
  that can be referenced in POST cognify requests
- File storage: Ontology files are stored in /tmp/ontologies/{user_id}/
with metadata
  management
- New service: OntologyService created for file management and metadata
handling
- Resolver: RDFLibOntologyResolver modified to handle file-like objects.


## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [X] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [X] **I have tested my changes thoroughly before submitting this PR**
- [X] **This PR contains minimal changes necessary to address the
issue/feature**
- [X] My code follows the project's coding standards and style
guidelines
- [X] I have added tests that prove my fix is effective or that my
feature works
- [X] I have added necessary documentation (if applicable)
- [X] All new and existing tests pass
- [X] I have searched existing PRs to ensure this change hasn't been
submitted already
- [X] I have linked any relevant issues in the description
- [X] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Vasilije 2025-11-22 14:49:09 -08:00 committed by GitHub
commit e81613ea6e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 729 additions and 26 deletions

View file

@ -23,6 +23,7 @@ from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.ontologies.routers.get_ontology_router import get_ontology_router
from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
@ -263,6 +264,8 @@ app.include_router(
app.include_router(get_datasets_router(), prefix="/api/v1/datasets", tags=["datasets"])
app.include_router(get_ontology_router(), prefix="/api/v1/ontologies", tags=["ontologies"])
app.include_router(get_settings_router(), prefix="/api/v1/settings", tags=["settings"])
app.include_router(get_visualize_router(), prefix="/api/v1/visualize", tags=["visualize"])

View file

@ -41,6 +41,9 @@ class CognifyPayloadDTO(InDTO):
custom_prompt: Optional[str] = Field(
default="", description="Custom prompt for entity extraction and graph generation"
)
ontology_key: Optional[List[str]] = Field(
default=None, description="Reference to one or more previously uploaded ontologies"
)
def get_cognify_router() -> APIRouter:
@ -68,6 +71,7 @@ def get_cognify_router() -> APIRouter:
- **dataset_ids** (Optional[List[UUID]]): List of existing dataset UUIDs to process. UUIDs allow processing of datasets not owned by the user (if permitted).
- **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).
- **custom_prompt** (Optional[str]): Custom prompt for entity extraction and graph generation. If provided, this prompt will be used instead of the default prompts for knowledge graph extraction.
- **ontology_key** (Optional[List[str]]): Reference to one or more previously uploaded ontology files to use for knowledge graph construction.
## Response
- **Blocking execution**: Complete pipeline run information with entity counts, processing duration, and success/failure status
@ -82,7 +86,8 @@ def get_cognify_router() -> APIRouter:
{
"datasets": ["research_papers", "documentation"],
"run_in_background": false,
"custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections."
"custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections.",
"ontology_key": ["medical_ontology_v1"]
}
```
@ -108,13 +113,35 @@ def get_cognify_router() -> APIRouter:
)
from cognee.api.v1.cognify import cognify as cognee_cognify
from cognee.api.v1.ontologies.ontologies import OntologyService
try:
datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets
config_to_use = None
if payload.ontology_key:
ontology_service = OntologyService()
ontology_contents = ontology_service.get_ontology_contents(
payload.ontology_key, user
)
from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import (
RDFLibOntologyResolver,
)
from io import StringIO
ontology_streams = [StringIO(content) for content in ontology_contents]
config_to_use: Config = {
"ontology_config": {
"ontology_resolver": RDFLibOntologyResolver(ontology_file=ontology_streams)
}
}
cognify_run = await cognee_cognify(
datasets,
user,
config=config_to_use,
run_in_background=payload.run_in_background,
custom_prompt=payload.custom_prompt,
)

View file

@ -0,0 +1,4 @@
from .ontologies import OntologyService
from .routers.get_ontology_router import get_ontology_router
__all__ = ["OntologyService", "get_ontology_router"]

View file

@ -0,0 +1,183 @@
import os
import json
import tempfile
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, List
from dataclasses import dataclass
@dataclass
class OntologyMetadata:
ontology_key: str
filename: str
size_bytes: int
uploaded_at: str
description: Optional[str] = None
class OntologyService:
def __init__(self):
pass
@property
def base_dir(self) -> Path:
return Path(tempfile.gettempdir()) / "ontologies"
def _get_user_dir(self, user_id: str) -> Path:
user_dir = self.base_dir / str(user_id)
user_dir.mkdir(parents=True, exist_ok=True)
return user_dir
def _get_metadata_path(self, user_dir: Path) -> Path:
return user_dir / "metadata.json"
def _load_metadata(self, user_dir: Path) -> dict:
metadata_path = self._get_metadata_path(user_dir)
if metadata_path.exists():
with open(metadata_path, "r") as f:
return json.load(f)
return {}
def _save_metadata(self, user_dir: Path, metadata: dict):
metadata_path = self._get_metadata_path(user_dir)
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
async def upload_ontology(
self, ontology_key: str, file, user, description: Optional[str] = None
) -> OntologyMetadata:
if not file.filename.lower().endswith(".owl"):
raise ValueError("File must be in .owl format")
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
if ontology_key in metadata:
raise ValueError(f"Ontology key '{ontology_key}' already exists")
content = await file.read()
if len(content) > 10 * 1024 * 1024:
raise ValueError("File size exceeds 10MB limit")
file_path = user_dir / f"{ontology_key}.owl"
with open(file_path, "wb") as f:
f.write(content)
ontology_metadata = {
"filename": file.filename,
"size_bytes": len(content),
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"description": description,
}
metadata[ontology_key] = ontology_metadata
self._save_metadata(user_dir, metadata)
return OntologyMetadata(
ontology_key=ontology_key,
filename=file.filename,
size_bytes=len(content),
uploaded_at=ontology_metadata["uploaded_at"],
description=description,
)
async def upload_ontologies(
self, ontology_key: List[str], files: List, user, descriptions: Optional[List[str]] = None
) -> List[OntologyMetadata]:
"""
Upload ontology files with their respective keys.
Args:
ontology_key: List of unique keys for each ontology
files: List of UploadFile objects (same length as keys)
user: Authenticated user
descriptions: Optional list of descriptions for each file
Returns:
List of OntologyMetadata objects for uploaded files
Raises:
ValueError: If keys duplicate, file format invalid, or array lengths don't match
"""
if len(ontology_key) != len(files):
raise ValueError("Number of keys must match number of files")
if len(set(ontology_key)) != len(ontology_key):
raise ValueError("Duplicate ontology keys not allowed")
if descriptions and len(descriptions) != len(files):
raise ValueError("Number of descriptions must match number of files")
results = []
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
for i, (key, file) in enumerate(zip(ontology_key, files)):
if key in metadata:
raise ValueError(f"Ontology key '{key}' already exists")
if not file.filename.lower().endswith(".owl"):
raise ValueError(f"File '{file.filename}' must be in .owl format")
content = await file.read()
if len(content) > 10 * 1024 * 1024:
raise ValueError(f"File '{file.filename}' exceeds 10MB limit")
file_path = user_dir / f"{key}.owl"
with open(file_path, "wb") as f:
f.write(content)
ontology_metadata = {
"filename": file.filename,
"size_bytes": len(content),
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"description": descriptions[i] if descriptions else None,
}
metadata[key] = ontology_metadata
results.append(
OntologyMetadata(
ontology_key=key,
filename=file.filename,
size_bytes=len(content),
uploaded_at=ontology_metadata["uploaded_at"],
description=descriptions[i] if descriptions else None,
)
)
self._save_metadata(user_dir, metadata)
return results
def get_ontology_contents(self, ontology_key: List[str], user) -> List[str]:
"""
Retrieve ontology content for one or more keys.
Args:
ontology_key: List of ontology keys to retrieve (can contain single item)
user: Authenticated user
Returns:
List of ontology content strings
Raises:
ValueError: If any ontology key not found
"""
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
contents = []
for key in ontology_key:
if key not in metadata:
raise ValueError(f"Ontology key '{key}' not found")
file_path = user_dir / f"{key}.owl"
if not file_path.exists():
raise ValueError(f"Ontology file for key '{key}' not found")
with open(file_path, "r", encoding="utf-8") as f:
contents.append(f.read())
return contents
def list_ontologies(self, user) -> dict:
user_dir = self._get_user_dir(str(user.id))
return self._load_metadata(user_dir)

View file

@ -0,0 +1,107 @@
from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional, List
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry
from cognee import __version__ as cognee_version
from ..ontologies import OntologyService
def get_ontology_router() -> APIRouter:
router = APIRouter()
ontology_service = OntologyService()
@router.post("", response_model=dict)
async def upload_ontology(
ontology_key: str = Form(...),
ontology_file: List[UploadFile] = File(...),
descriptions: Optional[str] = Form(None),
user: User = Depends(get_authenticated_user),
):
"""
Upload ontology files with their respective keys for later use in cognify operations.
Supports both single and multiple file uploads:
- Single file: ontology_key=["key"], ontology_file=[file]
- Multiple files: ontology_key=["key1", "key2"], ontology_file=[file1, file2]
## Request Parameters
- **ontology_key** (str): JSON array string of user-defined identifiers for the ontologies
- **ontology_file** (List[UploadFile]): OWL format ontology files
- **descriptions** (Optional[str]): JSON array string of optional descriptions
## Response
Returns metadata about uploaded ontologies including keys, filenames, sizes, and upload timestamps.
## Error Codes
- **400 Bad Request**: Invalid file format, duplicate keys, array length mismatches, file size exceeded
- **500 Internal Server Error**: File system or processing errors
"""
send_telemetry(
"Ontology Upload API Endpoint Invoked",
user.id,
additional_properties={
"endpoint": "POST /api/v1/ontologies",
"cognee_version": cognee_version,
},
)
try:
import json
ontology_keys = json.loads(ontology_key)
description_list = json.loads(descriptions) if descriptions else None
if not isinstance(ontology_keys, list):
raise ValueError("ontology_key must be a JSON array")
results = await ontology_service.upload_ontologies(
ontology_keys, ontology_file, user, description_list
)
return {
"uploaded_ontologies": [
{
"ontology_key": result.ontology_key,
"filename": result.filename,
"size_bytes": result.size_bytes,
"uploaded_at": result.uploaded_at,
"description": result.description,
}
for result in results
]
}
except (json.JSONDecodeError, ValueError) as e:
return JSONResponse(status_code=400, content={"error": str(e)})
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@router.get("", response_model=dict)
async def list_ontologies(user: User = Depends(get_authenticated_user)):
"""
List all uploaded ontologies for the authenticated user.
## Response
Returns a dictionary mapping ontology keys to their metadata including filename, size, and upload timestamp.
## Error Codes
- **500 Internal Server Error**: File system or processing errors
"""
send_telemetry(
"Ontology List API Endpoint Invoked",
user.id,
additional_properties={
"endpoint": "GET /api/v1/ontologies",
"cognee_version": cognee_version,
},
)
try:
metadata = ontology_service.list_ontologies(user)
return metadata
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
return router

View file

@ -2,7 +2,7 @@ import os
import difflib
from cognee.shared.logging_utils import get_logger
from collections import deque
from typing import List, Tuple, Dict, Optional, Any, Union
from typing import List, Tuple, Dict, Optional, Any, Union, IO
from rdflib import Graph, URIRef, RDF, RDFS, OWL
from cognee.modules.ontology.exceptions import (
@ -26,44 +26,76 @@ class RDFLibOntologyResolver(BaseOntologyResolver):
def __init__(
self,
ontology_file: Optional[Union[str, List[str]]] = None,
ontology_file: Optional[Union[str, List[str], IO, List[IO]]] = None,
matching_strategy: Optional[MatchingStrategy] = None,
) -> None:
super().__init__(matching_strategy)
self.ontology_file = ontology_file
try:
files_to_load = []
self.graph = None
if ontology_file is not None:
if isinstance(ontology_file, str):
files_to_load = []
file_objects = []
if hasattr(ontology_file, "read"):
file_objects = [ontology_file]
elif isinstance(ontology_file, str):
files_to_load = [ontology_file]
elif isinstance(ontology_file, list):
files_to_load = ontology_file
if all(hasattr(item, "read") for item in ontology_file):
file_objects = ontology_file
else:
files_to_load = ontology_file
else:
raise ValueError(
f"ontology_file must be a string, list of strings, or None. Got: {type(ontology_file)}"
f"ontology_file must be a string, list of strings, file-like object, list of file-like objects, or None. Got: {type(ontology_file)}"
)
if files_to_load:
self.graph = Graph()
loaded_files = []
for file_path in files_to_load:
if os.path.exists(file_path):
self.graph.parse(file_path)
loaded_files.append(file_path)
logger.info("Ontology loaded successfully from file: %s", file_path)
else:
logger.warning(
"Ontology file '%s' not found. Skipping this file.",
file_path,
if file_objects:
self.graph = Graph()
loaded_objects = []
for file_obj in file_objects:
try:
content = file_obj.read()
self.graph.parse(data=content, format="xml")
loaded_objects.append(file_obj)
logger.info("Ontology loaded successfully from file object")
except Exception as e:
logger.warning("Failed to parse ontology file object: %s", str(e))
if not loaded_objects:
logger.info(
"No valid ontology file objects found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology file objects loaded: %d", len(loaded_objects))
if not loaded_files:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
elif files_to_load:
self.graph = Graph()
loaded_files = []
for file_path in files_to_load:
if os.path.exists(file_path):
self.graph.parse(file_path)
loaded_files.append(file_path)
logger.info("Ontology loaded successfully from file: %s", file_path)
else:
logger.warning(
"Ontology file '%s' not found. Skipping this file.",
file_path,
)
if not loaded_files:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
logger.info(
"No ontology file provided. No owl ontology will be attached to the graph."
)
else:
logger.info(
"No ontology file provided. No owl ontology will be attached to the graph."

View file

@ -7,6 +7,7 @@ import requests
from pathlib import Path
import sys
import uuid
import json
class TestCogneeServerStart(unittest.TestCase):
@ -90,12 +91,71 @@ class TestCogneeServerStart(unittest.TestCase):
)
}
payload = {"datasets": [dataset_name]}
ontology_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
payload = {"datasets": [dataset_name], "ontology_key": [ontology_key]}
add_response = requests.post(url, headers=headers, data=form_data, files=file, timeout=50)
if add_response.status_code not in [200, 201]:
add_response.raise_for_status()
ontology_content = b"""<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:owl="http://www.w3.org/2002/07/owl#"
xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#"
xmlns="http://example.org/ontology#"
xml:base="http://example.org/ontology">
<owl:Ontology rdf:about="http://example.org/ontology"/>
<!-- Classes -->
<owl:Class rdf:ID="Problem"/>
<owl:Class rdf:ID="HardwareProblem"/>
<owl:Class rdf:ID="SoftwareProblem"/>
<owl:Class rdf:ID="Concept"/>
<owl:Class rdf:ID="Object"/>
<owl:Class rdf:ID="Joke"/>
<owl:Class rdf:ID="Image"/>
<owl:Class rdf:ID="Person"/>
<rdf:Description rdf:about="#HardwareProblem">
<rdfs:subClassOf rdf:resource="#Problem"/>
<rdfs:comment>A failure caused by physical components.</rdfs:comment>
</rdf:Description>
<rdf:Description rdf:about="#SoftwareProblem">
<rdfs:subClassOf rdf:resource="#Problem"/>
<rdfs:comment>An error caused by software logic or configuration.</rdfs:comment>
</rdf:Description>
<rdf:Description rdf:about="#Person">
<rdfs:comment>A human being or individual.</rdfs:comment>
</rdf:Description>
<!-- Individuals -->
<Person rdf:ID="programmers">
<rdfs:label>Programmers</rdfs:label>
</Person>
<Object rdf:ID="light_bulb">
<rdfs:label>Light Bulb</rdfs:label>
</Object>
<HardwareProblem rdf:ID="hardware_problem">
<rdfs:label>Hardware Problem</rdfs:label>
</HardwareProblem>
</rdf:RDF>"""
ontology_response = requests.post(
"http://127.0.0.1:8000/api/v1/ontologies",
headers=headers,
files=[("ontology_file", ("test.owl", ontology_content, "application/xml"))],
data={
"ontology_key": json.dumps([ontology_key]),
"description": json.dumps(["Test ontology"]),
},
)
self.assertEqual(ontology_response.status_code, 200)
# Cognify request
url = "http://127.0.0.1:8000/api/v1/cognify"
headers = {
@ -107,6 +167,29 @@ class TestCogneeServerStart(unittest.TestCase):
if cognify_response.status_code not in [200, 201]:
cognify_response.raise_for_status()
datasets_response = requests.get("http://127.0.0.1:8000/api/v1/datasets", headers=headers)
datasets = datasets_response.json()
dataset_id = None
for dataset in datasets:
if dataset["name"] == dataset_name:
dataset_id = dataset["id"]
break
graph_response = requests.get(
f"http://127.0.0.1:8000/api/v1/datasets/{dataset_id}/graph", headers=headers
)
self.assertEqual(graph_response.status_code, 200)
graph_data = graph_response.json()
ontology_nodes = [
node for node in graph_data.get("nodes") if node.get("properties").get("ontology_valid")
]
self.assertGreater(
len(ontology_nodes), 0, "No ontology nodes found - ontology was not integrated"
)
# TODO: Add test to verify cognify pipeline is complete before testing search
# Search request

View file

@ -0,0 +1,264 @@
import pytest
import uuid
from fastapi.testclient import TestClient
from unittest.mock import patch, Mock, AsyncMock
from types import SimpleNamespace
import importlib
from cognee.api.client import app
gau_mod = importlib.import_module("cognee.modules.users.methods.get_authenticated_user")
@pytest.fixture
def client():
return TestClient(app)
@pytest.fixture
def mock_user():
user = Mock()
user.id = "test-user-123"
return user
@pytest.fixture
def mock_default_user():
"""Mock default user for testing."""
return SimpleNamespace(
id=uuid.uuid4(), email="default@example.com", is_active=True, tenant_id=uuid.uuid4()
)
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_success(mock_get_default_user, client, mock_default_user):
"""Test successful ontology upload"""
import json
mock_get_default_user.return_value = mock_default_user
ontology_content = (
b"<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#'></rdf:RDF>"
)
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
response = client.post(
"/api/v1/ontologies",
files=[("ontology_file", ("test.owl", ontology_content, "application/xml"))],
data={"ontology_key": json.dumps([unique_key]), "description": json.dumps(["Test"])},
)
assert response.status_code == 200
data = response.json()
assert data["uploaded_ontologies"][0]["ontology_key"] == unique_key
assert "uploaded_at" in data["uploaded_ontologies"][0]
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_invalid_file(mock_get_default_user, client, mock_default_user):
"""Test 400 response for non-.owl files"""
mock_get_default_user.return_value = mock_default_user
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
response = client.post(
"/api/v1/ontologies",
files={"ontology_file": ("test.txt", b"not xml")},
data={"ontology_key": unique_key},
)
assert response.status_code == 400
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_missing_data(mock_get_default_user, client, mock_default_user):
"""Test 400 response for missing file or key"""
import json
mock_get_default_user.return_value = mock_default_user
# Missing file
response = client.post("/api/v1/ontologies", data={"ontology_key": json.dumps(["test"])})
assert response.status_code == 400
# Missing key
response = client.post(
"/api/v1/ontologies", files=[("ontology_file", ("test.owl", b"xml", "application/xml"))]
)
assert response.status_code == 400
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_ontology_unauthorized(mock_get_default_user, client, mock_default_user):
"""Test behavior when default user is provided (no explicit authentication)"""
import json
unique_key = f"test_ontology_{uuid.uuid4().hex[:8]}"
mock_get_default_user.return_value = mock_default_user
response = client.post(
"/api/v1/ontologies",
files=[("ontology_file", ("test.owl", b"<rdf></rdf>", "application/xml"))],
data={"ontology_key": json.dumps([unique_key])},
)
# The current system provides a default user when no explicit authentication is given
# This test verifies the system works with conditional authentication
assert response.status_code == 200
data = response.json()
assert data["uploaded_ontologies"][0]["ontology_key"] == unique_key
assert "uploaded_at" in data["uploaded_ontologies"][0]
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_multiple_ontologies(mock_get_default_user, client, mock_default_user):
"""Test uploading multiple ontology files in single request"""
import io
# Create mock files
file1_content = b"<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#'></rdf:RDF>"
file2_content = b"<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#'></rdf:RDF>"
files = [
("ontology_file", ("vehicles.owl", io.BytesIO(file1_content), "application/xml")),
("ontology_file", ("manufacturers.owl", io.BytesIO(file2_content), "application/xml")),
]
data = {
"ontology_key": '["vehicles", "manufacturers"]',
"descriptions": '["Base vehicles", "Car manufacturers"]',
}
response = client.post("/api/v1/ontologies", files=files, data=data)
assert response.status_code == 200
result = response.json()
assert "uploaded_ontologies" in result
assert len(result["uploaded_ontologies"]) == 2
assert result["uploaded_ontologies"][0]["ontology_key"] == "vehicles"
assert result["uploaded_ontologies"][1]["ontology_key"] == "manufacturers"
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_upload_endpoint_accepts_arrays(mock_get_default_user, client, mock_default_user):
"""Test that upload endpoint accepts array parameters"""
import io
import json
file_content = b"<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#'></rdf:RDF>"
files = [("ontology_file", ("single.owl", io.BytesIO(file_content), "application/xml"))]
data = {
"ontology_key": json.dumps(["single_key"]),
"descriptions": json.dumps(["Single ontology"]),
}
response = client.post("/api/v1/ontologies", files=files, data=data)
assert response.status_code == 200
result = response.json()
assert result["uploaded_ontologies"][0]["ontology_key"] == "single_key"
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_cognify_with_multiple_ontologies(mock_get_default_user, client, mock_default_user):
"""Test cognify endpoint accepts multiple ontology keys"""
payload = {
"datasets": ["test_dataset"],
"ontology_key": ["ontology1", "ontology2"], # Array instead of string
"run_in_background": False,
}
response = client.post("/api/v1/cognify", json=payload)
# Should not fail due to ontology_key type
assert response.status_code in [200, 400, 409] # May fail for other reasons, not type
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_complete_multifile_workflow(mock_get_default_user, client, mock_default_user):
"""Test complete workflow: upload multiple ontologies → cognify with multiple keys"""
import io
import json
# Step 1: Upload multiple ontologies
file1_content = b"""<?xml version="1.0"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:owl="http://www.w3.org/2002/07/owl#">
<owl:Class rdf:ID="Vehicle"/>
</rdf:RDF>"""
file2_content = b"""<?xml version="1.0"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:owl="http://www.w3.org/2002/07/owl#">
<owl:Class rdf:ID="Manufacturer"/>
</rdf:RDF>"""
files = [
("ontology_file", ("vehicles.owl", io.BytesIO(file1_content), "application/xml")),
("ontology_file", ("manufacturers.owl", io.BytesIO(file2_content), "application/xml")),
]
data = {
"ontology_key": json.dumps(["vehicles", "manufacturers"]),
"descriptions": json.dumps(["Vehicle ontology", "Manufacturer ontology"]),
}
upload_response = client.post("/api/v1/ontologies", files=files, data=data)
assert upload_response.status_code == 200
# Step 2: Verify ontologies are listed
list_response = client.get("/api/v1/ontologies")
assert list_response.status_code == 200
ontologies = list_response.json()
assert "vehicles" in ontologies
assert "manufacturers" in ontologies
# Step 3: Test cognify with multiple ontologies
cognify_payload = {
"datasets": ["test_dataset"],
"ontology_key": ["vehicles", "manufacturers"],
"run_in_background": False,
}
cognify_response = client.post("/api/v1/cognify", json=cognify_payload)
# Should not fail due to ontology handling (may fail for dataset reasons)
assert cognify_response.status_code != 400 # Not a validation error
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_multifile_error_handling(mock_get_default_user, client, mock_default_user):
"""Test error handling for invalid multifile uploads"""
import io
import json
# Test mismatched array lengths
file_content = b"<rdf:RDF></rdf:RDF>"
files = [("ontology_file", ("test.owl", io.BytesIO(file_content), "application/xml"))]
data = {
"ontology_key": json.dumps(["key1", "key2"]), # 2 keys, 1 file
"descriptions": json.dumps(["desc1"]),
}
response = client.post("/api/v1/ontologies", files=files, data=data)
assert response.status_code == 400
assert "Number of keys must match number of files" in response.json()["error"]
# Test duplicate keys
files = [
("ontology_file", ("test1.owl", io.BytesIO(file_content), "application/xml")),
("ontology_file", ("test2.owl", io.BytesIO(file_content), "application/xml")),
]
data = {
"ontology_key": json.dumps(["duplicate", "duplicate"]),
"descriptions": json.dumps(["desc1", "desc2"]),
}
response = client.post("/api/v1/ontologies", files=files, data=data)
assert response.status_code == 400
assert "Duplicate ontology keys not allowed" in response.json()["error"]
@patch.object(gau_mod, "get_default_user", new_callable=AsyncMock)
def test_cognify_missing_ontology_key(mock_get_default_user, client, mock_default_user):
"""Test cognify with non-existent ontology key"""
payload = {
"datasets": ["test_dataset"],
"ontology_key": ["nonexistent_key"],
"run_in_background": False,
}
response = client.post("/api/v1/cognify", json=payload)
assert response.status_code == 409
assert "Ontology key 'nonexistent_key' not found" in response.json()["error"]