From a59517409c8e4d29899ecd3d4fafe42b052b8dc5 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:45:48 +0100 Subject: [PATCH] chore: Fixes some of the issues based on PR review + restructures things --- .../vector/pgvector/PGVectorAdapter.py | 2 - .../modules/graph/cognee_graph/CogneeGraph.py | 6 +-- ...iever.py => brute_force_triplet_search.py} | 42 +++++++++---------- .../retriever/diffusion_retriever.py | 25 ----------- cognee/pipelines/retriever/g_retriever.py | 25 ----------- examples/python/dynamic_steps_example.py | 6 +-- 6 files changed, 26 insertions(+), 80 deletions(-) rename cognee/pipelines/retriever/{two_steps_retriever.py => brute_force_triplet_search.py} (75%) delete mode 100644 cognee/pipelines/retriever/diffusion_retriever.py delete mode 100644 cognee/pipelines/retriever/g_retriever.py diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 97571a274..a4cfcf789 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -192,8 +192,6 @@ class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): # Get PGVectorDataPoint Table from database PGVectorDataPoint = await self.get_table(collection_name) - closest_items = [] - # Use async session to connect to the database async with self.get_async_session() as session: # Find closest vectors to query_vector diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py index 158fb9d07..0b5aebce4 100644 --- a/cognee/modules/graph/cognee_graph/CogneeGraph.py +++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py @@ -164,9 +164,9 @@ class CogneeGraph(CogneeAbstractGraph): source_node = self.get_node(edge.node1.id) target_node = self.get_node(edge.node2.id) - source_distance = source_node.attributes.get("vector_distance", 0) if source_node else 0 - target_distance = target_node.attributes.get("vector_distance", 0) if target_node else 0 - edge_distance = edge.attributes.get("vector_distance", 0) + source_distance = source_node.attributes.get("vector_distance", 1) if source_node else 1 + target_distance = target_node.attributes.get("vector_distance", 1) if target_node else 1 + edge_distance = edge.attributes.get("vector_distance", 1) total_distance = source_distance + target_distance + edge_distance diff --git a/cognee/pipelines/retriever/two_steps_retriever.py b/cognee/pipelines/retriever/brute_force_triplet_search.py similarity index 75% rename from cognee/pipelines/retriever/two_steps_retriever.py rename to cognee/pipelines/retriever/brute_force_triplet_search.py index 92ef2be2e..6fef6104f 100644 --- a/cognee/pipelines/retriever/two_steps_retriever.py +++ b/cognee/pipelines/retriever/brute_force_triplet_search.py @@ -1,15 +1,11 @@ import asyncio -from uuid import UUID -from enum import Enum -from typing import Callable, Dict -from cognee.shared.utils import send_telemetry +from typing import Dict, List from cognee.modules.users.models import User from cognee.modules.users.methods import get_default_user -from cognee.modules.users.permissions.methods import get_document_ids_for_user from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph from cognee.infrastructure.databases.vector import get_vector_engine from cognee.infrastructure.databases.graph import get_graph_engine - +from cognee.shared.utils import send_telemetry def format_triplets(edges): print("\n\n\n") @@ -44,24 +40,22 @@ def format_triplets(edges): triplet = ( f"Node1: {node1_info}\n" f"Edge: {edge_info}\n" - f"Node2: {node2_info}\n\n\n" # Add three blank lines for separation + f"Node2: {node2_info}\n\n\n" ) triplets.append(triplet) return "".join(triplets) -async def two_step_retriever(query: Dict[str, str], user: User = None) -> list: +async def brute_force_triplet_search(query: str, user: User = None, top_k = 5) -> list: if user is None: user = await get_default_user() if user is None: raise PermissionError("No user found in the system. Please create a user.") - own_document_ids = await get_document_ids_for_user(user.id) - retrieved_results = await run_two_step_retriever(query, user) + retrieved_results = await brute_force_search(query, user, top_k) - filtered_search_results = [] return retrieved_results @@ -82,18 +76,22 @@ def delete_duplicated_vector_db_elements(collections, results): #:TODO: This is return results_dict -async def run_two_step_retriever(query: str, user, community_filter = []) -> list: +async def brute_force_search(query: str, user: User, top_k: int, collections: List[str] = None) -> list: + if collections is None: + collections = ["entity_name", "text_summary_text", "entity_type_name", "document_chunk_text"] + vector_engine = get_vector_engine() graph_engine = await get_graph_engine() - collections = ["Entity_name", "TextSummary_text", 'EntityType_name', 'DocumentChunk_text'] + send_telemetry("cognee.brute_force_triplet_search EXECUTION STARTED", user.id) + results = await asyncio.gather( *[vector_engine.get_distances_of_collection(collection, query_text=query) for collection in collections] ) - ############################################# This part is a quick fix til we don't fix the vector db inconsistency - node_distances = delete_duplicated_vector_db_elements(collections, results)# :TODO: Change when vector db is fixed - # results_dict = {collection: result for collection, result in zip(collections, results)} + ############################################# :TODO: Change when vector db does not contain duplicates + node_distances = delete_duplicated_vector_db_elements(collections, results) + # node_distances = {collection: result for collection, result in zip(collections, results)} ############################################## memory_fragment = CogneeGraph() @@ -104,16 +102,16 @@ async def run_two_step_retriever(query: str, user, community_filter = []) -> lis 'name', 'type', 'text'], - edge_properties_to_project=['id', - 'relationship_name']) + edge_properties_to_project=['relationship_name']) await memory_fragment.map_vector_distances_to_graph_nodes(node_distances=node_distances) - await memory_fragment.map_vector_distances_to_graph_edges(vector_engine, query)# :TODO: This should be coming from vector db + #:TODO: Change when vectordb contains edge embeddings + await memory_fragment.map_vector_distances_to_graph_edges(vector_engine, query) - results = await memory_fragment.calculate_top_triplet_importances(k=5) + results = await memory_fragment.calculate_top_triplet_importances(k=top_k) - print(format_triplets(results)) - print(f'Query was the following:{query}' ) + send_telemetry("cognee.brute_force_triplet_search EXECUTION STARTED", user.id) + #:TODO: Once we have Edge pydantic models we should retrieve the exact edge and node objects from graph db return results diff --git a/cognee/pipelines/retriever/diffusion_retriever.py b/cognee/pipelines/retriever/diffusion_retriever.py deleted file mode 100644 index a6b79310e..000000000 --- a/cognee/pipelines/retriever/diffusion_retriever.py +++ /dev/null @@ -1,25 +0,0 @@ -from uuid import UUID -from enum import Enum -from typing import Callable, Dict -from cognee.shared.utils import send_telemetry -from cognee.modules.users.models import User -from cognee.modules.users.methods import get_default_user -from cognee.modules.users.permissions.methods import get_document_ids_for_user - -async def two_step_retriever(query: Dict[str, str], user: User = None) -> list: - if user is None: - user = await get_default_user() - - if user is None: - raise PermissionError("No user found in the system. Please create a user.") - - own_document_ids = await get_document_ids_for_user(user.id) - retrieved_results = await diffusion_retriever(query, user) - - filtered_search_results = [] - - - return retrieved_results - -async def diffusion_retriever(query: str, user, community_filter = []) -> list: - raise(NotImplementedError) \ No newline at end of file diff --git a/cognee/pipelines/retriever/g_retriever.py b/cognee/pipelines/retriever/g_retriever.py deleted file mode 100644 index 4b319acd9..000000000 --- a/cognee/pipelines/retriever/g_retriever.py +++ /dev/null @@ -1,25 +0,0 @@ -from uuid import UUID -from enum import Enum -from typing import Callable, Dict -from cognee.shared.utils import send_telemetry -from cognee.modules.users.models import User -from cognee.modules.users.methods import get_default_user -from cognee.modules.users.permissions.methods import get_document_ids_for_user - -async def two_step_retriever(query: Dict[str, str], user: User = None) -> list: - if user is None: - user = await get_default_user() - - if user is None: - raise PermissionError("No user found in the system. Please create a user.") - - own_document_ids = await get_document_ids_for_user(user.id) - retrieved_results = await g_retriever(query, user) - - filtered_search_results = [] - - - return retrieved_results - -async def g_retriever(query: str, user, community_filter = []) -> list: - raise(NotImplementedError) \ No newline at end of file diff --git a/examples/python/dynamic_steps_example.py b/examples/python/dynamic_steps_example.py index 49b41db1c..6b75310cf 100644 --- a/examples/python/dynamic_steps_example.py +++ b/examples/python/dynamic_steps_example.py @@ -1,6 +1,6 @@ import cognee import asyncio -from cognee.pipelines.retriever.two_steps_retriever import two_step_retriever +from cognee.pipelines.retriever.brute_force_triplet_search import brute_force_triplet_search job_1 = """ CV 1: Relevant @@ -181,13 +181,13 @@ async def main(enable_steps): # Step 4: Query insights if enable_steps.get("retriever"): - await two_step_retriever('Who has Phd?') + await brute_force_triplet_search('Who has Phd?') if __name__ == '__main__': # Flags to enable/disable steps - rebuild_kg = True + rebuild_kg = False retrieve = True steps_to_enable = { "prune_data": rebuild_kg,