From 1592566b87f6ebdbd53ee95292b32114851ad1c4 Mon Sep 17 00:00:00 2001 From: Boris Arzentar Date: Thu, 21 Mar 2024 09:57:47 +0100 Subject: [PATCH] feat: integrate weaviate vector db --- cognee - Get Started.ipynb | 319 +------------- cognee/api/v1/cognify/cognify.py | 54 +-- cognee/database/graphdb/graph.py | 2 +- cognee/infrastructure/InfrastructureConfig.py | 13 +- .../databases/vector/__init__.py | 7 +- .../databases/vector/get_vector_database.py | 6 +- .../vector/models/CollectionConfig.py | 5 + .../databases/vector/models/DataPoint.py | 10 + .../databases/vector/models/ScoredResult.py | 8 + .../databases/vector/models/VectorConfig.py | 6 + .../databases/vector/qdrant/QDrantAdapter.py | 130 ++++-- .../databases/vector/vector_db_interface.py | 18 +- .../databases/vector/weaviate/adapter.py | 417 ------------------ .../vector/weaviate_db/WeaviateAdapter.py | 72 +++ .../databases/vector/weaviate_db/__init__.py | 1 + .../cognify/graph/add_node_connections.py | 82 ++-- .../modules/cognify/graph/add_propositions.py | 10 +- cognee/modules/cognify/graph/create.py | 6 +- .../llm/add_node_connection_embeddings.py | 30 -- .../modules/cognify/llm/classify_content.py | 2 +- .../cognify/llm/content_to_cog_layers.py | 4 +- .../llm/resolve_cross_graph_references.py | 39 ++ .../cognify/vector/add_propositions.py | 52 +-- cognee/modules/cognify/vector/batch_search.py | 24 - .../modules/search/graph/search_adjacent.py | 2 +- .../search/vector/search_similarity.py | 30 +- cognee/shared/encode_uuid.py | 14 + poetry.lock | 74 +++- pyproject.toml | 1 + 29 files changed, 480 insertions(+), 958 deletions(-) create mode 100644 cognee/infrastructure/databases/vector/models/CollectionConfig.py create mode 100644 cognee/infrastructure/databases/vector/models/DataPoint.py create mode 100644 cognee/infrastructure/databases/vector/models/ScoredResult.py create mode 100644 cognee/infrastructure/databases/vector/models/VectorConfig.py delete mode 100644 
cognee/infrastructure/databases/vector/weaviate/adapter.py create mode 100644 cognee/infrastructure/databases/vector/weaviate_db/WeaviateAdapter.py create mode 100644 cognee/infrastructure/databases/vector/weaviate_db/__init__.py delete mode 100644 cognee/modules/cognify/llm/add_node_connection_embeddings.py create mode 100644 cognee/modules/cognify/llm/resolve_cross_graph_references.py delete mode 100644 cognee/modules/cognify/vector/batch_search.py create mode 100644 cognee/shared/encode_uuid.py diff --git a/cognee - Get Started.ipynb b/cognee - Get Started.ipynb index e1f716a1f..5adfdfb43 100644 --- a/cognee - Get Started.ipynb +++ b/cognee - Get Started.ipynb @@ -194,21 +194,10 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "id": "5b3954c1-f537-4be7-a578-1d5037c21374", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Pipeline file_load_from_filesystem load step completed in 0.29 seconds\n", - "1 load package(s) were loaded to destination duckdb and into dataset izmene\n", - "The duckdb destination used duckdb:///:external: location to store data\n", - "Load package 1710408302.8475971 is LOADED and contains no failed jobs\n" - ] - } - ], + "outputs": [], "source": [ "from os import listdir, path\n", "from uuid import uuid5, UUID\n", @@ -223,19 +212,10 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "id": "39df49ca-06f0-4b86-ae27-93c68ddceac3", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "/Users/vasa/Projects/cognee/cognee/data/cognee/cognee.duckdb\n", - "['izmene']\n" - ] - } - ], + "outputs": [], "source": [ "import duckdb\n", "from cognee.root_dir import get_absolute_path\n", @@ -251,9 +231,8 @@ "tables = db.sql(\"SELECT DISTINCT schema_name FROM duckdb_tables();\").df()\n", "print(list(filter(lambda table_name: table_name.endswith('staging') is False, 
tables.to_dict()[\"schema_name\"].values())))\n", "\n", - "# izmene = db.sql(f\"SELECT * FROM izmene.file_metadata;\")\n", - "\n", - "# print(izmene)\n", + "izmene = db.sql(f\"SELECT id FROM izmene.file_metadata;\")\n", + "print(izmene)\n", "\n", "# pravilnik = db.sql(f\"SELECT * FROM pravilnik.file_metadata;\")\n", "\n", @@ -264,36 +243,25 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "id": "97e8647a-052c-4689-b1de-8d81765462e0", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "['izmene']\n", - "Processing document (881ecb36-2819-54c3-8147-ed80293084d6)\n", - "Document (881ecb36-2819-54c3-8147-ed80293084d6) categorized: ['Laws, regulations, and legal case documents']\n", - "Document (881ecb36-2819-54c3-8147-ed80293084d6) layer graphs created\n", - "Document (881ecb36-2819-54c3-8147-ed80293084d6) layers connected\n", - "Error during batch search for ID 6b81d9c2-4121-4366-ab53-71511ce61371: \n", - "Document (881ecb36-2819-54c3-8147-ed80293084d6) processed\n", - "Graph is visualized at: https://hub.graphistry.com/graph/graph.html?dataset=2c27d75e0d754e5c95c59d39dd2fe1ee&type=arrow&viztoken=3ddceed7-539d-49fd-ab33-2ca0691028d1&usertag=0b63180a-pygraphistry-0.33.5&splashAfter=1710408590&info=true\n", - "None\n" - ] - } - ], + "outputs": [], "source": [ "from os import path, listdir\n", "from cognee import cognify, list_datasets\n", "from cognee.utils import render_graph\n", + "from cognee.infrastructure import infrastructure_config\n", "\n", "# text = \"\"\"In the nicest possible way, Britons have always been a bit silly about animals. “Keeping pets, for the English, is not so much a leisure activity as it is an entire way of life,” wrote the anthropologist Kate Fox in Watching the English, nearly 20 years ago. 
Our dogs, in particular, have been an acceptable outlet for emotions and impulses we otherwise keep strictly controlled – our latent desire to be demonstratively affectionate, to be silly and chat to strangers. If this seems like an exaggeration, consider the different reactions you’d get if you struck up a conversation with someone in a park with a dog, versus someone on the train.\n", "# Indeed, British society has been set up to accommodate these four-legged ambassadors. In the UK – unlike Australia, say, or New Zealand – dogs are not just permitted on public transport but often openly encouraged. Many pubs and shops display waggish signs, reading, “Dogs welcome, people tolerated”, and have treat jars on their counters. The other day, as I was waiting outside a cafe with a friend’s dog, the barista urged me to bring her inside.\n", "# For years, Britons’ non-partisan passion for animals has been consistent amid dwindling common ground. But lately, rather than bringing out the best in us, our relationship with dogs is increasingly revealing us at our worst – and our supposed “best friends” are paying the price.\n", "# As with so many latent traits in the national psyche, it all came unleashed with the pandemic, when many people thought they might as well make the most of all that time at home and in local parks with a dog. Between 2019 and 2022, the number of pet dogs in the UK rose from about nine million to 13 million. 
But there’s long been a seasonal surge around this time of year, substantial enough for the Dogs Trust charity to coin its famous slogan back in 1978: “A dog is for life, not just for Christmas.”\"\"\"\n", "\n", + "vector_engine = infrastructure_config.get_config()[\"vector_engine\"] \n", + "\n", + "await vector_engine.prune()\n", + "\n", "print(list_datasets())\n", "\n", "graph = await cognify(\"izmene\")\n", @@ -304,25 +272,17 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "id": "2487d12b-570c-424c-8b6c-4c23f4073e05", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "hello\n" - ] - } - ], + "outputs": [], "source": [ "print('hello')" ] }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "id": "a228fb2c-5bbc-48b4-af3d-4a26e840a79e", "metadata": {}, "outputs": [], @@ -333,216 +293,15 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "id": "4ee34d29-f75d-4766-8906-b851b57bd5ef", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "b006ca75-d5aa-4006-9c9a-4d2b700670e3\n", - "Ministry responsible for construction, transport, and infrastructure regulations in Serbia\n", - "Regulation that amends the procedure for creating and controlling technical documentation for buildings based on their class and purpose\n", - "Publication of the Amendment Regulation in the Official Gazette of the Republic of Serbia, No. 
77/2015, on September 9, 2015\n", - "Regulations for the main project in construction as per the Planning and Construction Law and its amendments\n", - "Minister of the Ministry of Construction, Transport, and Infrastructure\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Prof. Dr. Zorana Mihajlovic\n", - "Publication date of the amendments and supplements rule in the Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS, br. 77/2015\n", - "Ministar građevinarstva, saobraćaja i infrastrukture\n", - "Zakon o planiranju i izgradnji\n", - "Službeni glasnik RS references for Zakon o planiranju i izgradnji: br. 72/09, 81/09 - ispravka, 64/10 - US, 24/11, 121/12, 42/13 - US, 50/13, 98/13 - US\n", - "Ministry of Construction, Transport and Infrastructure\n", - "Regulation on Amendments and Supplements to the Regulation on the Content, Method, and Procedure of Preparation and Method of Control of Technical Documentation According to the Class and Purpose of Buildings\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlović\n", - "Law on Planning and Construction\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Official Gazette of the Republic of Serbia, number 77/2015\n", - "September 9, 2015\n", - "Law on Planning and Construction\n", - "Official Gazette references for Law on Planning and Construction\n", - "Ministarstvo građevinarstva, saobraćaja i infrastrukture\n", - "Pravilnik o izmenama i 
dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS\n", - "Zakon o planiranju i izgradnji\n", - "Zorana Mihajlović\n", - "Minister of Construction, Transport and Infrastructure\n", - "Rulebook on Amendments and Supplements\n", - "Rulebook on the content, method, and procedure for the preparation and control of technical documentation according to the class and purpose of objects\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlovic, prof. dr\n", - "14cff510-4ce9-44fe-a826-aea5349d0a38\n", - "Ministry responsible for construction, transport, and infrastructure regulations in Serbia\n", - "Regulation that amends the procedure for creating and controlling technical documentation for buildings based on their class and purpose\n", - "Publication of the Amendment Regulation in the Official Gazette of the Republic of Serbia, No. 77/2015, on September 9, 2015\n", - "Regulations for the main project in construction as per the Planning and Construction Law and its amendments\n", - "Minister of the Ministry of Construction, Transport, and Infrastructure\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Prof. Dr. Zorana Mihajlovic\n", - "Publication date of the amendments and supplements rule in the Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS, br. 77/2015\n", - "Ministar građevinarstva, saobraćaja i infrastrukture\n", - "Zakon o planiranju i izgradnji\n", - "Službeni glasnik RS references for Zakon o planiranju i izgradnji: br. 
72/09, 81/09 - ispravka, 64/10 - US, 24/11, 121/12, 42/13 - US, 50/13, 98/13 - US\n", - "Ministry of Construction, Transport and Infrastructure\n", - "Regulation on Amendments and Supplements to the Regulation on the Content, Method, and Procedure of Preparation and Method of Control of Technical Documentation According to the Class and Purpose of Buildings\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlović\n", - "Law on Planning and Construction\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Official Gazette of the Republic of Serbia, number 77/2015\n", - "September 9, 2015\n", - "Law on Planning and Construction\n", - "Official Gazette references for Law on Planning and Construction\n", - "Ministarstvo građevinarstva, saobraćaja i infrastrukture\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS\n", - "Zakon o planiranju i izgradnji\n", - "Zorana Mihajlović\n", - "Minister of Construction, Transport and Infrastructure\n", - "Rulebook on Amendments and Supplements\n", - "Rulebook on the content, method, and procedure for the preparation and control of technical documentation according to the class and purpose of objects\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlovic, prof. dr\n", - "56f0f95f-28c0-4cdf-8b53-3a0d271f2a9a\n", - "Ministry responsible for construction, transport, and infrastructure regulations in Serbia\n", - "Regulation that amends the procedure for creating and controlling technical documentation for buildings based on their class and purpose\n", - "Publication of the Amendment Regulation in the Official Gazette of the Republic of Serbia, No. 
77/2015, on September 9, 2015\n", - "Regulations for the main project in construction as per the Planning and Construction Law and its amendments\n", - "Minister of the Ministry of Construction, Transport, and Infrastructure\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Prof. Dr. Zorana Mihajlovic\n", - "Publication date of the amendments and supplements rule in the Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS, br. 77/2015\n", - "Ministar građevinarstva, saobraćaja i infrastrukture\n", - "Zakon o planiranju i izgradnji\n", - "Službeni glasnik RS references for Zakon o planiranju i izgradnji: br. 72/09, 81/09 - ispravka, 64/10 - US, 24/11, 121/12, 42/13 - US, 50/13, 98/13 - US\n", - "Ministry of Construction, Transport and Infrastructure\n", - "Regulation on Amendments and Supplements to the Regulation on the Content, Method, and Procedure of Preparation and Method of Control of Technical Documentation According to the Class and Purpose of Buildings\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlović\n", - "Law on Planning and Construction\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Official Gazette of the Republic of Serbia, number 77/2015\n", - "September 9, 2015\n", - "Law on Planning and Construction\n", - "Official Gazette references for Law on Planning and Construction\n", - "Ministarstvo građevinarstva, saobraćaja i infrastrukture\n", - "Pravilnik o izmenama i 
dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS\n", - "Zakon o planiranju i izgradnji\n", - "Zorana Mihajlović\n", - "Minister of Construction, Transport and Infrastructure\n", - "Rulebook on Amendments and Supplements\n", - "Rulebook on the content, method, and procedure for the preparation and control of technical documentation according to the class and purpose of objects\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlovic, prof. dr\n", - "065288b5-94a9-4c8d-9ec7-339fe0b21226\n", - "Ministry responsible for construction, transport, and infrastructure regulations in Serbia\n", - "Regulation that amends the procedure for creating and controlling technical documentation for buildings based on their class and purpose\n", - "Publication of the Amendment Regulation in the Official Gazette of the Republic of Serbia, No. 77/2015, on September 9, 2015\n", - "Regulations for the main project in construction as per the Planning and Construction Law and its amendments\n", - "Minister of the Ministry of Construction, Transport, and Infrastructure\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Prof. Dr. Zorana Mihajlovic\n", - "Publication date of the amendments and supplements rule in the Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS, br. 77/2015\n", - "Ministar građevinarstva, saobraćaja i infrastrukture\n", - "Zakon o planiranju i izgradnji\n", - "Službeni glasnik RS references for Zakon o planiranju i izgradnji: br. 
72/09, 81/09 - ispravka, 64/10 - US, 24/11, 121/12, 42/13 - US, 50/13, 98/13 - US\n", - "Ministry of Construction, Transport and Infrastructure\n", - "Regulation on Amendments and Supplements to the Regulation on the Content, Method, and Procedure of Preparation and Method of Control of Technical Documentation According to the Class and Purpose of Buildings\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlović\n", - "Law on Planning and Construction\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Official Gazette of the Republic of Serbia, number 77/2015\n", - "September 9, 2015\n", - "Law on Planning and Construction\n", - "Official Gazette references for Law on Planning and Construction\n", - "Ministarstvo građevinarstva, saobraćaja i infrastrukture\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS\n", - "Zakon o planiranju i izgradnji\n", - "Zorana Mihajlović\n", - "Minister of Construction, Transport and Infrastructure\n", - "Rulebook on Amendments and Supplements\n", - "Rulebook on the content, method, and procedure for the preparation and control of technical documentation according to the class and purpose of objects\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlovic, prof. dr\n", - "272b9058-de0c-4b56-bff4-29aba1ca5251\n", - "Ministry responsible for construction, transport, and infrastructure regulations in Serbia\n", - "Regulation that amends the procedure for creating and controlling technical documentation for buildings based on their class and purpose\n", - "Publication of the Amendment Regulation in the Official Gazette of the Republic of Serbia, No. 
77/2015, on September 9, 2015\n", - "Regulations for the main project in construction as per the Planning and Construction Law and its amendments\n", - "Minister of the Ministry of Construction, Transport, and Infrastructure\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Prof. Dr. Zorana Mihajlovic\n", - "Publication date of the amendments and supplements rule in the Official Gazette of RS\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS, br. 77/2015\n", - "Ministar građevinarstva, saobraćaja i infrastrukture\n", - "Zakon o planiranju i izgradnji\n", - "Službeni glasnik RS references for Zakon o planiranju i izgradnji: br. 72/09, 81/09 - ispravka, 64/10 - US, 24/11, 121/12, 42/13 - US, 50/13, 98/13 - US\n", - "Ministry of Construction, Transport and Infrastructure\n", - "Regulation on Amendments and Supplements to the Regulation on the Content, Method, and Procedure of Preparation and Method of Control of Technical Documentation According to the Class and Purpose of Buildings\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlović\n", - "Law on Planning and Construction\n", - "Ministry of Construction, Transport, and Infrastructure\n", - "Pravilnik o izmenama i dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Official Gazette of the Republic of Serbia, number 77/2015\n", - "September 9, 2015\n", - "Law on Planning and Construction\n", - "Official Gazette references for Law on Planning and Construction\n", - "Ministarstvo građevinarstva, saobraćaja i infrastrukture\n", - "Pravilnik o izmenama i 
dopunama Pravilnika o sadržini, načinu i postupku izrade i način vršenja kontrole tehničke dokumentacije prema klasi i nameni objekata\n", - "Službeni glasnik RS\n", - "Zakon o planiranju i izgradnji\n", - "Zorana Mihajlović\n", - "Minister of Construction, Transport and Infrastructure\n", - "Rulebook on Amendments and Supplements\n", - "Rulebook on the content, method, and procedure for the preparation and control of technical documentation according to the class and purpose of objects\n", - "Official Gazette of the Republic of Serbia\n", - "Zorana Mihajlovic, prof. dr\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/var/folders/d_/1x0yyl7n5g5cc8vlgchdr06m0000gn/T/ipykernel_56167/1760472791.py:4: RuntimeWarning: coroutine 'search' was never awaited\n", - " out = await search(graph, query_params)\n", - "RuntimeWarning: Enable tracemalloc to get the object allocation traceback\n" - ] - } - ], + "outputs": [], "source": [ "from cognee import search\n", "from cognee.api.v1.search.search import SearchType\n", "query_params = {\n", - " SearchType.SIMILARITY: {'query': 'your search query here'}\n", + " SearchType.SIMILARITY: {'query': 'Ministarstvo'}\n", "}\n", "out = await search(graph, query_params)" ] @@ -627,46 +386,10 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "id": "f087510e-6df7-4a03-945d-70f8c91c8d65", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Deleting collection: 94a675a8-4ab4-4c85-bb06-26b8674b9ac0\n", - "Collection '94a675a8-4ab4-4c85-bb06-26b8674b9ac0' deleted successfully.\n", - "Deleting collection: 4299de57-f252-4524-b42f-1bcbf94af122\n", - "Collection '4299de57-f252-4524-b42f-1bcbf94af122' deleted successfully.\n", - "Deleting collection: 45ad96db-b3d3-4d38-9aae-c02dc63bba15\n", - "Collection '45ad96db-b3d3-4d38-9aae-c02dc63bba15' deleted successfully.\n", - "Deleting collection: e3702209-2c4b-47c2-834f-25b3f92c41a7\n", - "Collection 
'e3702209-2c4b-47c2-834f-25b3f92c41a7' deleted successfully.\n", - "Deleting collection: 53f96999-912a-4873-bba9-aa545903f5fd\n", - "Collection '53f96999-912a-4873-bba9-aa545903f5fd' deleted successfully.\n", - "Deleting collection: c726b009-fecc-45ac-bf16-68ceec0c9f29\n", - "Collection 'c726b009-fecc-45ac-bf16-68ceec0c9f29' deleted successfully.\n", - "Deleting collection: c4f28a0a-95b2-415d-a127-c78a142a068d\n", - "Collection 'c4f28a0a-95b2-415d-a127-c78a142a068d' deleted successfully.\n", - "Deleting collection: 6a6d69d6-16b3-4c1a-935b-739d51051b5a\n", - "Collection '6a6d69d6-16b3-4c1a-935b-739d51051b5a' deleted successfully.\n", - "Deleting collection: f78c2fbd-a6bc-40d1-a771-2e17a47c6ce3\n", - "Collection 'f78c2fbd-a6bc-40d1-a771-2e17a47c6ce3' deleted successfully.\n", - "Deleting collection: fc869dd2-e60b-4783-b617-63377690d748\n", - "Collection 'fc869dd2-e60b-4783-b617-63377690d748' deleted successfully.\n", - "Deleting collection: 2649b0a7-23b9-4e7d-9a96-f3b4eee3d3fe\n", - "Collection '2649b0a7-23b9-4e7d-9a96-f3b4eee3d3fe' deleted successfully.\n", - "Deleting collection: 71a7da39-ad23-47c7-8f5c-32f625308d3b\n", - "Collection '71a7da39-ad23-47c7-8f5c-32f625308d3b' deleted successfully.\n", - "Deleting collection: 94a73201-001a-4296-ab4f-cd4c4d98c44a\n", - "Collection '94a73201-001a-4296-ab4f-cd4c4d98c44a' deleted successfully.\n", - "Deleting collection: cad2276f-bdc2-497a-a75f-067a162f2bab\n", - "Collection 'cad2276f-bdc2-497a-a75f-067a162f2bab' deleted successfully.\n", - "All collections have been deleted.\n" - ] - } - ], + "outputs": [], "source": [ "from qdrant_client import models, QdrantClient\n", "import os\n", diff --git a/cognee/api/v1/cognify/cognify.py b/cognee/api/v1/cognify/cognify.py index fc8b88119..a84712f58 100644 --- a/cognee/api/v1/cognify/cognify.py +++ b/cognee/api/v1/cognify/cognify.py @@ -1,19 +1,16 @@ import asyncio # import logging from typing import List -from qdrant_client import models import instructor from openai import OpenAI 
from unstructured.cleaners.core import clean from unstructured.partition.pdf import partition_pdf -from cognee.infrastructure.databases.vector.qdrant.QDrantAdapter import CollectionConfig from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.modules.cognify.graph.add_classification_nodes import add_classification_nodes -from cognee.modules.cognify.graph.add_node_connections import add_node_connection, graph_ready_output, \ +from cognee.modules.cognify.graph.add_node_connections import group_nodes_by_layer, graph_ready_output, \ connect_nodes_in_graph, extract_node_descriptions from cognee.modules.cognify.graph.add_propositions import append_to_graph -from cognee.modules.cognify.llm.add_node_connection_embeddings import process_items -from cognee.modules.cognify.vector.batch_search import adapted_qdrant_batch_search +from cognee.modules.cognify.llm.resolve_cross_graph_references import resolve_cross_graph_references from cognee.modules.cognify.vector.add_propositions import add_propositions from cognee.config import Config @@ -23,10 +20,11 @@ from cognee.modules.cognify.llm.generate_graph import generate_graph from cognee.shared.data_models import DefaultContentPrediction, KnowledgeGraph, DefaultCognitiveLayer from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client from cognee.shared.data_models import GraphDBType -from cognee.infrastructure.databases.vector.get_vector_database import get_vector_database from cognee.infrastructure.databases.relational import DuckDBAdapter from cognee.modules.cognify.graph.add_document_node import add_document_node from cognee.modules.cognify.graph.initialize_graph import initialize_graph +from cognee.infrastructure.databases.vector import get_vector_database, CollectionConfig, VectorConfig +from cognee.infrastructure import infrastructure_config config = Config() config.load() @@ -58,7 +56,7 @@ async def cognify(dataset_name: str = "root"): async def process_text(input_text: str, 
file_metadata: dict): print(f"Processing document ({file_metadata['id']})") - + classified_categories = [] try: @@ -77,7 +75,6 @@ async def process_text(input_text: str, file_metadata: dict): print(f"Document ({file_metadata['id']}) categorized: {file_metadata['categories']}") cognitive_layers = await content_to_cog_layers( - "generate_cog_layers.txt", classified_categories[0], response_model = DefaultCognitiveLayer ) @@ -100,7 +97,7 @@ async def process_text(input_text: str, file_metadata: dict): await add_classification_nodes(f"DOCUMENT:{file_metadata['id']}", classified_categories[0]) - unique_layer_uuids = await append_to_graph(layer_graphs, classified_categories[0]) + await append_to_graph(layer_graphs, classified_categories[0]) print(f"Document ({file_metadata['id']}) layers connected") @@ -114,41 +111,32 @@ async def process_text(input_text: str, file_metadata: dict): node_descriptions = await extract_node_descriptions(graph.nodes(data = True)) # print(node_descriptions) - unique_layer_uuids = set(node["layer_decomposition_uuid"] for node in node_descriptions) + nodes_by_layer = await group_nodes_by_layer(node_descriptions) + + unique_layers = nodes_by_layer.keys() collection_config = CollectionConfig( - vector_config = { - "content": models.VectorParams( - distance = models.Distance.COSINE, - size = 3072 - ) - }, + vector_config = VectorConfig( + distance = "Cosine", + size = 3072 + ) ) try: - for layer in unique_layer_uuids: - db = get_vector_database() - await db.create_collection(layer, collection_config) + db_engine = infrastructure_config.get_config()["vector_engine"] + + for layer in unique_layers: + await db_engine.create_collection(layer, collection_config) except Exception as e: print(e) - await add_propositions(node_descriptions) + await add_propositions(nodes_by_layer) - grouped_data = await add_node_connection(node_descriptions) - # print("we are here, grouped_data", grouped_data) + results = await resolve_cross_graph_references(nodes_by_layer) 
- llm_client = get_llm_client() + relationships = graph_ready_output(results) - relationship_dict = await process_items(grouped_data, unique_layer_uuids, llm_client) - # print("we are here", relationship_dict[0]) - - results = await adapted_qdrant_batch_search(relationship_dict, db) - # print(results) - - relationship_d = graph_ready_output(results) - # print(relationship_d) - - connect_nodes_in_graph(graph, relationship_d) + connect_nodes_in_graph(graph, relationships) print(f"Document ({file_metadata['id']}) processed") diff --git a/cognee/database/graphdb/graph.py b/cognee/database/graphdb/graph.py index bcc8e31d9..929d39820 100644 --- a/cognee/database/graphdb/graph.py +++ b/cognee/database/graphdb/graph.py @@ -279,7 +279,7 @@ class Neo4jGraphDB(AbstractGraphDB): for edge in graph.edges: edge.created_at = time.time() edge.summarized = False - graph_dic = graph.dict() + graph_dic = graph.model_dump() node_variable_mapping = create_node_variable_mapping(graph_dic["nodes"]) edge_variable_mapping = create_edge_variable_mapping(graph_dic["edges"]) diff --git a/cognee/infrastructure/InfrastructureConfig.py b/cognee/infrastructure/InfrastructureConfig.py index 3dcee2758..b8a67c8aa 100644 --- a/cognee/infrastructure/InfrastructureConfig.py +++ b/cognee/infrastructure/InfrastructureConfig.py @@ -1,21 +1,32 @@ from cognee.config import Config from .databases.relational import SqliteEngine, DatabaseEngine +from .databases.vector import WeaviateAdapter, VectorDBInterface config = Config() config.load() class InfrastructureConfig(): database_engine: DatabaseEngine = None + vector_engine: VectorDBInterface = None def get_config(self) -> dict: if self.database_engine is None: self.database_engine = SqliteEngine(config.db_path, config.db_name) + if self.vector_engine is None: + self.vector_engine = WeaviateAdapter( + config.weaviate_url, + config.weaviate_api_key, + config.openai_key + ) + return { - "database_engine": self.database_engine + "database_engine": 
self.database_engine, + "vector_engine": self.vector_engine } def set_config(self, new_config: dict): self.database_engine = new_config["database_engine"] + self.vector_engine = new_config["vector_engine"] infrastructure_config = InfrastructureConfig() diff --git a/cognee/infrastructure/databases/vector/__init__.py b/cognee/infrastructure/databases/vector/__init__.py index b6f2a24b7..fe99e9afa 100644 --- a/cognee/infrastructure/databases/vector/__init__.py +++ b/cognee/infrastructure/databases/vector/__init__.py @@ -1,2 +1,7 @@ from .get_vector_database import get_vector_database -from .qdrant import QDrantAdapter, CollectionConfig +from .qdrant import QDrantAdapter +from .models.DataPoint import DataPoint +from .models.VectorConfig import VectorConfig +from .models.CollectionConfig import CollectionConfig +from .weaviate_db import WeaviateAdapter +from .vector_db_interface import VectorDBInterface diff --git a/cognee/infrastructure/databases/vector/get_vector_database.py b/cognee/infrastructure/databases/vector/get_vector_database.py index a3ed82cfc..45284f4d5 100644 --- a/cognee/infrastructure/databases/vector/get_vector_database.py +++ b/cognee/infrastructure/databases/vector/get_vector_database.py @@ -1,8 +1,10 @@ from cognee.config import Config -from .qdrant import QDrantAdapter +# from .qdrant import QDrantAdapter +from .weaviate_db import WeaviateAdapter config = Config() config.load() def get_vector_database(): - return QDrantAdapter(config.qdrant_path, config.qdrant_url, config.qdrant_api_key) + # return QDrantAdapter(config.qdrant_path, config.qdrant_url, config.qdrant_api_key) + return WeaviateAdapter(config.weaviate_url, config.weaviate_api_key, config.openai_key) diff --git a/cognee/infrastructure/databases/vector/models/CollectionConfig.py b/cognee/infrastructure/databases/vector/models/CollectionConfig.py new file mode 100644 index 000000000..b46bdabfc --- /dev/null +++ b/cognee/infrastructure/databases/vector/models/CollectionConfig.py @@ -0,0 +1,5 
@@ +from pydantic import BaseModel +from .VectorConfig import VectorConfig + +class CollectionConfig(BaseModel): + vector_config: VectorConfig diff --git a/cognee/infrastructure/databases/vector/models/DataPoint.py b/cognee/infrastructure/databases/vector/models/DataPoint.py new file mode 100644 index 000000000..fe380053a --- /dev/null +++ b/cognee/infrastructure/databases/vector/models/DataPoint.py @@ -0,0 +1,10 @@ +from typing import Dict +from pydantic import BaseModel + +class DataPoint(BaseModel): + id: str + payload: Dict[str, str] + embed_field: str + + def get_embeddable_data(self): + return self.payload[self.embed_field] diff --git a/cognee/infrastructure/databases/vector/models/ScoredResult.py b/cognee/infrastructure/databases/vector/models/ScoredResult.py new file mode 100644 index 000000000..ce56c43fd --- /dev/null +++ b/cognee/infrastructure/databases/vector/models/ScoredResult.py @@ -0,0 +1,8 @@ +from uuid import UUID +from typing import Any, Dict +from pydantic import BaseModel + +class ScoredResult(BaseModel): + id: UUID + score: float + payload: Dict[str, Any] diff --git a/cognee/infrastructure/databases/vector/models/VectorConfig.py b/cognee/infrastructure/databases/vector/models/VectorConfig.py new file mode 100644 index 000000000..2a968a997 --- /dev/null +++ b/cognee/infrastructure/databases/vector/models/VectorConfig.py @@ -0,0 +1,6 @@ +from typing import Literal +from pydantic import BaseModel + +class VectorConfig(BaseModel): + distance: Literal['Cosine', 'Dot'] + size: int diff --git a/cognee/infrastructure/databases/vector/qdrant/QDrantAdapter.py b/cognee/infrastructure/databases/vector/qdrant/QDrantAdapter.py index 2ef88e72a..847e96536 100644 --- a/cognee/infrastructure/databases/vector/qdrant/QDrantAdapter.py +++ b/cognee/infrastructure/databases/vector/qdrant/QDrantAdapter.py @@ -1,19 +1,59 @@ -from typing import List, Optional, Dict -from pydantic import BaseModel, Field +import asyncio +from typing import List, Dict +# from pydantic
import BaseModel, Field from qdrant_client import AsyncQdrantClient, models from ..vector_db_interface import VectorDBInterface +from ..models.DataPoint import DataPoint +from ..models.VectorConfig import VectorConfig +from ..models.CollectionConfig import CollectionConfig +from cognee.infrastructure.llm.get_llm_client import get_llm_client -class CollectionConfig(BaseModel, extra = "forbid"): - vector_config: Dict[str, models.VectorParams] = Field(..., description="Vectors configuration" ) - hnsw_config: Optional[models.HnswConfig] = Field(default = None, description="HNSW vector index configuration") - optimizers_config: Optional[models.OptimizersConfig] = Field(default = None, description="Optimizers configuration") - quantization_config: Optional[models.QuantizationConfig] = Field(default = None, description="Quantization configuration") +# class CollectionConfig(BaseModel, extra = "forbid"): +# vector_config: Dict[str, models.VectorParams] = Field(..., description="Vectors configuration" ) +# hnsw_config: Optional[models.HnswConfig] = Field(default = None, description="HNSW vector index configuration") +# optimizers_config: Optional[models.OptimizersConfig] = Field(default = None, description="Optimizers configuration") +# quantization_config: Optional[models.QuantizationConfig] = Field(default = None, description="Quantization configuration") + +async def embed_data(data: str): + llm_client = get_llm_client() + + return await llm_client.async_get_embedding_with_backoff(data) + +async def convert_to_qdrant_point(data_point: DataPoint): + return models.PointStruct( + id = data_point.id, + payload = data_point.payload, + vector = { + "text": await embed_data(data_point.get_embeddable_data()) + } + ) + +def create_vector_config(vector_config: VectorConfig): + return models.VectorParams( + size = vector_config.size, + distance = vector_config.distance + ) + +def create_hnsw_config(hnsw_config: Dict): + if hnsw_config is not None: + return models.HnswConfig() + 
return None + +def create_optimizers_config(optimizers_config: Dict): + if optimizers_config is not None: + return models.OptimizersConfig() + return None + +def create_quantization_config(quantization_config: Dict): + if quantization_config is not None: + return models.QuantizationConfig() + return None class QDrantAdapter(VectorDBInterface): qdrant_url: str = None qdrant_path: str = None qdrant_api_key: str = None - + def __init__(self, qdrant_path, qdrant_url, qdrant_api_key): if qdrant_path is not None: self.qdrant_path = qdrant_path @@ -46,43 +86,49 @@ class QDrantAdapter(VectorDBInterface): return await client.create_collection( collection_name = collection_name, - vectors_config = collection_config.vector_config, - hnsw_config = collection_config.hnsw_config, - optimizers_config = collection_config.optimizers_config, - quantization_config = collection_config.quantization_config + vectors_config = { + "text": create_vector_config(collection_config.vector_config) + } ) - async def create_data_points(self, collection_name: str, data_points): + async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): client = self.get_qdrant_client() + awaitables = [] + + for point in data_points: + awaitables.append(convert_to_qdrant_point(point)) + + points = await asyncio.gather(*awaitables) + return await client.upload_points( collection_name = collection_name, - points = data_points + points = points ) - async def search(self, collection_name: str, query_vector: List[float], limit: int, with_vector: bool = False): + async def search(self, collection_name: str, query_text: str, limit: int, with_vector: bool = False): client = self.get_qdrant_client() return await client.search( collection_name = collection_name, - query_vector = ( - "content", query_vector), + query_vector = models.NamedVector( + name = "text", + vector = await embed_data(query_text) + ), limit = limit, with_vectors = with_vector ) - async def batch_search(self, 
collection_name: str, embeddings: List[List[float]], - with_vectors: List[bool] = None): + async def batch_search(self, collection_name: str, query_texts: List[str], limit: int, with_vectors: bool = False): """ Perform batch search in a Qdrant collection with dynamic search requests. Args: - collection_name (str): Name of the collection to search in. - - embeddings (List[List[float]]): List of embeddings to search for. - - limits (List[int]): List of result limits for each search request. - - with_vectors (List[bool], optional): List indicating whether to return vectors for each search request. - Defaults to None, in which case vectors are not returned. + - query_texts (List[str]): List of query texts to search for. + - limit (int): Maximum number of results for each search request. + - with_vectors (bool, optional): Bool indicating whether to return vectors for search requests. Returns: - results: The search results from Qdrant. @@ -90,30 +136,32 @@ class QDrantAdapter(VectorDBInterface): client = self.get_qdrant_client() - # Default with_vectors to False for each request if not provided - if with_vectors is None: - with_vectors = [False] * len(embeddings) - - # Ensure with_vectors list matches the length of embeddings and limits - if len(with_vectors) != len(embeddings): - raise ValueError("The length of with_vectors must match the length of embeddings and limits") + vectors = await asyncio.gather(*[embed_data(query_text) for query_text in query_texts]) # Generate dynamic search requests based on the provided embeddings requests = [ - models.SearchRequest(vector=models.NamedVector( - name="content", - vector=embedding, - ), - # vector= embedding, - limit=3, - with_vector=False - ) for embedding in [embeddings] + models.SearchRequest( + vector = models.NamedVector( + name = "text", + vector = vector + ), + limit = limit, + with_vector = with_vectors + ) for vector in vectors ] # Perform batch search with the dynamically generated requests results = await
client.search_batch( - collection_name=collection_name, - requests=requests + collection_name = collection_name, + requests = requests ) - return results + return [filter(lambda result: result.score > 0.9, result_group) for result_group in results] + + async def prune(self): + client = self.get_qdrant_client() + + response = await client.get_collections() + + for collection in response.collections: + await client.delete_collection(collection.name) diff --git a/cognee/infrastructure/databases/vector/vector_db_interface.py b/cognee/infrastructure/databases/vector/vector_db_interface.py index 3f4e04d96..9bd124649 100644 --- a/cognee/infrastructure/databases/vector/vector_db_interface.py +++ b/cognee/infrastructure/databases/vector/vector_db_interface.py @@ -1,6 +1,7 @@ -from typing import List +from typing import List, Protocol from abc import abstractmethod -from typing import Protocol +from .models.CollectionConfig import CollectionConfig +from .models.DataPoint import DataPoint class VectorDBInterface(Protocol): """ Collections """ @@ -8,7 +9,7 @@ class VectorDBInterface(Protocol): async def create_collection( self, collection_name: str, - collection_config: object + collection_config: CollectionConfig ): raise NotImplementedError # @abstractmethod @@ -43,7 +44,7 @@ class VectorDBInterface(Protocol): async def create_data_points( self, collection_name: str, - data_points + data_points: List[DataPoint] ): raise NotImplementedError # @abstractmethod @@ -67,12 +68,13 @@ class VectorDBInterface(Protocol): # collection_name: str, # data_point_id: str # ): raise NotImplementedError + """ Search """ @abstractmethod async def search( self, collection_name: str, - query_vector: List[float], + query_text: str, limit: int, with_vector: bool = False @@ -82,7 +84,7 @@ class VectorDBInterface(Protocol): async def batch_search( self, collection_name: str, - embeddings: List[List[float]], - with_vectors: List[bool] = None + query_texts: List[str], + limit: int, + with_vectors: 
bool = False ): raise NotImplementedError - diff --git a/cognee/infrastructure/databases/vector/weaviate/adapter.py b/cognee/infrastructure/databases/vector/weaviate/adapter.py deleted file mode 100644 index 17c965ca4..000000000 --- a/cognee/infrastructure/databases/vector/weaviate/adapter.py +++ /dev/null @@ -1,417 +0,0 @@ -from weaviate.gql.get import HybridFusion -from langchain.embeddings.openai import OpenAIEmbeddings -from langchain.retrievers import WeaviateHybridSearchRetriever, ParentDocumentRetriever -from databases.vector.vector_db_interface import VectorDBInterface -# from langchain.text_splitter import RecursiveCharacterTextSplitter -from cognee.database.vectordb.loaders.loaders import _document_loader - -class WeaviateVectorDB(VectorDBInterface): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.init_weaviate(embeddings=self.embeddings, namespace=self.namespace) - - def init_weaviate( - self, - embeddings=OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY", "")), - namespace=None, - retriever_type="", - ): - # Weaviate initialization logic - auth_config = weaviate.auth.AuthApiKey( - api_key=os.environ.get("WEAVIATE_API_KEY") - ) - client = weaviate.Client( - url=os.environ.get("WEAVIATE_URL"), - auth_client_secret=auth_config, - additional_headers={"X-OpenAI-Api-Key": os.environ.get("OPENAI_API_KEY")}, - ) - - if retriever_type == "single_document_context": - retriever = WeaviateHybridSearchRetriever( - client=client, - index_name=namespace, - text_key="text", - attributes=[], - embedding=embeddings, - create_schema_if_missing=True, - ) - return retriever - elif retriever_type == "multi_document_context": - retriever = WeaviateHybridSearchRetriever( - client=client, - index_name=namespace, - text_key="text", - attributes=[], - embedding=embeddings, - create_schema_if_missing=True, - ) - return retriever - else: - return client - # child_splitter = RecursiveCharacterTextSplitter(chunk_size=400) - # store = 
InMemoryStore() - # retriever = ParentDocumentRetriever( - # vectorstore=vectorstore, - # docstore=store, - # child_splitter=child_splitter, - # ) - - from marshmallow import Schema, fields - - def create_document_structure(observation, params, metadata_schema_class=None): - """ - Create and validate a document structure with optional custom fields. - - :param observation: Content of the document. - :param params: Metadata information. - :param metadata_schema_class: Custom metadata schema class (optional). - :return: A list containing the validated document data. - """ - document_data = {"metadata": params, "page_content": observation} - - def get_document_schema(): - class DynamicDocumentSchema(Schema): - metadata = fields.Nested(metadata_schema_class, required=True) - page_content = fields.Str(required=True) - - return DynamicDocumentSchema - - # Validate and deserialize, defaulting to "1.0" if not provided - CurrentDocumentSchema = get_document_schema() - loaded_document = CurrentDocumentSchema().load(document_data) - return [loaded_document] - - def _stuct(self, observation, params, metadata_schema_class=None): - """Utility function to create the document structure with optional custom fields.""" - # Construct document data - document_data = {"metadata": params, "page_content": observation} - - def get_document_schema(): - class DynamicDocumentSchema(Schema): - metadata = fields.Nested(metadata_schema_class, required=True) - page_content = fields.Str(required=True) - - return DynamicDocumentSchema - - # Validate and deserialize # Default to "1.0" if not provided - CurrentDocumentSchema = get_document_schema() - loaded_document = CurrentDocumentSchema().load(document_data) - return [loaded_document] - - async def add_memories( - self, - observation, - loader_settings=None, - params=None, - namespace=None, - metadata_schema_class=None, - embeddings="hybrid", - ): - # Update Weaviate memories here - if namespace is None: - namespace = self.namespace - 
params["user_id"] = self.user_id - logging.info("User id is %s", self.user_id) - retriever = self.init_weaviate( - embeddings=OpenAIEmbeddings(), - namespace=namespace, - retriever_type="single_document_context", - ) - if loader_settings: - # Assuming _document_loader returns a list of documents - documents = await _document_loader(observation, loader_settings) - logging.info("here are the docs %s", str(documents)) - chunk_count = 0 - for doc_list in documents: - for doc in doc_list: - chunk_count += 1 - params["chunk_count"] = doc.metadata.get("chunk_count", "None") - logging.info( - "Loading document with provided loader settings %s", str(doc) - ) - params["source"] = doc.metadata.get("source", "None") - logging.info("Params are %s", str(params)) - retriever.add_documents( - [Document(metadata=params, page_content=doc.page_content)] - ) - else: - chunk_count = 0 - from cognee.database.vectordb.chunkers.chunkers import ( - chunk_data, - ) - - documents = [ - chunk_data( - chunk_strategy="VANILLA", - source_data=observation, - chunk_size=300, - chunk_overlap=20, - ) - ] - for doc in documents[0]: - chunk_count += 1 - params["chunk_order"] = chunk_count - params["source"] = "User loaded" - logging.info( - "Loading document with default loader settings %s", str(doc) - ) - logging.info("Params are %s", str(params)) - retriever.add_documents( - [Document(metadata=params, page_content=doc.page_content)] - ) - - async def fetch_memories( - self, - observation: str, - namespace: str = None, - search_type: str = "hybrid", - params=None, - **kwargs, - ): - """ - Fetch documents from weaviate. - - Parameters: - - observation (str): User query. - - namespace (str, optional): Type of memory accessed. - - search_type (str, optional): Type of search ('text', 'hybrid', 'bm25', 'generate', 'generate_grouped'). Defaults to 'hybrid'. - - **kwargs: Additional parameters for flexibility. - - Returns: - List of documents matching the query or an empty list in case of error. 
- - Example: - fetch_memories(query="some query", search_type='text', additional_param='value') - """ - client = self.init_weaviate(namespace=self.namespace) - if search_type is None: - search_type = "hybrid" - - if not namespace: - namespace = self.namespace - - logging.info("Query on namespace %s", namespace) - - params_user_id = { - "path": ["user_id"], - "operator": "Like", - "valueText": self.user_id, - } - - def list_objects_of_class(class_name, schema): - return [ - prop["name"] - for class_obj in schema["classes"] - if class_obj["class"] == class_name - for prop in class_obj["properties"] - ] - - base_query = ( - client.query.get( - namespace, list(list_objects_of_class(namespace, client.schema.get())) - ) - .with_additional( - ["id", "creationTimeUnix", "lastUpdateTimeUnix", "score", "distance"] - ) - .with_where(params_user_id) - .with_limit(10) - ) - - n_of_observations = kwargs.get("n_of_observations", 2) - - # try: - if search_type == "text": - query_output = ( - base_query.with_near_text({"concepts": [observation]}) - .with_autocut(n_of_observations) - .do() - ) - elif search_type == "hybrid": - query_output = ( - base_query.with_hybrid( - query=observation, fusion_type=HybridFusion.RELATIVE_SCORE - ) - .with_autocut(n_of_observations) - .do() - ) - elif search_type == "bm25": - query_output = ( - base_query.with_bm25(query=observation) - .with_autocut(n_of_observations) - .do() - ) - elif search_type == "summary": - filter_object = { - "operator": "And", - "operands": [ - { - "path": ["user_id"], - "operator": "Equal", - "valueText": self.user_id, - }, - { - "path": ["chunk_order"], - "operator": "LessThan", - "valueNumber": 30, - }, - ], - } - base_query = ( - client.query.get( - namespace, - list(list_objects_of_class(namespace, client.schema.get())), - ) - .with_additional( - [ - "id", - "creationTimeUnix", - "lastUpdateTimeUnix", - "score", - "distance", - ] - ) - .with_where(filter_object) - .with_limit(30) - ) - query_output = ( - base_query - 
# .with_hybrid(query=observation, fusion_type=HybridFusion.RELATIVE_SCORE) - .do() - ) - - elif search_type == "summary_filter_by_object_name": - filter_object = { - "operator": "And", - "operands": [ - { - "path": ["user_id"], - "operator": "Equal", - "valueText": self.user_id, - }, - { - "path": ["doc_id"], - "operator": "Equal", - "valueText": params, - }, - ], - } - base_query = ( - client.query.get( - namespace, - list(list_objects_of_class(namespace, client.schema.get())), - ) - .with_additional( - [ - "id", - "creationTimeUnix", - "lastUpdateTimeUnix", - "score", - "distance", - ] - ) - .with_where(filter_object) - .with_limit(30) - .with_hybrid(query=observation, fusion_type=HybridFusion.RELATIVE_SCORE) - ) - query_output = base_query.do() - - return query_output - elif search_type == "generate": - generate_prompt = kwargs.get("generate_prompt", "") - query_output = ( - base_query.with_generate(single_prompt=observation) - .with_near_text({"concepts": [observation]}) - .with_autocut(n_of_observations) - .do() - ) - elif search_type == "generate_grouped": - generate_prompt = kwargs.get("generate_prompt", "") - query_output = ( - base_query.with_generate(grouped_task=observation) - .with_near_text({"concepts": [observation]}) - .with_autocut(n_of_observations) - .do() - ) - else: - logging.error(f"Invalid search_type: {search_type}") - return [] - # except Exception as e: - # logging.error(f"Error executing query: {str(e)}") - # return [] - - return query_output - - async def delete_memories(self, namespace: str, params: dict = None): - if namespace is None: - namespace = self.namespace - client = self.init_weaviate(namespace=self.namespace) - if params: - where_filter = { - "path": ["id"], - "operator": "Equal", - "valueText": params.get("id", None), - } - return client.batch.delete_objects( - class_name=self.namespace, - # Same `where` filter as in the GraphQL API - where=where_filter, - ) - else: - # Delete all objects - return client.batch.delete_objects( 
- class_name=namespace, - where={ - "path": ["version"], - "operator": "Equal", - "valueText": "1.0", - }, - ) - - async def count_memories(self, namespace: str = None, params: dict = None) -> int: - """ - Count memories in a Weaviate database. - - Args: - namespace (str, optional): The Weaviate namespace to count memories in. If not provided, uses the default namespace. - - Returns: - int: The number of memories in the specified namespace. - """ - if namespace is None: - namespace = self.namespace - - client = self.init_weaviate(namespace=namespace) - - try: - object_count = client.query.aggregate(namespace).with_meta_count().do() - return object_count - except Exception as e: - logging.info(f"Error counting memories: {str(e)}") - # Handle the error or log it - return 0 - - def update_memories(self, observation, namespace: str, params: dict = None): - client = self.init_weaviate(namespace=self.namespace) - - client.data_object.update( - data_object={ - # "text": observation, - "user_id": str(self.user_id), - "version": params.get("version", None) or "", - "agreement_id": params.get("agreement_id", None) or "", - "privacy_policy": params.get("privacy_policy", None) or "", - "terms_of_service": params.get("terms_of_service", None) or "", - "format": params.get("format", None) or "", - "schema_version": params.get("schema_version", None) or "", - "checksum": params.get("checksum", None) or "", - "owner": params.get("owner", None) or "", - "license": params.get("license", None) or "", - "validity_start": params.get("validity_start", None) or "", - "validity_end": params.get("validity_end", None) or "" - # **source_metadata, - }, - class_name="Test", - uuid=params.get("id", None), - consistency_level=weaviate.data.replication.ConsistencyLevel.ALL, # default QUORUM - ) - return diff --git a/cognee/infrastructure/databases/vector/weaviate_db/WeaviateAdapter.py b/cognee/infrastructure/databases/vector/weaviate_db/WeaviateAdapter.py new file mode 100644 index 
000000000..2a93990ea --- /dev/null +++ b/cognee/infrastructure/databases/vector/weaviate_db/WeaviateAdapter.py @@ -0,0 +1,72 @@ +from typing import List +from multiprocessing import Pool +import weaviate +import weaviate.classes as wvc +import weaviate.classes.config as wvcc +from weaviate.classes.data import DataObject +from ..vector_db_interface import VectorDBInterface +from ..models.DataPoint import DataPoint +from ..models.ScoredResult import ScoredResult + +class WeaviateAdapter(VectorDBInterface): + async_pool: Pool = None + + def __init__(self, url: str, api_key: str, openai_api_key: str): + self.client = weaviate.connect_to_wcs( + cluster_url = url, + auth_credentials = weaviate.auth.AuthApiKey(api_key), + headers = { + "X-OpenAI-Api-Key": openai_api_key + }, + additional_config = wvc.init.AdditionalConfig(timeout = wvc.init.Timeout(init = 30)) + ) + + async def create_collection(self, collection_name: str, collection_config: dict): + return self.client.collections.create( + name = collection_name, + vectorizer_config = wvcc.Configure.Vectorizer.text2vec_openai(), + generative_config = wvcc.Configure.Generative.openai(), + properties = [ + wvcc.Property( + name = "text", + data_type = wvcc.DataType.TEXT + ) + ] + ) + + def get_collection(self, collection_name: str): + return self.client.collections.get(collection_name) + + async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): + def convert_to_weaviate_data_points(data_point: DataPoint): + return DataObject( + uuid = data_point.id, + properties = data_point.payload + ) + + objects = list(map(convert_to_weaviate_data_points, data_points)) + + return self.get_collection(collection_name).data.insert_many(objects) + + async def search(self, collection_name: str, query_text: str, limit: int, with_vector: bool = False): + search_result = self.get_collection(collection_name).query.bm25( + query = query_text, + limit = limit, + include_vector = with_vector, + return_metadata =
wvc.query.MetadataQuery(score = True), + ) + + return list(map(lambda result: ScoredResult( + id = result.uuid, + payload = result.properties, + score = str(result.metadata.score) + ), search_result.objects)) + + async def batch_search(self, collection_name: str, query_texts: List[str], limit: int, with_vectors: bool = False): + def query_search(query_text): + return self.search(collection_name, query_text, limit = limit, with_vector = with_vectors) + + return [await query_search(query_text) for query_text in query_texts] + + async def prune(self): + self.client.collections.delete_all() diff --git a/cognee/infrastructure/databases/vector/weaviate_db/__init__.py b/cognee/infrastructure/databases/vector/weaviate_db/__init__.py new file mode 100644 index 000000000..01c6ca967 --- /dev/null +++ b/cognee/infrastructure/databases/vector/weaviate_db/__init__.py @@ -0,0 +1 @@ +from .WeaviateAdapter import WeaviateAdapter diff --git a/cognee/modules/cognify/graph/add_node_connections.py b/cognee/modules/cognify/graph/add_node_connections.py index b95360ca0..820bdb82a 100644 --- a/cognee/modules/cognify/graph/add_node_connections.py +++ b/cognee/modules/cognify/graph/add_node_connections.py @@ -3,20 +3,26 @@ from cognee.infrastructure.databases.graph.get_graph_client import get_graph_cli from cognee.shared.data_models import GraphDBType -async def extract_node_descriptions(data): +async def extract_node_descriptions(node): descriptions = [] - for node_id, attributes in data: - if 'description' in attributes and 'unique_id' in attributes: - descriptions.append({'node_id': attributes['unique_id'], 'description': attributes['description'], 'layer_uuid': attributes['layer_uuid'], 'layer_decomposition_uuid': attributes['layer_decomposition_uuid'] }) + for node_id, attributes in node: + if "description" in attributes and "unique_id" in attributes: + descriptions.append({ + "node_id": attributes["unique_id"], + "description": attributes["description"], + "layer_uuid": 
attributes["layer_uuid"], + "layer_decomposition_uuid": attributes["layer_decomposition_uuid"] + }) + return descriptions -async def add_node_connection(node_descriptions): +async def group_nodes_by_layer(node_descriptions): grouped_data = {} for item in node_descriptions: - uuid = item['layer_decomposition_uuid'] + uuid = item["layer_decomposition_uuid"] if uuid not in grouped_data: grouped_data[uuid] = [] @@ -35,19 +41,19 @@ def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph: """ for id, relationships in relationship_dict.items(): for relationship in relationships: - searched_node_attr_id = relationship['searched_node_id'] - score_attr_id = relationship['original_id_for_search'] - score = relationship['score'] + searched_node_attr_id = relationship["searched_node_id"] + score_attr_id = relationship["original_id_for_search"] + score = relationship["score"] # Initialize node keys for both searched_node and score_node searched_node_key, score_node_key = None, None # Find nodes in the graph that match the searched_node_id and score_id from their attributes - for node, attrs in graph.nodes(data=True): - if 'unique_id' in attrs: # Ensure there is an 'id' attribute - if attrs['unique_id'] == searched_node_attr_id: + for node, attrs in graph.nodes(data = True): + if "unique_id" in attrs: # Ensure there is an "id" attribute + if attrs["unique_id"] == searched_node_attr_id: searched_node_key = node - elif attrs['unique_id'] == score_attr_id: + elif attrs["unique_id"] == score_attr_id: score_node_key = node # If both nodes are found, no need to continue checking other nodes @@ -57,9 +63,13 @@ def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph: # Check if both nodes were found in the graph if searched_node_key is not None and score_node_key is not None: # If both nodes exist, create an edge between them - # You can customize the edge attributes as needed, here we use 'score' as an attribute - graph.add_edge(searched_node_key, 
score_node_key, weight=score, - score_metadata=relationship.get('score_metadata')) + # You can customize the edge attributes as needed, here we use "score" as an attribute + graph.add_edge( + searched_node_key, + score_node_key, + weight = score, + score_metadata = relationship.get("score_metadata") + ) return graph @@ -67,31 +77,23 @@ def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph: def graph_ready_output(results): relationship_dict = {} - for result_tuple in results: - - uuid, scored_points_list, desc, node_id = result_tuple - # Unpack the tuple + for result in results: + layer_id = result["layer_id"] + layer_nodes = result["layer_nodes"] # Ensure there's a list to collect related items for this uuid - if uuid not in relationship_dict: - relationship_dict[uuid] = [] + if layer_id not in relationship_dict: + relationship_dict[layer_id] = [] + + for node in layer_nodes: # Iterate over the list of ScoredPoint lists + for score_point in node["score_points"]: + # Append a new dictionary to the list associated with the uuid + relationship_dict[layer_id].append({ + "collection_id": layer_id, + "searched_node_id": node["id"], + "score": score_point.score, + "score_metadata": score_point.payload, + "original_id_for_search": score_point.id, + }) - for scored_points in scored_points_list: # Iterate over the list of ScoredPoint lists - for scored_point in scored_points: # Iterate over each ScoredPoint object - if scored_point.score > 0.9: # Check the score condition - # Append a new dictionary to the list associated with the uuid - relationship_dict[uuid].append({ - 'collection_name_uuid': uuid, - 'searched_node_id': scored_point.id, - 'score': scored_point.score, - 'score_metadata': scored_point.payload, - 'original_id_for_search': node_id, - }) return relationship_dict - - - - -if __name__ == '__main__': - graph_client = get_graph_client(GraphDBType.NETWORKX) - add_node_connection(graph_client) diff --git 
a/cognee/modules/cognify/graph/add_propositions.py b/cognee/modules/cognify/graph/add_propositions.py index bd56eab10..76ee52712 100644 --- a/cognee/modules/cognify/graph/add_propositions.py +++ b/cognee/modules/cognify/graph/add_propositions.py @@ -3,6 +3,7 @@ import uuid import json from datetime import datetime from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client, GraphDBType +from cognee.shared.encode_uuid import encode_uuid async def add_propositions( @@ -69,7 +70,6 @@ async def add_propositions( async def append_to_graph(layer_graphs, required_layers): # Generate a UUID for the overall layer layer_uuid = uuid.uuid4() - decomposition_uuids = set() # Extract category name from required_layers data data_type = required_layers["data_type"] @@ -84,9 +84,7 @@ async def append_to_graph(layer_graphs, required_layers): layer_description = json.loads(layer_json) # Generate a UUID for this particular layer decomposition - layer_decomposition_uuid = uuid.uuid4() - - decomposition_uuids.add(layer_decomposition_uuid) + layer_decomposition_id = encode_uuid(uuid.uuid4()) # Assuming append_data_to_graph is defined elsewhere and appends data to graph_client # You would pass relevant information from knowledge_graph along with other details to this function @@ -96,11 +94,9 @@ async def append_to_graph(layer_graphs, required_layers): layer_description, knowledge_graph, layer_uuid, - layer_decomposition_uuid + layer_decomposition_id ) - return decomposition_uuids - # if __name__ == "__main__": # import asyncio diff --git a/cognee/modules/cognify/graph/create.py b/cognee/modules/cognify/graph/create.py index d6943fa6b..5dd6e8495 100644 --- a/cognee/modules/cognify/graph/create.py +++ b/cognee/modules/cognify/graph/create.py @@ -22,10 +22,10 @@ async def process_attribute(graph_client, parent_id: Optional[str], attribute: s if isinstance(value, BaseModel): node_id = await generate_node_id(value) - node_data = 
value.dict(exclude={"default_relationship"}) + node_data = value.model_dump(exclude = {"default_relationship"}) # Use the specified default relationship for the edge between the parent node and the current node - relationship_data = value.default_relationship.dict() if hasattr(value, "default_relationship") else {} + relationship_data = value.default_relationship.model_dump() if hasattr(value, "default_relationship") else {} await add_node_and_edge(graph_client, parent_id, node_id, node_data, relationship_data) @@ -41,7 +41,7 @@ async def process_attribute(graph_client, parent_id: Optional[str], attribute: s async def create_dynamic(graph_model) : root_id = await generate_node_id(graph_model) - node_data = graph_model.dict(exclude = {"default_relationship", "id"}) + node_data = graph_model.model_dump(exclude = {"default_relationship", "id"}) graph_client = get_graph_client(GraphDBType.NETWORKX) diff --git a/cognee/modules/cognify/llm/add_node_connection_embeddings.py b/cognee/modules/cognify/llm/add_node_connection_embeddings.py deleted file mode 100644 index 4230baef5..000000000 --- a/cognee/modules/cognify/llm/add_node_connection_embeddings.py +++ /dev/null @@ -1,30 +0,0 @@ -import asyncio - -async def process_items(grouped_data, unique_layer_uuids, llm_client): - results_to_check = [] # This will hold results excluding self comparisons - tasks = [] # List to hold all tasks - task_to_info = {} # Dictionary to map tasks to their corresponding group id and item info - - # Iterate through each group in grouped_data - for group_id, items in grouped_data.items(): - # Filter unique_layer_uuids to exclude the current group_id - target_uuids = [uuid for uuid in unique_layer_uuids if uuid != group_id] - - # Process each item in the group - for item in items: - # For each target UUID, create an async task for the item's embedding retrieval - for target_id in target_uuids: - task = asyncio.create_task(llm_client.async_get_embedding_with_backoff(item['description'], 
"text-embedding-3-large")) - tasks.append(task) - # Map the task to the target id, item's node_id, and description for later retrieval - task_to_info[task] = (target_id, item['node_id'], group_id, item['description']) - - # Await all tasks to complete and gather results - results = await asyncio.gather(*tasks) - - # Process the results, associating them with their target id, node id, and description - for task, embedding in zip(tasks, results): - target_id, node_id, group_id, description = task_to_info[task] - results_to_check.append([target_id, embedding, description, node_id, group_id]) - - return results_to_check diff --git a/cognee/modules/cognify/llm/classify_content.py b/cognee/modules/cognify/llm/classify_content.py index 965240e02..aec44c957 100644 --- a/cognee/modules/cognify/llm/classify_content.py +++ b/cognee/modules/cognify/llm/classify_content.py @@ -11,7 +11,7 @@ async def classify_into_categories(text_input: str, system_prompt_path: str, res llm_output = await llm_client.acreate_structured_output(text_input, system_prompt, response_model) - return extract_categories(llm_output.dict()) + return extract_categories(llm_output.model_dump()) def extract_categories(llm_output) -> List[dict]: # Extract the first subclass from the list (assuming there could be more) diff --git a/cognee/modules/cognify/llm/content_to_cog_layers.py b/cognee/modules/cognify/llm/content_to_cog_layers.py index 18c454ec3..8c27a8e92 100644 --- a/cognee/modules/cognify/llm/content_to_cog_layers.py +++ b/cognee/modules/cognify/llm/content_to_cog_layers.py @@ -4,9 +4,9 @@ from pydantic import BaseModel from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.utils import async_render_template -async def content_to_cog_layers(filename: str, context, response_model: Type[BaseModel]): +async def content_to_cog_layers(context, response_model: Type[BaseModel]): llm_client = get_llm_client() - formatted_text_input = await async_render_template(filename, context) + 
formatted_text_input = await async_render_template("generate_cog_layers.txt", context) return await llm_client.acreate_structured_output(formatted_text_input, formatted_text_input, response_model) diff --git a/cognee/modules/cognify/llm/resolve_cross_graph_references.py b/cognee/modules/cognify/llm/resolve_cross_graph_references.py new file mode 100644 index 000000000..55ede0e92 --- /dev/null +++ b/cognee/modules/cognify/llm/resolve_cross_graph_references.py @@ -0,0 +1,39 @@ +from typing import Dict, List +from cognee.infrastructure.databases.vector import get_vector_database +from cognee.infrastructure import infrastructure_config + +async def resolve_cross_graph_references(nodes_by_layer: Dict): + results = [] + + unique_layers = nodes_by_layer.keys() + + for layer_id, layer_nodes in nodes_by_layer.items(): + # Filter unique_layer_uuids to exclude the current layer + other_layers = [uuid for uuid in unique_layers if uuid != layer_id] + + for other_layer in other_layers: + results.append(await get_nodes_by_layer(other_layer, layer_nodes)) + + return results + +async def get_nodes_by_layer(layer_id: str, layer_nodes: List): + vector_engine = infrastructure_config.get_config()["vector_engine"] + + score_points = await vector_engine.batch_search( + layer_id, + list(map(lambda node: node["description"], layer_nodes)), + limit = 3 + ) + + return { + "layer_id": layer_id, + "layer_nodes": connect_score_points_to_node(score_points, layer_nodes) + } + +def connect_score_points_to_node(score_points, layer_nodes): + return [ + { + "id": node["node_id"], + "score_points": score_points[node_index] + } for node_index, node in enumerate(layer_nodes) + ] diff --git a/cognee/modules/cognify/vector/add_propositions.py b/cognee/modules/cognify/vector/add_propositions.py index d54e1627c..13ba413ec 100644 --- a/cognee/modules/cognify/vector/add_propositions.py +++ b/cognee/modules/cognify/vector/add_propositions.py @@ -1,40 +1,28 @@ import asyncio -from qdrant_client import models 
-from cognee.infrastructure.llm.get_llm_client import get_llm_client -from cognee.infrastructure.databases.vector import get_vector_database +from cognee.infrastructure.databases.vector import DataPoint +from cognee.infrastructure import infrastructure_config -async def get_embeddings(texts:list): - """ Get embeddings for a list of texts""" - client = get_llm_client() - tasks = [client.async_get_embedding_with_backoff(text, "text-embedding-3-large") for text in texts] - return await asyncio.gather(*tasks) - -async def add_proposition_to_vector_store(id, metadata, embeddings, collection_name): - """ Upload a single embedding to a collection in Qdrant.""" - client = get_vector_database() - - await client.create_data_points( - collection_name = collection_name, - data_points = [ - models.PointStruct( - id = id, - payload = metadata, - vector = {"content" : embeddings} - ) - ] +def convert_to_data_point(node): + return DataPoint( + id = node["node_id"], + payload = { + "text": node["description"] + }, + embed_field = "text" ) +async def add_propositions(nodes_by_layer): + vector_engine = infrastructure_config.get_config()["vector_engine"] -async def add_propositions(node_descriptions): - for item in node_descriptions: - embeddings = await get_embeddings([item["description"]]) + awaitables = [] - await add_proposition_to_vector_store( - id = item["node_id"], - metadata = { - "meta": item["description"] - }, - embeddings = embeddings[0], - collection_name = item["layer_decomposition_uuid"] + for layer_id, layer_nodes in nodes_by_layer.items(): + awaitables.append( + vector_engine.create_data_points( + collection_name = layer_id, + data_points = list(map(convert_to_data_point, layer_nodes)) + ) ) + + return await asyncio.gather(*awaitables) diff --git a/cognee/modules/cognify/vector/batch_search.py b/cognee/modules/cognify/vector/batch_search.py deleted file mode 100644 index 8e1694181..000000000 --- a/cognee/modules/cognify/vector/batch_search.py +++ /dev/null @@ -1,24 
+0,0 @@ - -async def adapted_qdrant_batch_search(results_to_check, vector_client): - search_results_list = [] - - for result in results_to_check: - id = result[0] - embedding = result[1] - node_id = result[2] - target = result[3] - - # Assuming each result in results_to_check contains a single embedding - limits = [3] * len(embedding) # Set a limit of 3 results for this embedding - - try: - #Perform the batch search for this id with its embedding - #Assuming qdrant_batch_search function accepts a single embedding and a list of limits - #qdrant_batch_search - id_search_results = await vector_client.batch_search(collection_name = id, embeddings = embedding, with_vectors = limits) - search_results_list.append((id, id_search_results, node_id, target)) - except Exception as e: - print(f"Error during batch search for ID {id}: {e}") - continue - - return search_results_list diff --git a/cognee/modules/search/graph/search_adjacent.py b/cognee/modules/search/graph/search_adjacent.py index aac309488..941a6d536 100644 --- a/cognee/modules/search/graph/search_adjacent.py +++ b/cognee/modules/search/graph/search_adjacent.py @@ -14,6 +14,6 @@ def search_adjacent(G, node_id:str)->dict: for neighbor in neighbors: # Access the 'description' attribute for each neighbor # The get method returns None if 'description' attribute does not exist for the node - neighbor_descriptions[neighbor] = G.nodes[neighbor].get('description') + neighbor_descriptions[neighbor] = G.nodes[neighbor].get("description") return neighbor_descriptions \ No newline at end of file diff --git a/cognee/modules/search/vector/search_similarity.py b/cognee/modules/search/vector/search_similarity.py index 04ac45ec1..214f2197c 100644 --- a/cognee/modules/search/vector/search_similarity.py +++ b/cognee/modules/search/vector/search_similarity.py @@ -1,24 +1,21 @@ -from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.modules.cognify.graph.add_node_connections import extract_node_descriptions 
+from cognee.infrastructure import infrastructure_config -async def search_similarity(query:str ,graph): +async def search_similarity(query: str, graph): node_descriptions = await extract_node_descriptions(graph.nodes(data = True)) # print(node_descriptions) unique_layer_uuids = set(node["layer_decomposition_uuid"] for node in node_descriptions) - client = get_llm_client() out = [] - query = await client.async_get_embedding_with_backoff(query) - # print(query) - for id in unique_layer_uuids: - from cognee.infrastructure.databases.vector.get_vector_database import get_vector_database - vector_client = get_vector_database() - result = await vector_client.search(id, query,10) + for id in unique_layer_uuids: + vector_engine = infrastructure_config.get_config()["vector_engine"] + + result = await vector_engine.search(id, query, 10) if result: result_ = [ result_.id for result_ in result] @@ -28,15 +25,18 @@ async def search_similarity(query:str ,graph): relevant_context = [] + if len(out) == 0: + return [] + for proposition_id in out[0][0]: print(proposition_id) - for n,attr in graph.nodes(data=True): + for n, attr in graph.nodes(data = True): if proposition_id in n: for n_, attr_ in graph.nodes(data=True): - relevant_layer = attr['layer_uuid'] + relevant_layer = attr["layer_uuid"] - if attr_.get('layer_uuid') == relevant_layer: - print(attr_['description']) - relevant_context.append(attr_['description']) + if attr_.get("layer_uuid") == relevant_layer: + print(attr_["description"]) + relevant_context.append(attr_["description"]) - return relevant_context \ No newline at end of file + return relevant_context diff --git a/cognee/shared/encode_uuid.py b/cognee/shared/encode_uuid.py new file mode 100644 index 000000000..21fda1a91 --- /dev/null +++ b/cognee/shared/encode_uuid.py @@ -0,0 +1,14 @@ +from uuid import UUID + +def encode_uuid(uuid: UUID) -> str: + uuid_int = uuid.int + base = 52 + charset = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' + + encoded = '' + 
while len(encoded) < 36: + uuid_int, remainder = divmod(uuid_int, base) + uuid_int = uuid_int * 8 + encoded = charset[remainder] + encoded + + return encoded diff --git a/poetry.lock b/poetry.lock index df01c14d6..b52430875 100644 --- a/poetry.lock +++ b/poetry.lock @@ -412,6 +412,20 @@ tests = ["attrs[tests-no-zope]", "zope-interface"] tests-mypy = ["mypy (>=1.6)", "pytest-mypy-plugins"] tests-no-zope = ["attrs[tests-mypy]", "cloudpickle", "hypothesis", "pympler", "pytest (>=4.3.0)", "pytest-xdist[psutil]"] +[[package]] +name = "authlib" +version = "1.3.0" +description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients." +optional = false +python-versions = ">=3.8" +files = [ + {file = "Authlib-1.3.0-py2.py3-none-any.whl", hash = "sha256:9637e4de1fb498310a56900b3e2043a206b03cb11c05422014b0302cbc814be3"}, + {file = "Authlib-1.3.0.tar.gz", hash = "sha256:959ea62a5b7b5123c5059758296122b57cd2585ae2ed1c0622c21b371ffdae06"}, +] + +[package.dependencies] +cryptography = "*" + [[package]] name = "babel" version = "2.14.0" @@ -1955,6 +1969,21 @@ files = [ [package.extras] protobuf = ["grpcio-tools (>=1.62.1)"] +[[package]] +name = "grpcio-health-checking" +version = "1.62.1" +description = "Standard Health Checking Service for gRPC" +optional = false +python-versions = ">=3.6" +files = [ + {file = "grpcio-health-checking-1.62.1.tar.gz", hash = "sha256:9e56180a941b1d32a077d7491e0611d0483c396358afd5349bf00152612e4583"}, + {file = "grpcio_health_checking-1.62.1-py3-none-any.whl", hash = "sha256:9ce761c09fc383e7aa2f7e6c0b0b65d5a1157c1b98d1f5871f7c38aca47d49b9"}, +] + +[package.dependencies] +grpcio = ">=1.62.1" +protobuf = ">=4.21.6" + [[package]] name = "grpcio-tools" version = "1.62.1" @@ -7690,6 +7719,28 @@ h11 = ">=0.8" [package.extras] standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"] +[[package]] +name 
= "validators" +version = "0.22.0" +description = "Python Data Validation for Humans™" +optional = false +python-versions = ">=3.8" +files = [ + {file = "validators-0.22.0-py3-none-any.whl", hash = "sha256:61cf7d4a62bbae559f2e54aed3b000cea9ff3e2fdbe463f51179b92c58c9585a"}, + {file = "validators-0.22.0.tar.gz", hash = "sha256:77b2689b172eeeb600d9605ab86194641670cdb73b60afd577142a9397873370"}, +] + +[package.extras] +docs-offline = ["myst-parser (>=2.0.0)", "pypandoc-binary (>=1.11)", "sphinx (>=7.1.1)"] +docs-online = ["mkdocs (>=1.5.2)", "mkdocs-git-revision-date-localized-plugin (>=1.2.0)", "mkdocs-material (>=9.2.6)", "mkdocstrings[python] (>=0.22.0)", "pyaml (>=23.7.0)"] +hooks = ["pre-commit (>=3.3.3)"] +package = ["build (>=1.0.0)", "twine (>=4.0.2)"] +runner = ["tox (>=4.11.1)"] +sast = ["bandit[toml] (>=1.7.5)"] +testing = ["pytest (>=7.4.0)"] +tooling = ["black (>=23.7.0)", "pyright (>=1.1.325)", "ruff (>=0.0.287)"] +tooling-extras = ["pyaml (>=23.7.0)", "pypandoc-binary (>=1.11)", "pytest (>=7.4.0)"] + [[package]] name = "watchdog" version = "4.0.0" @@ -7742,6 +7793,27 @@ files = [ {file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"}, ] +[[package]] +name = "weaviate-client" +version = "4.5.4" +description = "A python native Weaviate client" +optional = false +python-versions = ">=3.8" +files = [ + {file = "weaviate-client-4.5.4.tar.gz", hash = "sha256:fc53dc73cd53df453c5e6dc758e49a6a1549212d6670ddd013392107120692f8"}, + {file = "weaviate_client-4.5.4-py3-none-any.whl", hash = "sha256:f6d3a6b759e5aa0d3350067490526ea38b9274ae4043b4a3ae0064c28d56883f"}, +] + +[package.dependencies] +authlib = ">=1.2.1,<2.0.0" +grpcio = ">=1.57.0,<2.0.0" +grpcio-health-checking = ">=1.57.0,<2.0.0" +grpcio-tools = ">=1.57.0,<2.0.0" +httpx = "0.27.0" +pydantic = ">=2.5.0,<3.0.0" +requests = ">=2.30.0,<3.0.0" +validators = "0.22.0" + [[package]] name = "webcolors" version = "1.13" @@ -8057,4 +8129,4 @@ weaviate = [] 
[metadata] lock-version = "2.0" python-versions = "~3.10" -content-hash = "37a0db9a6a86b71a35c91ac5ef86204d76529033260032917906a907bffc8216" +content-hash = "d742617c6e8a62dc9bff8656fa97955f63c991720805388190b205da66e4712a" diff --git a/pyproject.toml b/pyproject.toml index 5c4a70c74..34b5e3bb4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ qdrant-client = "^1.8.0" duckdb-engine = "^0.11.2" graphistry = "^0.33.5" tenacity = "^8.2.3" +weaviate-client = "^4.5.4" [tool.poetry.extras] dbt = ["dbt-core", "dbt-redshift", "dbt-bigquery", "dbt-duckdb", "dbt-snowflake", "dbt-athena-community", "dbt-databricks"]