fix: search without prior cognify (#1570)
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.

## Description

#1548 did not handle the multi-tenant case, where the graph DB context is set further downstream. This version adds empty-graph logging/handling that also works for the multi-tenant case.
This commit is contained in:
commit
411f27aeaf
14 changed files with 146 additions and 9 deletions
|
|
@ -1,4 +1,5 @@
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
from cognee.modules.data.methods import has_dataset_data
|
||||||
from cognee.modules.users.methods import get_default_user
|
from cognee.modules.users.methods import get_default_user
|
||||||
from cognee.modules.ingestion import discover_directory_datasets
|
from cognee.modules.ingestion import discover_directory_datasets
|
||||||
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
|
from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status
|
||||||
|
|
@ -26,6 +27,16 @@ class datasets:
|
||||||
|
|
||||||
return await get_dataset_data(dataset.id)
|
return await get_dataset_data(dataset.id)
|
||||||
|
|
||||||
|
@staticmethod
async def has_data(dataset_id: str) -> bool:
    """Report whether the default user's dataset has any data attached.

    Resolves the default user, fetches the dataset with the given id for
    that user, and delegates the check to ``has_dataset_data``.

    Args:
        dataset_id: Identifier of the dataset to inspect.

    Returns:
        bool: The result of ``has_dataset_data`` for the resolved dataset.
    """
    # Kept as a call-time import, exactly like the original implementation.
    from cognee.modules.data.methods import get_dataset

    default_user = await get_default_user()
    target_dataset = await get_dataset(default_user.id, dataset_id)

    # NOTE(review): if get_dataset can return None for an unknown id, the
    # attribute access below raises AttributeError — confirm against get_dataset.
    dataset_has_rows = await has_dataset_data(target_dataset.id)
    return dataset_has_rows
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_status(dataset_ids: list[UUID]) -> dict:
|
async def get_status(dataset_ids: list[UUID]) -> dict:
|
||||||
return await get_pipeline_status(dataset_ids, pipeline_name="cognify_pipeline")
|
return await get_pipeline_status(dataset_ids, pipeline_name="cognify_pipeline")
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from typing import Union, Optional, List, Type
|
from typing import Union, Optional, List, Type
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.modules.engine.models.node_set import NodeSet
|
from cognee.modules.engine.models.node_set import NodeSet
|
||||||
from cognee.modules.users.models import User
|
from cognee.modules.users.models import User
|
||||||
from cognee.modules.search.types import SearchResult, SearchType, CombinedSearchResult
|
from cognee.modules.search.types import SearchResult, SearchType, CombinedSearchResult
|
||||||
|
|
|
||||||
|
|
@ -159,6 +159,11 @@ class GraphDBInterface(ABC):
|
||||||
- get_connections
|
- get_connections
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
async def is_empty(self) -> bool:
    """Fallback emptiness check for adapters without their own implementation.

    Adapters that can answer cheaply should override this method. It is
    intentionally a concrete default rather than an ``@abstractmethod``: an
    abstract declaration prevents every adapter without an override from
    being instantiated, and this fallback body could then only run through
    an explicit ``super()`` call.

    Returns:
        bool: ``False``. Emptiness cannot be determined here, so the graph
        is reported as not empty.
    """
    logger.warning("is_empty() is not implemented")
    # Unknown is treated as "not empty": the retrievers return an empty result
    # as soon as is_empty() is True, which would silently blank every search
    # against an adapter that merely lacks an override.
    return False
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def query(self, query: str, params: dict) -> List[Any]:
|
async def query(self, query: str, params: dict) -> List[Any]:
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@ -198,6 +198,15 @@ class KuzuAdapter(GraphDBInterface):
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.warning(f"Kuzu S3 storage file not found: {self.db_path}")
|
logger.warning(f"Kuzu S3 storage file not found: {self.db_path}")
|
||||||
|
|
||||||
|
async def is_empty(self) -> bool:
    """Return True when the graph contains no nodes.

    Runs a query that matches any node and is limited to a single row, then
    checks whether that query produced zero rows.
    """
    probe_query = """
    MATCH (n)
    RETURN true
    LIMIT 1;
    """
    rows = await self.query(probe_query)

    # One row back means at least one node exists; no rows means empty.
    matched_rows = len(rows)
    return matched_rows == 0
|
||||||
|
|
||||||
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]:
|
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]:
|
||||||
"""
|
"""
|
||||||
Execute a Kuzu query asynchronously with automatic reconnection.
|
Execute a Kuzu query asynchronously with automatic reconnection.
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,15 @@ class Neo4jAdapter(GraphDBInterface):
|
||||||
async with self.driver.session(database=self.graph_database_name) as session:
|
async with self.driver.session(database=self.graph_database_name) as session:
|
||||||
yield session
|
yield session
|
||||||
|
|
||||||
|
async def is_empty(self) -> bool:
    """Return True when the graph contains no nodes.

    Issues an ``EXISTS { MATCH (n) }`` query and negates the ``node_exists``
    value found in the first returned row.
    """
    existence_query = """
    RETURN EXISTS {
        MATCH (n)
    } AS node_exists;
    """
    rows = await self.query(existence_query)

    # The flag is read from the first row of the result.
    first_row = rows[0]
    return not first_row["node_exists"]
