diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py index 85b15ebfb..e992ee788 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/adapter.py @@ -76,11 +76,14 @@ class Neo4jAdapter(GraphDBInterface): """MERGE (node {id: $node_id}) ON CREATE SET node += $properties, node.updated_at = timestamp() ON MATCH SET node += $properties, node.updated_at = timestamp() - RETURN ID(node) AS internal_id, node.id AS nodeId""" + WITH node, $node_label AS label + CALL apoc.create.addLabels(node, [label]) YIELD node AS labeledNode + RETURN ID(labeledNode) AS internal_id, labeledNode.id AS nodeId""" ) params = { "node_id": str(node.id), + "node_label": type(node).__name__, "properties": serialized_properties, } @@ -92,7 +95,7 @@ class Neo4jAdapter(GraphDBInterface): MERGE (n {id: node.node_id}) ON CREATE SET n += node.properties, n.updated_at = timestamp() ON MATCH SET n += node.properties, n.updated_at = timestamp() - WITH n, node.node_id AS label + WITH n, node.label AS label CALL apoc.create.addLabels(n, [label]) YIELD node AS labeledNode RETURN ID(labeledNode) AS internal_id, labeledNode.id AS nodeId """ @@ -100,6 +103,7 @@ class Neo4jAdapter(GraphDBInterface): nodes = [ { "node_id": str(node.id), + "label": type(node).__name__, "properties": self.serialize_properties(node.model_dump()), } for node in nodes diff --git a/cognee/tests/low_level/companies.json b/examples/low_level/companies.json similarity index 100% rename from cognee/tests/low_level/companies.json rename to examples/low_level/companies.json diff --git a/examples/low_level/customers.json b/examples/low_level/customers.json new file mode 100644 index 000000000..fa83f011d --- /dev/null +++ b/examples/low_level/customers.json @@ -0,0 +1,108 @@ +[{ + "id": "customer_1", + "name": "John Doe", + "preferences": [{ + "id": "preference_1", + "name": "ShoeSize", + "value": "40.5" + }, { + "id": "preference_2", + "name": "Color", + "value": "Navy Blue" + }, { + "id": "preference_3", + "name": "Color", + "value": "White" + }, { + "id": "preference_4", + "name": "ShoeType", + "value": "Regular Sneakers" + }], + "products": [{ + "id": "product_1", + "name": "Sneakers", + "price": 79.99, + "colors": ["Blue", "Brown"], + "type": "Regular Sneakers", + "action": "purchased" + }, { + "id": "product_2", + "name": "Shirt", + "price": 19.99, + "colors": ["Black"], + "type": "T-Shirt", + "action": "liked" + }, { + "id": "product_3", + "name": "Jacket", + "price": 59.99, + "colors": ["Gray", "White"], + "type": "Jacket", + "action": "purchased" + }, { + "id": "product_4", + "name": "Shoes", + "price": 129.99, + "colors": ["Red", "Black"], + "type": "Formal Shoes", + "action": "liked" + }] +}, { + "id": "customer_2", + "name": "Jane Smith", + "preferences": [{ + "id": "preference_5", + "name": "ShoeSize", + "value": "38.5" + }, { + "id": "preference_6", + "name": "Color", + "value": "Black" + }, { + "id": "preference_7", + "name": "ShoeType", + "value": "Slip-On" + }], + "products": [{ + "id": "product_5", + "name": "Sneakers", + "price": 69.99, + "colors": ["Blue", "White"], + "type": "Slip-On", + "action": "purchased" + }, { + "id": "product_6", + "name": "Shirt", + "price": 14.99, + "colors": ["Red", "Blue"], + "type": "T-Shirt", + "action": "purchased" + }, { + "id": "product_7", + "name": "Jacket", + "price": 49.99, + "colors": ["Gray", "Black"], + "type": "Jacket", + "action": "liked" + }] +}, { + "id": "customer_3", + "name": "Michael Johnson", + "preferences": [{ + "id": "preference_8", + "name": "Color", + "value": "Red" + }, { + "id": "preference_9", + "name": "ShoeType", + "value": "Boots" + }], + "products": [{ + "id": "product_8", + "name": "Cowboy Boots", + "price": 299.99, + "colors": ["Red", "White"], + "type": "Cowboy Boots", + "action": "purchased" + }] +}] diff --git a/cognee/tests/low_level/people.json b/examples/low_level/people.json similarity index 100% rename from cognee/tests/low_level/people.json rename to examples/low_level/people.json diff --git a/cognee/tests/low_level/pipeline.py b/examples/low_level/pipeline.py similarity index 100% rename from cognee/tests/low_level/pipeline.py rename to examples/low_level/pipeline.py diff --git a/examples/low_level/product_recommendation.py b/examples/low_level/product_recommendation.py new file mode 100644 index 000000000..15fb9a7d0 --- /dev/null +++ b/examples/low_level/product_recommendation.py @@ -0,0 +1,197 @@ +import os +import json +import asyncio +from neo4j import exceptions + +from cognee import prune + +# from cognee import visualize_graph +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.low_level import setup, DataPoint +from cognee.pipelines import run_tasks, Task +from cognee.tasks.storage import add_data_points +from cognee.shared.utils import render_graph + + +class Products(DataPoint): + name: str = "Products" + + +products_aggregator_node = Products() + + +class Product(DataPoint): + id: str + name: str + type: str + price: float + colors: list[str] + is_type: Products = products_aggregator_node + + +class Preferences(DataPoint): + name: str = "Preferences" + + +preferences_aggregator_node = Preferences() + + +class Preference(DataPoint): + id: str + name: str + value: str + is_type: Preferences = preferences_aggregator_node + + +class Customers(DataPoint): + name: str = "Customers" + + +customers_aggregator_node = Customers() + + +class Customer(DataPoint): + id: str + name: str + has_preference: list[Preference] + purchased: list[Product] + liked: list[Product] + is_type: Customers = customers_aggregator_node + + +def ingest_files(): + customers_file_path = os.path.join(os.path.dirname(__file__), "customers.json") + customers = json.loads(open(customers_file_path, "r").read()) + + customers_data_points = {} + products_data_points = {} + preferences_data_points = {} + + for customer in customers: + new_customer = Customer( + id=customer["id"], + name=customer["name"], + liked=[], + purchased=[], + has_preference=[], + ) + customers_data_points[customer["name"]] = new_customer + + for product in customer["products"]: + if product["id"] not in products_data_points: + products_data_points[product["id"]] = Product( + id=product["id"], + type=product["type"], + name=product["name"], + price=product["price"], + colors=product["colors"], + ) + + new_product = products_data_points[product["id"]] + + if product["action"] == "purchased": + new_customer.purchased.append(new_product) + elif product["action"] == "liked": + new_customer.liked.append(new_product) + + for preference in customer["preferences"]: + if preference["id"] not in preferences_data_points: + preferences_data_points[preference["id"]] = Preference( + id=preference["id"], + name=preference["name"], + value=preference["value"], + ) + + new_preference = preferences_data_points[preference["id"]] + new_customer.has_preference.append(new_preference) + + return customers_data_points.values() + + +async def main(): + await prune.prune_data() + await prune.prune_system(metadata=True) + + await setup() + + pipeline = run_tasks([Task(ingest_files), Task(add_data_points)]) + + async for status in pipeline: + print(status) + + # Get a graphistry url (Register for a free account at https://www.graphistry.com) + await render_graph() + + graph_engine = await get_graph_engine() + + products_results = await graph_engine.query( + """ + // Step 1: Use new customers's preferences from input + UNWIND $preferences AS pref_input + + // Step 2: Find other customers who have these preferences + MATCH (other_customer:Customer)-[:has_preference]->(preference:Preference) + WHERE preference.value = pref_input + + WITH other_customer, count(preference) AS similarity_score + + // Step 3: Limit to the top-N most similar customers + ORDER BY similarity_score DESC + LIMIT 5 + + // Step 4: Get products that these similar customers have purchased + MATCH (other_customer)-[:purchased]->(product:Product) + + // Step 5: Rank products based on frequency + RETURN product, count(*) AS recommendation_score + ORDER BY recommendation_score DESC + LIMIT 10 + """, + { + "preferences": ["White", "Navy Blue", "Regular Sneakers"], + }, + ) + + print("Top 10 recommended products:") + for result in products_results: + print(f"{result['product']['id']}: {result['product']['name']}") + + try: + await graph_engine.query( + """ + // Match the customer and their stored shoe size preference + MATCH (customer:Customer {id: $customer_id}) + OPTIONAL MATCH (customer)-[:has_preference]->(preference:Preference {name: 'ShoeSize'}) + + // Assume the new shoe size is passed as a parameter $new_size + WITH customer, preference, $new_size AS new_size + + // If a stored preference exists and it does not match the new value, + // raise an error using APOC's utility procedure. + CALL apoc.util.validate( + preference IS NOT NULL AND preference.value <> new_size, + "Conflicting shoe size preference: existing size is " + preference.value + " and new size is " + new_size, + [] + ) + + // If no conflict, continue with the update or further processing + // ... + RETURN customer + """, + { + "customer_id": "customer_1", + "new_size": "42", + }, + ) + except exceptions.ClientError as error: + print(f"Anomaly detected: {str(error.message)}") + + # # Or use our simple graph preview + # graph_file_path = str( + # os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html") + # ) + # await visualize_graph(graph_file_path) + + +if __name__ == "__main__": + asyncio.run(main())