feat: enhance ontology handling to support multiple uploads and retrievals

This commit is contained in:
Fahad Shoaib 2025-11-14 22:13:00 +05:00
parent 79bd2b2576
commit 844b8d635a
4 changed files with 202 additions and 91 deletions

View file

@@ -41,8 +41,8 @@ class CognifyPayloadDTO(InDTO):
custom_prompt: Optional[str] = Field( custom_prompt: Optional[str] = Field(
default="", description="Custom prompt for entity extraction and graph generation" default="", description="Custom prompt for entity extraction and graph generation"
) )
ontology_key: Optional[str] = Field( ontology_key: Optional[List[str]] = Field(
default=None, description="Reference to previously uploaded ontology" default=None, description="Reference to one or more previously uploaded ontologies"
) )
@@ -71,7 +71,7 @@ def get_cognify_router() -> APIRouter:
- **dataset_ids** (Optional[List[UUID]]): List of existing dataset UUIDs to process. UUIDs allow processing of datasets not owned by the user (if permitted). - **dataset_ids** (Optional[List[UUID]]): List of existing dataset UUIDs to process. UUIDs allow processing of datasets not owned by the user (if permitted).
- **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking). - **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).
- **custom_prompt** (Optional[str]): Custom prompt for entity extraction and graph generation. If provided, this prompt will be used instead of the default prompts for knowledge graph extraction. - **custom_prompt** (Optional[str]): Custom prompt for entity extraction and graph generation. If provided, this prompt will be used instead of the default prompts for knowledge graph extraction.
- **ontology_key** (Optional[str]): Reference to a previously uploaded ontology file to use for knowledge graph construction. - **ontology_key** (Optional[List[str]]): Reference to one or more previously uploaded ontology files to use for knowledge graph construction.
## Response ## Response
- **Blocking execution**: Complete pipeline run information with entity counts, processing duration, and success/failure status - **Blocking execution**: Complete pipeline run information with entity counts, processing duration, and success/failure status
@@ -87,7 +87,7 @@ def get_cognify_router() -> APIRouter:
"datasets": ["research_papers", "documentation"], "datasets": ["research_papers", "documentation"],
"run_in_background": false, "run_in_background": false,
"custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections.", "custom_prompt": "Extract entities focusing on technical concepts and their relationships. Identify key technologies, methodologies, and their interconnections.",
"ontology_key": "medical_ontology_v1" "ontology_key": ["medical_ontology_v1"]
} }
``` ```
@@ -121,29 +121,22 @@ def get_cognify_router() -> APIRouter:
if payload.ontology_key: if payload.ontology_key:
ontology_service = OntologyService() ontology_service = OntologyService()
try: ontology_contents = ontology_service.get_ontology_contents(
ontology_content = ontology_service.get_ontology_content( payload.ontology_key, user
payload.ontology_key, user )
)
from cognee.modules.ontology.ontology_config import Config from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import ( from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import (
RDFLibOntologyResolver, RDFLibOntologyResolver,
) )
from io import StringIO from io import StringIO
ontology_stream = StringIO(ontology_content) ontology_streams = [StringIO(content) for content in ontology_contents]
config_to_use: Config = { config_to_use: Config = {
"ontology_config": { "ontology_config": {
"ontology_resolver": RDFLibOntologyResolver( "ontology_resolver": RDFLibOntologyResolver(ontology_file=ontology_streams)
ontology_file=ontology_stream
)
}
} }
except ValueError as e: }
return JSONResponse(
status_code=400, content={"error": f"Ontology error: {str(e)}"}
)
cognify_run = await cognee_cognify( cognify_run = await cognee_cognify(
datasets, datasets,

View file

@@ -3,7 +3,7 @@ import json
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional, List
from dataclasses import dataclass from dataclasses import dataclass
@@ -47,28 +47,23 @@ class OntologyService:
async def upload_ontology( async def upload_ontology(
self, ontology_key: str, file, user, description: Optional[str] = None self, ontology_key: str, file, user, description: Optional[str] = None
) -> OntologyMetadata: ) -> OntologyMetadata:
# Validate file format
if not file.filename.lower().endswith(".owl"): if not file.filename.lower().endswith(".owl"):
raise ValueError("File must be in .owl format") raise ValueError("File must be in .owl format")
user_dir = self._get_user_dir(str(user.id)) user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir) metadata = self._load_metadata(user_dir)
# Check for duplicate key
if ontology_key in metadata: if ontology_key in metadata:
raise ValueError(f"Ontology key '{ontology_key}' already exists") raise ValueError(f"Ontology key '{ontology_key}' already exists")
# Read file content
content = await file.read() content = await file.read()
if len(content) > 10 * 1024 * 1024: # 10MB limit if len(content) > 10 * 1024 * 1024:
raise ValueError("File size exceeds 10MB limit") raise ValueError("File size exceeds 10MB limit")
# Save file
file_path = user_dir / f"{ontology_key}.owl" file_path = user_dir / f"{ontology_key}.owl"
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
f.write(content) f.write(content)
# Update metadata
ontology_metadata = { ontology_metadata = {
"filename": file.filename, "filename": file.filename,
"size_bytes": len(content), "size_bytes": len(content),
@@ -86,19 +81,102 @@ class OntologyService:
description=description, description=description,
) )
def get_ontology_content(self, ontology_key: str, user) -> str: async def upload_ontologies(
self, ontology_key: List[str], files: List, user, descriptions: Optional[List[str]] = None
) -> List[OntologyMetadata]:
"""
Upload ontology files with their respective keys.
Args:
ontology_key: List of unique keys for each ontology
files: List of UploadFile objects (same length as keys)
user: Authenticated user
descriptions: Optional list of descriptions for each file
Returns:
List of OntologyMetadata objects for uploaded files
Raises:
ValueError: If keys duplicate, file format invalid, or array lengths don't match
"""
if len(ontology_key) != len(files):
raise ValueError("Number of keys must match number of files")
if len(set(ontology_key)) != len(ontology_key):
raise ValueError("Duplicate ontology keys not allowed")
if descriptions and len(descriptions) != len(files):
raise ValueError("Number of descriptions must match number of files")
results = []
user_dir = self._get_user_dir(str(user.id)) user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir) metadata = self._load_metadata(user_dir)
if ontology_key not in metadata: for i, (key, file) in enumerate(zip(ontology_key, files)):
raise ValueError(f"Ontology key '{ontology_key}' not found") if key in metadata:
raise ValueError(f"Ontology key '{key}' already exists")
file_path = user_dir / f"{ontology_key}.owl" if not file.filename.lower().endswith(".owl"):
if not file_path.exists(): raise ValueError(f"File '{file.filename}' must be in .owl format")
raise ValueError(f"Ontology file for key '{ontology_key}' not found")
with open(file_path, "r", encoding="utf-8") as f: content = await file.read()
return f.read() if len(content) > 10 * 1024 * 1024:
raise ValueError(f"File '{file.filename}' exceeds 10MB limit")
file_path = user_dir / f"{key}.owl"
with open(file_path, "wb") as f:
f.write(content)
ontology_metadata = {
"filename": file.filename,
"size_bytes": len(content),
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"description": descriptions[i] if descriptions else None,
}
metadata[key] = ontology_metadata
results.append(
OntologyMetadata(
ontology_key=key,
filename=file.filename,
size_bytes=len(content),
uploaded_at=ontology_metadata["uploaded_at"],
description=descriptions[i] if descriptions else None,
)
)
self._save_metadata(user_dir, metadata)
return results
def get_ontology_contents(self, ontology_key: List[str], user) -> List[str]:
"""
Retrieve ontology content for one or more keys.
Args:
ontology_key: List of ontology keys to retrieve (can contain single item)
user: Authenticated user
Returns:
List of ontology content strings
Raises:
ValueError: If any ontology key not found
"""
user_dir = self._get_user_dir(str(user.id))
metadata = self._load_metadata(user_dir)
contents = []
for key in ontology_key:
if key not in metadata:
raise ValueError(f"Ontology key '{key}' not found")
file_path = user_dir / f"{key}.owl"
if not file_path.exists():
raise ValueError(f"Ontology file for key '{key}' not found")
with open(file_path, "r", encoding="utf-8") as f:
contents.append(f.read())
return contents
def list_ontologies(self, user) -> dict: def list_ontologies(self, user) -> dict:
user_dir = self._get_user_dir(str(user.id)) user_dir = self._get_user_dir(str(user.id))