|
||||||
|
|
||||||
@deadlock_retry()
|
@deadlock_retry()
|
||||||
async def query(
|
async def query(
|
||||||
self,
|
self,
|
||||||
|
|
|
||||||
|
|
@ -23,3 +23,6 @@ from .create_authorized_dataset import create_authorized_dataset
|
||||||
|
|
||||||
# Check
|
# Check
|
||||||
from .check_dataset_name import check_dataset_name
|
from .check_dataset_name import check_dataset_name
|
||||||
|
|
||||||
|
# Boolean check
|
||||||
|
from .has_dataset_data import has_dataset_data
|
||||||
|
|
|
||||||
21
cognee/modules/data/methods/has_dataset_data.py
Normal file
21
cognee/modules/data/methods/has_dataset_data.py
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from sqlalchemy.sql import func
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from cognee.modules.data.models import DatasetData
|
||||||
|
|
||||||
|
|
||||||
|
async def has_dataset_data(dataset_id: UUID) -> bool:
    """Tell whether any ``DatasetData`` row references the given dataset.

    Counts the ``DatasetData`` rows whose ``dataset_id`` equals the argument
    and reports whether that count is greater than zero.

    Args:
        dataset_id: Identifier of the dataset to check.

    Returns:
        bool: True if the count is greater than zero, otherwise False.
    """
    # Building the statement needs no session, so it is prepared up front.
    row_count_statement = (
        select(func.count())
        .select_from(DatasetData)
        .where(DatasetData.dataset_id == dataset_id)
    )

    relational_engine = get_relational_engine()
    async with relational_engine.get_async_session() as session:
        result = await session.execute(row_count_statement)
        total_rows = result.scalar_one()
        return total_rows > 0
|
||||||
|
|
@ -44,6 +44,12 @@ class CypherSearchRetriever(BaseRetriever):
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
graph_engine = await get_graph_engine()
|
graph_engine = await get_graph_engine()
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
if is_empty:
|
||||||
|
logger.warning("Search attempt on an empty knowledge graph")
|
||||||
|
return []
|
||||||
|
|
||||||
result = await graph_engine.query(query)
|
result = await graph_engine.query(query)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Failed to execture cypher search retrieval: %s", str(e))
|
logger.error("Failed to execture cypher search retrieval: %s", str(e))
|
||||||
|
|
|
||||||
|
|
@ -124,6 +124,13 @@ class GraphCompletionRetriever(BaseGraphRetriever):
|
||||||
- str: A string representing the resolved context from the retrieved triplets, or an
|
- str: A string representing the resolved context from the retrieved triplets, or an
|
||||||
empty string if no triplets are found.
|
empty string if no triplets are found.
|
||||||
"""
|
"""
|
||||||
|
graph_engine = await get_graph_engine()
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
if is_empty:
|
||||||
|
logger.warning("Search attempt on an empty knowledge graph")
|
||||||
|
return []
|
||||||
|
|
||||||
triplets = await self.get_triplets(query)
|
triplets = await self.get_triplets(query)
|
||||||
|
|
||||||
if len(triplets) == 0:
|
if len(triplets) == 0:
|
||||||
|
|
|
||||||
|
|
@ -122,6 +122,11 @@ class NaturalLanguageRetriever(BaseRetriever):
|
||||||
query.
|
query.
|
||||||
"""
|
"""
|
||||||
graph_engine = await get_graph_engine()
|
graph_engine = await get_graph_engine()
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
if is_empty:
|
||||||
|
logger.warning("Search attempt on an empty knowledge graph")
|
||||||
|
return []
|
||||||
|
|
||||||
return await self._execute_cypher_query(query, graph_engine)
|
return await self._execute_cypher_query(query, graph_engine)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,16 @@
|
||||||
from typing import Any, List, Optional, Tuple, Type, Union
|
from typing import Any, List, Optional, Tuple, Type, Union
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
from cognee.modules.data.models.Dataset import Dataset
|
from cognee.modules.data.models.Dataset import Dataset
|
||||||
from cognee.modules.engine.models.node_set import NodeSet
|
from cognee.modules.engine.models.node_set import NodeSet
|
||||||
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge
|
from cognee.modules.graph.cognee_graph.CogneeGraphElements import Edge
|
||||||
from cognee.modules.search.types import SearchType
|
from cognee.modules.search.types import SearchType
|
||||||
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
|
||||||
from .get_search_type_tools import get_search_type_tools
|
from .get_search_type_tools import get_search_type_tools
|
||||||
|
|
||||||
|
logger = get_logger()
|
||||||
|
|
||||||
|
|
||||||
async def no_access_control_search(
|
async def no_access_control_search(
|
||||||
query_type: SearchType,
|
query_type: SearchType,
|
||||||
|
|
@ -32,6 +36,12 @@ async def no_access_control_search(
|
||||||
save_interaction=save_interaction,
|
save_interaction=save_interaction,
|
||||||
last_k=last_k,
|
last_k=last_k,
|
||||||
)
|
)
|
||||||
|
graph_engine = await get_graph_engine()
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
if is_empty:
|
||||||
|
# TODO: we can log here, but not all search types use graph. Still keeping this here for reviewer input
|
||||||
|
logger.warning("Search attempt on an empty knowledge graph")
|
||||||
if len(search_tools) == 2:
|
if len(search_tools) == 2:
|
||||||
[get_completion, get_context] = search_tools
|
[get_completion, get_context] = search_tools
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ from uuid import UUID
|
||||||
from fastapi.encoders import jsonable_encoder
|
from fastapi.encoders import jsonable_encoder
|
||||||
from typing import Any, List, Optional, Tuple, Type, Union
|
from typing import Any, List, Optional, Tuple, Type, Union
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
|
from cognee.shared.logging_utils import get_logger
|
||||||
from cognee.shared.utils import send_telemetry
|
from cognee.shared.utils import send_telemetry
|
||||||
from cognee.context_global_variables import set_database_global_context_variables
|
from cognee.context_global_variables import set_database_global_context_variables
|
||||||
|
|
||||||
|
|
@ -27,6 +29,8 @@ from .get_search_type_tools import get_search_type_tools
|
||||||
from .no_access_control_search import no_access_control_search
|
from .no_access_control_search import no_access_control_search
|
||||||
from ..utils.prepare_search_result import prepare_search_result
|
from ..utils.prepare_search_result import prepare_search_result
|
||||||
|
|
||||||
|
logger = get_logger()
|
||||||
|
|
||||||
|
|
||||||
async def search(
|
async def search(
|
||||||
query_text: str,
|
query_text: str,
|
||||||
|
|
@ -329,6 +333,25 @@ async def search_in_datasets_context(
|
||||||
# Set database configuration in async context for each dataset user has access for
|
# Set database configuration in async context for each dataset user has access for
|
||||||
await set_database_global_context_variables(dataset.id, dataset.owner_id)
|
await set_database_global_context_variables(dataset.id, dataset.owner_id)
|
||||||
|
|
||||||
|
graph_engine = await get_graph_engine()
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
if is_empty:
|
||||||
|
# TODO: we can log here, but not all search types use graph. Still keeping this here for reviewer input
|
||||||
|
from cognee.modules.data.methods import get_dataset_data
|
||||||
|
|
||||||
|
dataset_data = await get_dataset_data(dataset.id)
|
||||||
|
|
||||||
|
if len(dataset_data) > 0:
|
||||||
|
logger.warning(
|
||||||
|
f"Dataset '{dataset.name}' has {len(dataset_data)} data item(s) but the knowledge graph is empty. "
|
||||||
|
"Please run cognify to process the data before searching."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Search attempt on an empty knowledge graph - no data has been added to this dataset"
|
||||||
|
)
|
||||||
|
|
||||||
specific_search_tools = await get_search_type_tools(
|
specific_search_tools = await get_search_type_tools(
|
||||||
query_type=query_type,
|
query_type=query_type,
|
||||||
query_text=query_text,
|
query_text=query_text,
|
||||||
|
|
|
||||||
|
|
@ -47,10 +47,26 @@ async def main():
|
||||||
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
|
pathlib.Path(__file__).parent, "test_data/Quantum_computers.txt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
|
|
||||||
|
graph_engine = await get_graph_engine()
|
||||||
|
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert is_empty, "Kuzu graph database is not empty"
|
||||||
|
|
||||||
await cognee.add([explanation_file_path_quantum], dataset_name)
|
await cognee.add([explanation_file_path_quantum], dataset_name)
|
||||||
|
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert is_empty, "Kuzu graph database should be empty before cognify"
|
||||||
|
|
||||||
await cognee.cognify([dataset_name])
|
await cognee.cognify([dataset_name])
|
||||||
|
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert not is_empty, "Kuzu graph database should not be empty"
|
||||||
|
|
||||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
|
|
||||||
vector_engine = get_vector_engine()
|
vector_engine = get_vector_engine()
|
||||||
|
|
@ -114,11 +130,10 @@ async def main():
|
||||||
assert not os.path.isdir(data_root_directory), "Local data files are not deleted"
|
assert not os.path.isdir(data_root_directory), "Local data files are not deleted"
|
||||||
|
|
||||||
await cognee.prune.prune_system(metadata=True)
|
await cognee.prune.prune_system(metadata=True)
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
|
||||||
|
|
||||||
graph_engine = await get_graph_engine()
|
is_empty = await graph_engine.is_empty()
|
||||||
nodes, edges = await graph_engine.get_graph_data()
|
|
||||||
assert len(nodes) == 0 and len(edges) == 0, "Kuzu graph database is not empty"
|
assert is_empty, "Kuzu graph database is not empty"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Ensure cleanup even if tests fail
|
# Ensure cleanup even if tests fail
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,14 @@ async def main():
|
||||||
explanation_file_path_nlp = os.path.join(
|
explanation_file_path_nlp = os.path.join(
|
||||||
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
|
pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt"
|
||||||
)
|
)
|
||||||
|
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||||
|
|
||||||
|
graph_engine = await get_graph_engine()
|
||||||
|
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert is_empty, "Graph has to be empty"
|
||||||
|
|
||||||
await cognee.add([explanation_file_path_nlp], dataset_name)
|
await cognee.add([explanation_file_path_nlp], dataset_name)
|
||||||
|
|
||||||
explanation_file_path_quantum = os.path.join(
|
explanation_file_path_quantum = os.path.join(
|
||||||
|
|
@ -42,9 +50,16 @@ async def main():
|
||||||
)
|
)
|
||||||
|
|
||||||
await cognee.add([explanation_file_path_quantum], dataset_name)
|
await cognee.add([explanation_file_path_quantum], dataset_name)
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert is_empty, "Graph has to be empty before cognify"
|
||||||
|
|
||||||
await cognee.cognify([dataset_name])
|
await cognee.cognify([dataset_name])
|
||||||
|
|
||||||
|
is_empty = await graph_engine.is_empty()
|
||||||
|
|
||||||
|
assert not is_empty, "Graph shouldn't be empty"
|
||||||
|
|
||||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
|
|
||||||
vector_engine = get_vector_engine()
|
vector_engine = get_vector_engine()
|
||||||
|
|
@ -117,11 +132,8 @@ async def main():
|
||||||
assert not os.path.isdir(data_root_directory), "Local data files are not deleted"
|
assert not os.path.isdir(data_root_directory), "Local data files are not deleted"
|
||||||
|
|
||||||
await cognee.prune.prune_system(metadata=True)
|
await cognee.prune.prune_system(metadata=True)
|
||||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
is_empty = await graph_engine.is_empty()
|
||||||
|
assert is_empty, "Neo4j graph database is not empty"
|
||||||
graph_engine = await get_graph_engine()
|
|
||||||
nodes, edges = await graph_engine.get_graph_data()
|
|
||||||
assert len(nodes) == 0 and len(edges) == 0, "Neo4j graph database is not empty"
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue