feat: ontology resolver and matching strategy abstraction (#1429)

<!-- .github/pull_request_template.md -->

## Description
<!--
Please provide a clear, human-generated description of the changes in
this PR.
-->
Adds an abstraction for ontology resolvers and a pluggable matching strategy mechanism.

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [x] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Changes Made
<!-- List the specific changes made in this PR -->
Adds ontology abstraction + matching resolver logic to support multiple
solutions

## Testing
Testing of changes is done using manual testing and ci/cd tests

## Screenshots/Videos (if applicable)
None

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## Related Issues
None

## Additional Notes
None

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
This commit is contained in:
Vasilije 2025-09-19 15:35:00 +02:00 committed by GitHub
commit 6da3810e1c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 739 additions and 74 deletions

View file

@ -116,7 +116,15 @@ VECTOR_DB_PROVIDER="lancedb"
VECTOR_DB_URL=
VECTOR_DB_KEY=
################################################################################
# 🧩 Ontology resolver settings
################################################################################
# -- Ontology resolver params --------------------------------------
# ONTOLOGY_RESOLVER=rdflib # Default: uses rdflib and owl file to read ontology structures
# MATCHING_STRATEGY=fuzzy # Default: uses fuzzy matching with 80% similarity threshold
# ONTOLOGY_FILE_PATH=YOUR_FULL_FILE_PATH # Default: empty
# To configure an ontology resolver, either pass it in code as shown in ontology_example, or set the full ontology file path and the settings above as environment variables.
################################################################################
# 🔄 MIGRATION (RELATIONAL → GRAPH) SETTINGS

View file

@ -3,6 +3,7 @@ from pydantic import BaseModel
from typing import Union, Optional
from uuid import UUID
from cognee.modules.ontology.ontology_env_config import get_ontology_env_config
from cognee.shared.logging_utils import get_logger
from cognee.shared.data_models import KnowledgeGraph
from cognee.infrastructure.llm import get_max_chunk_tokens
@ -10,7 +11,11 @@ from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.get_default_ontology_resolver import (
get_default_ontology_resolver,
get_ontology_resolver_from_env,
)
from cognee.modules.users.models import User
from cognee.tasks.documents import (
@ -39,7 +44,7 @@ async def cognify(
graph_model: BaseModel = KnowledgeGraph,
chunker=TextChunker,
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
config: Config = None,
vector_db_config: dict = None,
graph_db_config: dict = None,
run_in_background: bool = False,
@ -100,8 +105,6 @@ async def cognify(
Formula: min(embedding_max_completion_tokens, llm_max_completion_tokens // 2)
Default limits: ~512-8192 tokens depending on models.
Smaller chunks = more granular but potentially fragmented knowledge.
ontology_file_path: Path to RDF/OWL ontology file for domain-specific entity types.
Useful for specialized fields like medical or legal documents.
vector_db_config: Custom vector database configuration for embeddings storage.
graph_db_config: Custom graph database configuration for relationship storage.
run_in_background: If True, starts processing asynchronously and returns immediately.
@ -188,11 +191,28 @@ async def cognify(
- LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False)
- LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60)
"""
if config is None:
ontology_config = get_ontology_env_config()
if (
ontology_config.ontology_file_path
and ontology_config.ontology_resolver
and ontology_config.matching_strategy
):
config: Config = {
"ontology_config": {
"ontology_resolver": get_ontology_resolver_from_env(**ontology_config.to_dict())
}
}
else:
config: Config = {
"ontology_config": {"ontology_resolver": get_default_ontology_resolver()}
}
if temporal_cognify:
tasks = await get_temporal_tasks(user, chunker, chunk_size)
else:
tasks = await get_default_tasks(
user, graph_model, chunker, chunk_size, ontology_file_path, custom_prompt
user, graph_model, chunker, chunk_size, config, custom_prompt
)
# By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for
@ -216,9 +236,26 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
graph_model: BaseModel = KnowledgeGraph,
chunker=TextChunker,
chunk_size: int = None,
ontology_file_path: Optional[str] = None,
config: Config = None,
custom_prompt: Optional[str] = None,
) -> list[Task]:
if config is None:
ontology_config = get_ontology_env_config()
if (
ontology_config.ontology_file_path
and ontology_config.ontology_resolver
and ontology_config.matching_strategy
):
config: Config = {
"ontology_config": {
"ontology_resolver": get_ontology_resolver_from_env(**ontology_config.to_dict())
}
}
else:
config: Config = {
"ontology_config": {"ontology_resolver": get_default_ontology_resolver()}
}
default_tasks = [
Task(classify_documents),
Task(check_permissions_on_dataset, user=user, permissions=["write"]),
@ -230,7 +267,7 @@ async def get_default_tasks( # TODO: Find out a better way to do this (Boris's
Task(
extract_graph_from_data,
graph_model=graph_model,
ontology_adapter=OntologyResolver(ontology_file=ontology_file_path),
config=config,
custom_prompt=custom_prompt,
task_config={"batch_size": 10},
), # Generate knowledge graphs from the document chunks.

View file

@ -5,7 +5,7 @@ from cognee.modules.chunking.TextChunker import TextChunker
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.shared.data_models import KnowledgeGraph
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
async def get_default_tasks_by_indices(
@ -33,7 +33,7 @@ async def get_no_summary_tasks(
# Get base tasks (0=classify, 1=check_permissions, 2=extract_chunks)
base_tasks = await get_default_tasks_by_indices([0, 1, 2], chunk_size, chunker)
ontology_adapter = OntologyResolver(ontology_file=ontology_file_path)
ontology_adapter = RDFLibOntologyResolver(ontology_file=ontology_file_path)
graph_task = Task(
extract_graph_from_data,

View file

@ -7,8 +7,14 @@ from cognee.modules.engine.utils import (
generate_node_id,
generate_node_name,
)
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.modules.ontology.ontology_env_config import get_ontology_env_config
from cognee.shared.data_models import KnowledgeGraph
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from cognee.modules.ontology.get_default_ontology_resolver import (
get_default_ontology_resolver,
get_ontology_resolver_from_env,
)
def _create_node_key(node_id: str, category: str) -> str:
@ -83,7 +89,7 @@ def _process_ontology_edges(
def _create_type_node(
node_type: str,
ontology_resolver: OntologyResolver,
ontology_resolver: RDFLibOntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
@ -141,7 +147,7 @@ def _create_entity_node(
node_name: str,
node_description: str,
type_node: EntityType,
ontology_resolver: OntologyResolver,
ontology_resolver: RDFLibOntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
@ -198,7 +204,7 @@ def _create_entity_node(
def _process_graph_nodes(
data_chunk: DocumentChunk,
graph: KnowledgeGraph,
ontology_resolver: OntologyResolver,
ontology_resolver: RDFLibOntologyResolver,
added_nodes_map: dict,
added_ontology_nodes_map: dict,
name_mapping: dict,
@ -277,7 +283,7 @@ def _process_graph_edges(
def expand_with_nodes_and_edges(
data_chunks: list[DocumentChunk],
chunk_graphs: list[KnowledgeGraph],
ontology_resolver: OntologyResolver = None,
ontology_resolver: BaseOntologyResolver = None,
existing_edges_map: Optional[dict[str, bool]] = None,
):
"""
@ -296,8 +302,8 @@ def expand_with_nodes_and_edges(
chunk_graphs (list[KnowledgeGraph]): List of knowledge graphs corresponding to each
data chunk. Each graph contains nodes (entities) and edges (relationships) extracted
from the chunk content.
ontology_resolver (OntologyResolver, optional): Resolver for validating entities and
types against an ontology. If None, a default OntologyResolver is created.
ontology_resolver (BaseOntologyResolver, optional): Resolver for validating entities and
types against an ontology. If None, a default RDFLibOntologyResolver is created.
Defaults to None.
existing_edges_map (dict[str, bool], optional): Mapping of existing edge keys to prevent
duplicate edge creation. Keys are formatted as "{source_id}_{target_id}_{relation}".
@ -320,7 +326,15 @@ def expand_with_nodes_and_edges(
existing_edges_map = {}
if ontology_resolver is None:
ontology_resolver = OntologyResolver()
ontology_config = get_ontology_env_config()
if (
ontology_config.ontology_file_path
and ontology_config.ontology_resolver
and ontology_config.matching_strategy
):
ontology_resolver = get_ontology_resolver_from_env(**ontology_config.to_dict())
else:
ontology_resolver = get_default_ontology_resolver()
added_nodes_map = {}
added_ontology_nodes_map = {}

View file

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import List, Tuple, Optional
from cognee.modules.ontology.models import AttachedOntologyNode
from cognee.modules.ontology.matching_strategies import MatchingStrategy, FuzzyMatchingStrategy
class BaseOntologyResolver(ABC):
    """Common interface implemented by every ontology resolver backend.

    A resolver holds a matching strategy and declares four abstract
    operations that concrete subclasses must provide: building the lookup,
    refreshing it, finding the closest match, and extracting a subgraph.
    """

    def __init__(self, matching_strategy: Optional[MatchingStrategy] = None):
        """Store the strategy used for entity matching.

        Args:
            matching_strategy: Strategy used to match entity names. A falsy
                value (including None) selects a new FuzzyMatchingStrategy.
        """
        if not matching_strategy:
            matching_strategy = FuzzyMatchingStrategy()
        self.matching_strategy = matching_strategy

    @abstractmethod
    def build_lookup(self) -> None:
        """Build the lookup dictionary for ontology entities."""

    @abstractmethod
    def refresh_lookup(self) -> None:
        """Refresh the lookup dictionary."""

    @abstractmethod
    def find_closest_match(self, name: str, category: str) -> Optional[str]:
        """Find the closest match for a given name in the specified category."""

    @abstractmethod
    def get_subgraph(
        self, node_name: str, node_type: str = "individuals", directed: bool = True
    ) -> Tuple[
        List[AttachedOntologyNode], List[Tuple[str, str, str]], Optional[AttachedOntologyNode]
    ]:
        """Get a subgraph for the given node."""

View file

@ -0,0 +1,41 @@
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy
def get_default_ontology_resolver() -> BaseOntologyResolver:
    """Return an RDFLib-based resolver with no ontology file and fuzzy matching."""
    fuzzy_strategy = FuzzyMatchingStrategy()
    return RDFLibOntologyResolver(matching_strategy=fuzzy_strategy, ontology_file=None)
def get_ontology_resolver_from_env(
    ontology_resolver: str = "", matching_strategy: str = "", ontology_file_path: str = ""
) -> BaseOntologyResolver:
    """
    Create and return an ontology resolver instance based on environment parameters.

    Currently, this function supports only the RDFLib-based ontology resolver
    with a fuzzy matching strategy.

    Args:
        ontology_resolver (str): The ontology resolver type to use.
            Supported value: "rdflib".
        matching_strategy (str): The matching strategy to apply.
            Supported value: "fuzzy".
        ontology_file_path (str): Path to the ontology file required for the resolver.

    Returns:
        BaseOntologyResolver: An instance of the requested ontology resolver.

    Raises:
        EnvironmentError: If the provided resolver or strategy is unsupported,
            or if the ontology file path is missing. The message names the
            setting that was rejected.
    """
    # Each guard reports the setting that is actually wrong, so a bad matching
    # strategy or a missing path is not misreported as an unsupported resolver.
    if ontology_resolver != "rdflib":
        raise EnvironmentError(
            f"Unsupported ontology resolver: {ontology_resolver}. "
            "Supported resolvers are: RdfLib with FuzzyMatchingStrategy."
        )
    if matching_strategy != "fuzzy":
        raise EnvironmentError(
            f"Unsupported matching strategy: {matching_strategy}. "
            "Supported strategies are: fuzzy."
        )
    if not ontology_file_path:
        raise EnvironmentError(
            "Missing ontology file path: ontology_file_path is required "
            "for the rdflib ontology resolver."
        )

    return RDFLibOntologyResolver(
        matching_strategy=FuzzyMatchingStrategy(), ontology_file=ontology_file_path
    )

View file

@ -0,0 +1,53 @@
import difflib
from abc import ABC, abstractmethod
from typing import List, Optional
class MatchingStrategy(ABC):
    """Interface for strategies that match an entity name against ontology candidates."""

    @abstractmethod
    def find_match(self, name: str, candidates: List[str]) -> Optional[str]:
        """Pick the candidate that best matches ``name``.

        Args:
            name: The name to match.
            candidates: Candidate names to match against.

        Returns:
            The best matching candidate name, or None if no match is found.
        """
class FuzzyMatchingStrategy(MatchingStrategy):
    """Fuzzy matching strategy using difflib for approximate string matching."""

    def __init__(self, cutoff: float = 0.8):
        """Initialize fuzzy matching strategy.

        Args:
            cutoff: Minimum similarity score (0.0 to 1.0) for a match to be
                considered valid.

        Raises:
            ValueError: If ``cutoff`` is outside the range [0.0, 1.0].
        """
        # difflib.get_close_matches rejects an out-of-range cutoff, but only
        # once a non-exact lookup reaches it. Validate here so a bad
        # configuration fails at construction instead of on a later match.
        if not 0.0 <= cutoff <= 1.0:
            raise ValueError(f"cutoff must be in [0.0, 1.0]: {cutoff!r}")
        self.cutoff = cutoff

    def find_match(self, name: str, candidates: List[str]) -> Optional[str]:
        """Find the closest fuzzy match for a given name.

        Args:
            name: The normalized name to match.
            candidates: List of normalized candidate names.

        Returns:
            The best matching candidate name, or None if no match meets the cutoff.
        """
        if not candidates:
            return None

        # An exact hit is returned directly without invoking difflib.
        if name in candidates:
            return name

        # Otherwise ask difflib for the single closest candidate at or above the cutoff.
        best_match = difflib.get_close_matches(name, candidates, n=1, cutoff=self.cutoff)
        return best_match[0] if best_match else None

View file

@ -0,0 +1,20 @@
from typing import Any
class AttachedOntologyNode:
    """Lightweight wrapper to be able to parse any ontology solution and generalize cognee interface.

    Instances carry three attributes set in ``__init__``: ``uri`` (stored as
    given), ``name`` (short name derived from the URI) and ``category``.
    """

    def __init__(self, uri: Any, category: str):
        """Wrap ``uri`` and derive its short name.

        Args:
            uri: Identifier of the ontology entity; converted with ``str()``
                when the name is extracted.
            category: Category label stored unchanged on the node.
        """
        self.uri = uri
        self.name = self._extract_name(uri)
        self.category = category

    @staticmethod
    def _extract_name(uri: Any) -> str:
        """Return the short name of ``uri``.

        The text after the last ``#`` is used when it is non-empty; otherwise
        the last ``/``-separated segment is used, ignoring trailing ``#`` and
        ``/`` characters.
        """
        uri_str = str(uri)
        _, marker, fragment = uri_str.rpartition("#")
        if marker and fragment:
            return fragment
        # No fragment, or a dangling "#" (e.g. "http://example.org/test#"):
        # fall back to the last path segment instead of returning an empty name.
        return uri_str.rstrip("#/").split("/")[-1]

    def __repr__(self):
        return f"AttachedOntologyNode(name={self.name}, category={self.category})"

View file

@ -0,0 +1,24 @@
from typing import TypedDict, Optional
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.modules.ontology.matching_strategies import MatchingStrategy
class OntologyConfig(TypedDict, total=False):
    """Ontology section of the configuration dictionary.

    Declared with ``total=False``, so the key may be omitted.

    Keys:
        ontology_resolver: The ontology resolver instance to use, or None.
    """

    ontology_resolver: Optional[BaseOntologyResolver]
class Config(TypedDict, total=False):
    """Top-level configuration dictionary.

    Declared with ``total=False``, so the key may be omitted.

    Keys:
        ontology_config: Dictionary holding the ontology resolver, or None.
    """

    ontology_config: Optional[OntologyConfig]

View file

@ -0,0 +1,45 @@
"""This module contains the configuration for ontology handling."""
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class OntologyEnvConfig(BaseSettings):
    """
    Represents the configuration for ontology handling, including parameters for
    ontology file storage and resolution/matching strategies.

    Public methods:

    - to_dict

    Instance variables:

    - ontology_resolver
    - matching_strategy
    - ontology_file_path
    - model_config
    """

    # Name of the resolver backend.
    ontology_resolver: str = "rdflib"
    # Name of the matching strategy.
    matching_strategy: str = "fuzzy"
    # Path to the ontology file; empty by default.
    ontology_file_path: str = ""

    model_config = SettingsConfigDict(env_file=".env", extra="allow", populate_by_name=True)

    def to_dict(self) -> dict:
        """
        Return the configuration as a dictionary.

        The keys are ``ontology_resolver``, ``matching_strategy`` and
        ``ontology_file_path``.
        """
        return {
            "ontology_resolver": self.ontology_resolver,
            "matching_strategy": self.matching_strategy,
            "ontology_file_path": self.ontology_file_path,
        }
@lru_cache
def get_ontology_env_config() -> OntologyEnvConfig:
    """
    Retrieve the ontology configuration.

    The result is cached with ``lru_cache``, so repeated calls return the same
    OntologyEnvConfig instance instead of constructing a new one.
    """
    return OntologyEnvConfig()

View file

@ -10,31 +10,26 @@ from cognee.modules.ontology.exceptions import (
FindClosestMatchError,
GetSubgraphError,
)
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.modules.ontology.models import AttachedOntologyNode
from cognee.modules.ontology.matching_strategies import MatchingStrategy, FuzzyMatchingStrategy
logger = get_logger("OntologyAdapter")
class AttachedOntologyNode:
"""Lightweight wrapper to be able to parse any ontology solution and generalize cognee interface."""
class RDFLibOntologyResolver(BaseOntologyResolver):
"""RDFLib-based ontology resolver implementation.
def __init__(self, uri: URIRef, category: str):
self.uri = uri
self.name = self._extract_name(uri)
self.category = category
This implementation uses RDFLib to parse and work with RDF/OWL ontology files.
It provides fuzzy matching and subgraph extraction capabilities for ontology entities.
"""
@staticmethod
def _extract_name(uri: URIRef) -> str:
uri_str = str(uri)
if "#" in uri_str:
return uri_str.split("#")[-1]
return uri_str.rstrip("/").split("/")[-1]
def __repr__(self):
return f"AttachedOntologyNode(name={self.name}, category={self.category})"
class OntologyResolver:
def __init__(self, ontology_file: Optional[str] = None):
def __init__(
self,
ontology_file: Optional[str] = None,
matching_strategy: Optional[MatchingStrategy] = None,
) -> None:
super().__init__(matching_strategy)
self.ontology_file = ontology_file
try:
if ontology_file and os.path.exists(ontology_file):
@ -60,7 +55,7 @@ class OntologyResolver:
name = uri_str.rstrip("/").split("/")[-1]
return name.lower().replace(" ", "_").strip()
def build_lookup(self):
def build_lookup(self) -> None:
try:
classes: Dict[str, URIRef] = {}
individuals: Dict[str, URIRef] = {}
@ -97,7 +92,7 @@ class OntologyResolver:
logger.error("Failed to build lookup dictionary: %s", str(e))
raise RuntimeError("Lookup build failed") from e
def refresh_lookup(self):
def refresh_lookup(self) -> None:
self.build_lookup()
logger.info("Ontology lookup refreshed.")
@ -105,13 +100,8 @@ class OntologyResolver:
try:
normalized_name = name.lower().replace(" ", "_").strip()
possible_matches = list(self.lookup.get(category, {}).keys())
if normalized_name in possible_matches:
return normalized_name
best_match = difflib.get_close_matches(
normalized_name, possible_matches, n=1, cutoff=0.8
)
return best_match[0] if best_match else None
return self.matching_strategy.find_match(normalized_name, possible_matches)
except Exception as e:
logger.error("Error in find_closest_match: %s", str(e))
raise FindClosestMatchError() from e
@ -125,7 +115,9 @@ class OntologyResolver:
def get_subgraph(
self, node_name: str, node_type: str = "individuals", directed: bool = True
) -> Tuple[List[Any], List[Tuple[str, str, str]], Optional[Any]]:
) -> Tuple[
List[AttachedOntologyNode], List[Tuple[str, str, str]], Optional[AttachedOntologyNode]
]:
nodes_set = set()
edges: List[Tuple[str, str, str]] = []
visited = set()

View file

@ -3,8 +3,14 @@ from typing import Type, List, Optional
from pydantic import BaseModel
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.modules.ontology.ontology_env_config import get_ontology_env_config
from cognee.tasks.storage.add_data_points import add_data_points
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.ontology.ontology_config import Config
from cognee.modules.ontology.get_default_ontology_resolver import (
get_default_ontology_resolver,
get_ontology_resolver_from_env,
)
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.graph.utils import (
expand_with_nodes_and_edges,
@ -24,9 +30,28 @@ async def integrate_chunk_graphs(
data_chunks: list[DocumentChunk],
chunk_graphs: list,
graph_model: Type[BaseModel],
ontology_adapter: OntologyResolver,
ontology_resolver: BaseOntologyResolver,
) -> List[DocumentChunk]:
"""Updates DocumentChunk objects, integrates data points and edges into databases."""
"""Integrate chunk graphs with ontology validation and store in databases.
This function processes document chunks and their associated knowledge graphs,
validates entities against an ontology resolver, and stores the integrated
data points and edges in the configured databases.
Args:
data_chunks: List of document chunks containing source data
chunk_graphs: List of knowledge graphs corresponding to each chunk
graph_model: Pydantic model class for graph data validation
ontology_resolver: Resolver for validating entities against ontology
Returns:
List of updated DocumentChunk objects with integrated data
Raises:
InvalidChunkGraphInputError: If input validation fails
InvalidGraphModelError: If graph model validation fails
InvalidOntologyAdapterError: If ontology resolver validation fails
"""
if not isinstance(data_chunks, list) or not isinstance(chunk_graphs, list):
raise InvalidChunkGraphInputError("data_chunks and chunk_graphs must be lists.")
@ -36,9 +61,9 @@ async def integrate_chunk_graphs(
)
if not isinstance(graph_model, type) or not issubclass(graph_model, BaseModel):
raise InvalidGraphModelError(graph_model)
if ontology_adapter is None or not hasattr(ontology_adapter, "get_subgraph"):
if ontology_resolver is None or not hasattr(ontology_resolver, "get_subgraph"):
raise InvalidOntologyAdapterError(
type(ontology_adapter).__name__ if ontology_adapter else "None"
type(ontology_resolver).__name__ if ontology_resolver else "None"
)
graph_engine = await get_graph_engine()
@ -55,7 +80,7 @@ async def integrate_chunk_graphs(
)
graph_nodes, graph_edges = expand_with_nodes_and_edges(
data_chunks, chunk_graphs, ontology_adapter, existing_edges_map
data_chunks, chunk_graphs, ontology_resolver, existing_edges_map
)
if len(graph_nodes) > 0:
@ -70,7 +95,7 @@ async def integrate_chunk_graphs(
async def extract_graph_from_data(
data_chunks: List[DocumentChunk],
graph_model: Type[BaseModel],
ontology_adapter: OntologyResolver = None,
config: Config = None,
custom_prompt: Optional[str] = None,
) -> List[DocumentChunk]:
"""
@ -101,6 +126,24 @@ async def extract_graph_from_data(
if edge.source_node_id in valid_node_ids and edge.target_node_id in valid_node_ids
]
return await integrate_chunk_graphs(
data_chunks, chunk_graphs, graph_model, ontology_adapter or OntologyResolver()
)
# Extract resolver from config if provided, otherwise get default
if config is None:
ontology_config = get_ontology_env_config()
if (
ontology_config.ontology_file_path
and ontology_config.ontology_resolver
and ontology_config.matching_strategy
):
config: Config = {
"ontology_config": {
"ontology_resolver": get_ontology_resolver_from_env(**ontology_config.to_dict())
}
}
else:
config: Config = {
"ontology_config": {"ontology_resolver": get_default_ontology_resolver()}
}
ontology_resolver = config["ontology_config"]["ontology_resolver"]
return await integrate_chunk_graphs(data_chunks, chunk_graphs, graph_model, ontology_resolver)

View file

@ -3,7 +3,7 @@ from typing import List
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.shared.data_models import KnowledgeGraph
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
from cognee.tasks.graph.cascade_extract.utils.extract_nodes import extract_nodes
from cognee.tasks.graph.cascade_extract.utils.extract_content_nodes_and_relationship_names import (
extract_content_nodes_and_relationship_names,
@ -17,9 +17,21 @@ from cognee.tasks.graph.extract_graph_from_data import integrate_chunk_graphs
async def extract_graph_from_data(
data_chunks: List[DocumentChunk],
n_rounds: int = 2,
ontology_adapter: OntologyResolver = None,
ontology_adapter: BaseOntologyResolver = None,
) -> List[DocumentChunk]:
"""Extract and update graph data from document chunks in multiple steps."""
"""Extract and update graph data from document chunks using cascade extraction.
This function performs multi-step graph extraction from document chunks,
using cascade extraction techniques to build comprehensive knowledge graphs.
Args:
data_chunks: List of document chunks to process
n_rounds: Number of extraction rounds to perform (default: 2)
ontology_adapter: Resolver for validating entities against ontology
Returns:
List of updated DocumentChunk objects with extracted graph data
"""
chunk_nodes = await asyncio.gather(
*[extract_nodes(chunk.text, n_rounds) for chunk in data_chunks]
)
@ -44,5 +56,5 @@ async def extract_graph_from_data(
data_chunks=data_chunks,
chunk_graphs=chunk_graphs,
graph_model=KnowledgeGraph,
ontology_adapter=ontology_adapter or OntologyResolver(),
ontology_adapter=ontology_adapter,
)

View file

@ -1,12 +1,14 @@
import pytest
from rdflib import Graph, Namespace, RDF, OWL, RDFS
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver, AttachedOntologyNode
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from cognee.modules.ontology.models import AttachedOntologyNode
from cognee.modules.ontology.get_default_ontology_resolver import get_default_ontology_resolver
def test_ontology_adapter_initialization_success():
"""Test successful initialization of OntologyAdapter."""
"""Test successful initialization of RDFLibOntologyResolver from get_default_ontology_resolver."""
adapter = OntologyResolver()
adapter = get_default_ontology_resolver()
adapter.build_lookup()
assert isinstance(adapter.lookup, dict)
@ -14,7 +16,7 @@ def test_ontology_adapter_initialization_success():
def test_ontology_adapter_initialization_file_not_found():
"""Test OntologyAdapter initialization with nonexistent file."""
adapter = OntologyResolver(ontology_file="nonexistent.owl")
adapter = RDFLibOntologyResolver(ontology_file="nonexistent.owl")
assert adapter.graph is None
@ -27,7 +29,7 @@ def test_build_lookup():
g.add((ns.Audi, RDF.type, ns.Car))
resolver = OntologyResolver()
resolver = RDFLibOntologyResolver()
resolver.graph = g
resolver.build_lookup()
@ -50,7 +52,7 @@ def test_find_closest_match_exact():
g.add((ns.Car, RDF.type, OWL.Class))
g.add((ns.Audi, RDF.type, ns.Car))
resolver = OntologyResolver()
resolver = RDFLibOntologyResolver()
resolver.graph = g
resolver.build_lookup()
@ -71,7 +73,7 @@ def test_find_closest_match_fuzzy():
g.add((ns.Audi, RDF.type, ns.Car))
g.add((ns.BMW, RDF.type, ns.Car))
resolver = OntologyResolver()
resolver = RDFLibOntologyResolver()
resolver.graph = g
resolver.build_lookup()
@ -92,7 +94,7 @@ def test_find_closest_match_no_match():
g.add((ns.Audi, RDF.type, ns.Car))
g.add((ns.BMW, RDF.type, ns.Car))
resolver = OntologyResolver()
resolver = RDFLibOntologyResolver()
resolver.graph = g
resolver.build_lookup()
@ -102,10 +104,10 @@ def test_find_closest_match_no_match():
def test_get_subgraph_no_match_rdflib():
"""Test get_subgraph returns empty results for a non-existent node."""
"""Test get_subgraph returns empty results for a non-existent node using RDFLibOntologyResolver."""
g = Graph()
resolver = OntologyResolver()
resolver = get_default_ontology_resolver()
resolver.graph = g
resolver.build_lookup()
@ -138,7 +140,7 @@ def test_get_subgraph_success_rdflib():
g.add((ns.VW, owns, ns.Audi))
g.add((ns.VW, owns, ns.Porsche))
resolver = OntologyResolver()
resolver = RDFLibOntologyResolver()
resolver.graph = g
resolver.build_lookup()
@ -160,10 +162,10 @@ def test_get_subgraph_success_rdflib():
def test_refresh_lookup_rdflib():
"""Test that refresh_lookup rebuilds the lookup dict into a new object."""
"""Test that refresh_lookup rebuilds the lookup dict into a new object using RDFLibOntologyResolver."""
g = Graph()
resolver = OntologyResolver()
resolver = get_default_ontology_resolver()
resolver.graph = g
resolver.build_lookup()
@ -172,3 +174,318 @@ def test_refresh_lookup_rdflib():
resolver.refresh_lookup()
assert resolver.lookup is not original_lookup
def test_fuzzy_matching_strategy_exact_match():
    """An exact candidate is returned as-is by FuzzyMatchingStrategy."""
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    known_names = ["audi", "bmw", "mercedes"]
    matcher = FuzzyMatchingStrategy()

    assert matcher.find_match("audi", known_names) == "audi"
def test_fuzzy_matching_strategy_fuzzy_match():
    """A near-miss spelling resolves to the closest candidate at a 0.6 cutoff."""
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    known_names = ["audi", "bmw", "mercedes"]
    matcher = FuzzyMatchingStrategy(cutoff=0.6)

    assert matcher.find_match("audii", known_names) == "audi"
def test_fuzzy_matching_strategy_no_match():
    """No candidate is returned when nothing reaches the 0.9 cutoff."""
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    known_names = ["audi", "bmw", "mercedes"]
    matcher = FuzzyMatchingStrategy(cutoff=0.9)

    assert matcher.find_match("completely_different", known_names) is None
def test_fuzzy_matching_strategy_empty_candidates():
    """An empty candidate list yields None."""
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    matcher = FuzzyMatchingStrategy()

    assert matcher.find_match("audi", []) is None
def test_base_ontology_resolver_initialization():
    """A resolver built without a strategy ends up with a FuzzyMatchingStrategy."""
    from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    class StubResolver(BaseOntologyResolver):
        """Minimal concrete resolver whose operations do nothing."""

        def build_lookup(self):
            return None

        def refresh_lookup(self):
            return None

        def find_closest_match(self, name, category):
            return None

        def get_subgraph(self, node_name, node_type="individuals", directed=True):
            return [], [], None

    assert isinstance(StubResolver().matching_strategy, FuzzyMatchingStrategy)
def test_base_ontology_resolver_custom_matching_strategy():
    """A strategy passed to the constructor is the one stored on the resolver."""
    from cognee.modules.ontology.base_ontology_resolver import BaseOntologyResolver
    from cognee.modules.ontology.matching_strategies import MatchingStrategy

    class ConstantStrategy(MatchingStrategy):
        """Strategy that always reports the same match."""

        def find_match(self, name, candidates):
            return "custom_match"

    class StubResolver(BaseOntologyResolver):
        """Minimal concrete resolver whose operations do nothing."""

        def build_lookup(self):
            return None

        def refresh_lookup(self):
            return None

        def find_closest_match(self, name, category):
            return None

        def get_subgraph(self, node_name, node_type="individuals", directed=True):
            return [], [], None

    supplied_strategy = ConstantStrategy()
    stub = StubResolver(matching_strategy=supplied_strategy)

    assert stub.matching_strategy == supplied_strategy
def test_ontology_config_structure():
    """The Config TypedDict stores a resolver under ontology_config/ontology_resolver."""
    from cognee.modules.ontology.ontology_config import Config
    from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    fuzzy = FuzzyMatchingStrategy()
    rdflib_resolver = RDFLibOntologyResolver(matching_strategy=fuzzy)

    # Assemble the nested mapping from the inside out.
    ontology_section = {"ontology_resolver": rdflib_resolver}
    config: Config = {"ontology_config": ontology_section}

    stored = config["ontology_config"]["ontology_resolver"]
    assert stored == rdflib_resolver
def test_get_ontology_resolver_default():
    """get_default_ontology_resolver yields an RDFLibOntologyResolver using FuzzyMatchingStrategy."""
    from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    default_resolver = get_default_ontology_resolver()

    # Check the resolver type first, then the strategy attached to it.
    assert isinstance(default_resolver, RDFLibOntologyResolver)
    strategy = default_resolver.matching_strategy
    assert isinstance(strategy, FuzzyMatchingStrategy)
def test_get_default_ontology_resolver():
    """The default resolver factory returns an RDFLib resolver with fuzzy matching."""
    from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    produced = get_default_ontology_resolver()

    assert isinstance(produced, RDFLibOntologyResolver)
    attached_strategy = produced.matching_strategy
    assert isinstance(attached_strategy, FuzzyMatchingStrategy)
def test_rdflib_ontology_resolver_uses_matching_strategy():
    """find_closest_match returns whatever the injected matching strategy produces."""
    from cognee.modules.ontology.matching_strategies import MatchingStrategy

    class _SentinelStrategy(MatchingStrategy):
        """Answers ``"test_match"`` whenever it is handed a non-empty candidate collection."""

        def find_match(self, name, candidates):
            if candidates:
                return "test_match"
            return None

    # Tiny in-memory ontology: one class (Car) and one individual of it (Audi).
    namespace = Namespace("http://example.org/test#")
    triples = (
        (namespace.Car, RDF.type, OWL.Class),
        (namespace.Audi, RDF.type, namespace.Car),
    )
    graph = Graph()
    for triple in triples:
        graph.add(triple)

    resolver = RDFLibOntologyResolver(matching_strategy=_SentinelStrategy())
    resolver.graph = graph
    resolver.build_lookup()

    # The sentinel value can only have come from the injected strategy.
    assert resolver.find_closest_match("Audi", "individuals") == "test_match"
def test_rdflib_ontology_resolver_default_matching_strategy():
    """An RDFLibOntologyResolver built with no arguments carries a FuzzyMatchingStrategy."""
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    default_strategy = RDFLibOntologyResolver().matching_strategy

    assert isinstance(default_strategy, FuzzyMatchingStrategy)
def test_get_ontology_resolver_from_env_success():
    """Valid resolver/strategy/path settings produce a fully configured RDFLib resolver."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env
    from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "/test/path.owl",
    }
    built = get_ontology_resolver_from_env(**settings)

    assert isinstance(built, RDFLibOntologyResolver)
    assert isinstance(built.matching_strategy, FuzzyMatchingStrategy)
    # The configured path is stored on the resolver unchanged.
    assert built.ontology_file == "/test/path.owl"
def test_get_ontology_resolver_from_env_unsupported_resolver():
    """An unknown resolver name raises EnvironmentError naming it and the supported option."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "unsupported",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "/test/path.owl",
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    # Inspect the message only after the context manager has captured the error.
    message = str(exc_info.value)
    assert "Unsupported ontology resolver: unsupported" in message
    assert "Supported resolvers are: RdfLib with FuzzyMatchingStrategy" in message
def test_get_ontology_resolver_from_env_unsupported_strategy():
    """An unknown matching strategy raises EnvironmentError.

    The asserted message names the resolver ("rdflib"), not the strategy.
    """
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "unsupported",
        "ontology_file_path": "/test/path.owl",
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    message = str(exc_info.value)
    assert "Unsupported ontology resolver: rdflib" in message
def test_get_ontology_resolver_from_env_empty_file_path():
    """An empty ontology file path raises EnvironmentError even with a valid resolver/strategy."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "",
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    message = str(exc_info.value)
    assert "Unsupported ontology resolver: rdflib" in message
def test_get_ontology_resolver_from_env_none_file_path():
    """A ``None`` ontology file path raises EnvironmentError even with a valid resolver/strategy."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "fuzzy",
        "ontology_file_path": None,
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    message = str(exc_info.value)
    assert "Unsupported ontology resolver: rdflib" in message
def test_get_ontology_resolver_from_env_empty_resolver():
    """An empty resolver name raises EnvironmentError with the 'unsupported resolver' prefix."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "/test/path.owl",
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    # Only the prefix is checked; the resolver name itself is empty here.
    message = str(exc_info.value)
    assert "Unsupported ontology resolver:" in message
def test_get_ontology_resolver_from_env_empty_strategy():
    """An empty matching strategy raises EnvironmentError even with a valid resolver and path."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "",
        "ontology_file_path": "/test/path.owl",
    }
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env(**settings)

    message = str(exc_info.value)
    assert "Unsupported ontology resolver: rdflib" in message
def test_get_ontology_resolver_from_env_default_parameters():
    """Calling get_ontology_resolver_from_env with no arguments raises EnvironmentError."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    # No arguments at all: the function's own defaults are exercised.
    with pytest.raises(EnvironmentError) as exc_info:
        get_ontology_resolver_from_env()

    message = str(exc_info.value)
    assert "Unsupported ontology resolver:" in message
def test_get_ontology_resolver_from_env_case_sensitivity():
    """Resolver names are matched case-sensitively: other casings of "rdflib" are rejected."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    # Each differently-cased spelling must be refused, checked in this order.
    for miscased_name in ("RDFLIB", "RdfLib"):
        with pytest.raises(EnvironmentError):
            get_ontology_resolver_from_env(
                ontology_resolver=miscased_name,
                matching_strategy="fuzzy",
                ontology_file_path="/test/path.owl",
            )
def test_get_ontology_resolver_from_env_with_actual_file():
    """The resolver built from env settings records the ontology file path it was given."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env
    from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
    from cognee.modules.ontology.matching_strategies import FuzzyMatchingStrategy

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "/path/to/ontology.owl",
    }
    built = get_ontology_resolver_from_env(**settings)

    assert isinstance(built, RDFLibOntologyResolver)
    assert isinstance(built.matching_strategy, FuzzyMatchingStrategy)
    assert built.ontology_file == "/path/to/ontology.owl"
def test_get_ontology_resolver_from_env_resolver_functionality():
    """A resolver built from env settings supports lookup building, matching and subgraph queries."""
    from cognee.modules.ontology.get_default_ontology_resolver import get_ontology_resolver_from_env

    settings = {
        "ontology_resolver": "rdflib",
        "matching_strategy": "fuzzy",
        "ontology_file_path": "/test/path.owl",
    }
    built = get_ontology_resolver_from_env(**settings)

    built.build_lookup()
    assert isinstance(built.lookup, dict)

    # An entity that is not in the lookup has no closest match.
    closest = built.find_closest_match("test", "individuals")
    assert closest is None

    # The subgraph query for that entity comes back empty, with no start node.
    subgraph = built.get_subgraph("test", "individuals")
    nodes, relationships, start_node = subgraph
    assert nodes == []
    assert relationships == []
    assert start_node is None

View file

@ -5,6 +5,8 @@ import cognee
from cognee.api.v1.search import SearchType
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from cognee.modules.ontology.ontology_config import Config
text_1 = """
1. Audi
@ -60,7 +62,14 @@ async def main():
os.path.dirname(os.path.abspath(__file__)), "ontology_input_example/basic_ontology.owl"
)
await cognee.cognify(ontology_file_path=ontology_path)
# Create full config structure manually
config: Config = {
"ontology_config": {
"ontology_resolver": RDFLibOntologyResolver(ontology_file=ontology_path)
}
}
await cognee.cognify(config=config)
print("Knowledge with ontology created.")
# Step 4: Query insights

View file

@ -5,6 +5,8 @@ import os
import textwrap
from cognee.api.v1.search import SearchType
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.modules.ontology.rdf_xml.RDFLibOntologyResolver import RDFLibOntologyResolver
from cognee.modules.ontology.ontology_config import Config
async def run_pipeline(ontology_path=None):
@ -17,7 +19,13 @@ async def run_pipeline(ontology_path=None):
await cognee.add(scientific_papers_dir)
pipeline_run = await cognee.cognify(ontology_file_path=ontology_path)
config: Config = {
"ontology_config": {
"ontology_resolver": RDFLibOntologyResolver(ontology_file=ontology_path)
}
}
pipeline_run = await cognee.cognify(config=config)
return pipeline_run