View file

@@ -1,6 +1,6 @@
from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from typing import Optional from typing import Optional, List
from cognee.modules.users.models import User from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user from cognee.modules.users.methods import get_authenticated_user
@@ -16,23 +16,27 @@ def get_ontology_router() -> APIRouter:
@router.post("", response_model=dict) @router.post("", response_model=dict)
async def upload_ontology( async def upload_ontology(
ontology_key: str = Form(...), ontology_key: str = Form(...),
ontology_file: UploadFile = File(...), ontology_file: List[UploadFile] = File(...),
description: Optional[str] = Form(None), descriptions: Optional[str] = Form(None),
user: User = Depends(get_authenticated_user), user: User = Depends(get_authenticated_user),
): ):
""" """
Upload an ontology file with a named key for later use in cognify operations. Upload ontology files with their respective keys for later use in cognify operations.
Supports both single and multiple file uploads:
- Single file: ontology_key=["key"], ontology_file=[file]
- Multiple files: ontology_key=["key1", "key2"], ontology_file=[file1, file2]
## Request Parameters ## Request Parameters
- **ontology_key** (str): User-defined identifier for the ontology - **ontology_key** (str): JSON array string of user-defined identifiers for the ontologies
- **ontology_file** (UploadFile): OWL format ontology file - **ontology_file** (List[UploadFile]): OWL format ontology files
- **description** (Optional[str]): Optional description of the ontology - **descriptions** (Optional[str]): JSON array string of optional descriptions
## Response ## Response
Returns metadata about the uploaded ontology including key, filename, size, and upload timestamp. Returns metadata about uploaded ontologies including keys, filenames, sizes, and upload timestamps.
## Error Codes ## Error Codes
- **400 Bad Request**: Invalid file format, duplicate key, file size exceeded - **400 Bad Request**: Invalid file format, duplicate keys, array length mismatches, file size exceeded
- **500 Internal Server Error**: File system or processing errors - **500 Internal Server Error**: File system or processing errors
""" """
send_telemetry( send_telemetry(
@@ -45,16 +49,31 @@ def get_ontology_router() -> APIRouter:
) )
try: try:
result = await ontology_service.upload_ontology( import json
ontology_key, ontology_file, user, description
ontology_keys = json.loads(ontology_key)
description_list = json.loads(descriptions) if descriptions else None
if not isinstance(ontology_keys, list):
raise ValueError("ontology_key must be a JSON array")
results = await ontology_service.upload_ontologies(
ontology_keys, ontology_file, user, description_list
) )
return { return {
"ontology_key": result.ontology_key, "uploaded_ontologies": [
"filename": result.filename, {
"size_bytes": result.size_bytes, "ontology_key": result.ontology_key,
"uploaded_at": result.uploaded_at, "filename": result.filename,
"size_bytes": result.size_bytes,
"uploaded_at": result.uploaded_at,
"description": result.description,
}
for result in results
]
} }
except ValueError as e: except (json.JSONDecodeError, ValueError) as e:
return JSONResponse(status_code=400, content={"error": str(e)}) return JSONResponse(status_code=400, content={"error": str(e)})
except Exception as e: except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)}) return JSONResponse(status_code=500, content={"error": str(e)})

View file

@@ -26,7 +26,7 @@ class RDFLibOntologyResolver(BaseOntologyResolver):
def __init__( def __init__(
self, self,
ontology_file: Optional[Union[str, List[str], IO]] = None, ontology_file: Optional[Union[str, List[str], IO, List[IO]]] = None,
matching_strategy: Optional[MatchingStrategy] = None, matching_strategy: Optional[MatchingStrategy] = None,
) -> None: ) -> None:
super().__init__(matching_strategy) super().__init__(matching_strategy)
@@ -34,47 +34,68 @@ class RDFLibOntologyResolver(BaseOntologyResolver):
try: try:
self.graph = None self.graph = None
if ontology_file is not None: if ontology_file is not None:
files_to_load = []
file_objects = []
if hasattr(ontology_file, "read"): if hasattr(ontology_file, "read"):
self.graph = Graph() file_objects = [ontology_file]
content = ontology_file.read() elif isinstance(ontology_file, str):
self.graph.parse(data=content, format="xml") files_to_load = [ontology_file]
logger.info("Ontology loaded successfully from file object") elif isinstance(ontology_file, list):
else: if all(hasattr(item, "read") for item in ontology_file):
files_to_load = [] file_objects = ontology_file
if isinstance(ontology_file, str): else:
files_to_load = [ontology_file]
elif isinstance(ontology_file, list):
files_to_load = ontology_file files_to_load = ontology_file
else: else:
raise ValueError( raise ValueError(
f"ontology_file must be a string, list of strings, file-like object, or None. Got: {type(ontology_file)}" f"ontology_file must be a string, list of strings, file-like object, list of file-like objects, or None. Got: {type(ontology_file)}"
) )
if files_to_load: if file_objects:
self.graph = Graph() self.graph = Graph()
loaded_files = [] loaded_objects = []
for file_path in files_to_load: for file_obj in file_objects:
if os.path.exists(file_path): try:
self.graph.parse(file_path) content = file_obj.read()
loaded_files.append(file_path) self.graph.parse(data=content, format="xml")
logger.info("Ontology loaded successfully from file: %s", file_path) loaded_objects.append(file_obj)
else: logger.info("Ontology loaded successfully from file object")
logger.warning( except Exception as e:
"Ontology file '%s' not found. Skipping this file.", logger.warning("Failed to parse ontology file object: %s", str(e))
file_path,
)
if not loaded_files: if not loaded_objects:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
else:
logger.info( logger.info(
"No ontology file provided. No owl ontology will be attached to the graph." "No valid ontology file objects found. No owl ontology will be attached to the graph."
) )
self.graph = None
else:
logger.info("Total ontology file objects loaded: %d", len(loaded_objects))
elif files_to_load:
self.graph = Graph()
loaded_files = []
for file_path in files_to_load:
if os.path.exists(file_path):
self.graph.parse(file_path)
loaded_files.append(file_path)
logger.info("Ontology loaded successfully from file: %s", file_path)
else:
logger.warning(
"Ontology file '%s' not found. Skipping this file.",
file_path,
)
if not loaded_files:
logger.info(
"No valid ontology files found. No owl ontology will be attached to the graph."
)
self.graph = None
else:
logger.info("Total ontology files loaded: %d", len(loaded_files))
else:
logger.info(
"No ontology file provided. No owl ontology will be attached to the graph."
)
else: else:
logger.info( logger.info(
"No ontology file provided. No owl ontology will be attached to the graph." "No ontology file provided. No owl ontology will be attached to the graph."