Neo4j base node label (#903)

Adds base node label to Neo4j Adapter

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Signed-off-by: Diego B Theuerkauf <diego.theuerkauf@tuebingen.mpg.de>
Co-authored-by: Boris <boris@topoteretes.com>
Co-authored-by: vasilije <vas.markovic@gmail.com>
Co-authored-by: Igor Ilic <30923996+dexters1@users.noreply.github.com>
Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
Co-authored-by: Igor Ilic <igorilic03@gmail.com>
Co-authored-by: Hande <159312713+hande-k@users.noreply.github.com>
Co-authored-by: Matea Pesic <80577904+matea16@users.noreply.github.com>
Co-authored-by: hajdul88 <52442977+hajdul88@users.noreply.github.com>
Co-authored-by: Daniel Molnar <soobrosa@gmail.com>
Co-authored-by: Diego Baptista Theuerkauf <34717973+diegoabt@users.noreply.github.com>
Co-authored-by: Dmitrii Galkin <36552323+dm1tryG@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: lxobr <122801072+lxobr@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions@users.noreply.github.com>
This commit is contained in:
Tomaz Bratanic 2025-06-05 18:15:40 +02:00 committed by GitHub
parent d720a4dbb2
commit e8487a06b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 60 additions and 55 deletions

View file

@ -28,7 +28,7 @@
Build dynamic Agent memory using scalable, modular ECL (Extract, Cognify, Load) pipelines.
Build dynamic memory for Agents and replace RAG using scalable, modular ECL (Extract, Cognify, Load) pipelines.
More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github.com/topoteretes/cognee/tree/main/evals)
@ -55,7 +55,7 @@ More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github
## Features
- Interconnect and retrieve your past conversations, documents, images and audio transcriptions
- Reduce hallucinations, developer effort, and cost.
- Replaces RAG systems and reduces developer effort and cost.
- Load data to graph and vector databases using only Pydantic
- Manipulate your data while ingesting from 30+ data sources

View file

@ -29,6 +29,8 @@ from .neo4j_metrics_utils import (
logger = get_logger("Neo4jAdapter", level=ERROR)
BASE_LABEL = "__Node__"
class Neo4jAdapter(GraphDBInterface):
"""
@ -48,6 +50,11 @@ class Neo4jAdapter(GraphDBInterface):
graph_database_url,
auth=(graph_database_username, graph_database_password),
max_connection_lifetime=120,
notifications_min_severity="OFF",
)
# Create constraint/index
self.query(
(f"CREATE CONSTRAINT IF NOT EXISTS FOR (n:`{BASE_LABEL}`) REQUIRE n.id IS UNIQUE;")
)
@asynccontextmanager
@ -103,8 +110,8 @@ class Neo4jAdapter(GraphDBInterface):
- bool: True if the node exists, otherwise False.
"""
results = self.query(
"""
MATCH (n)
f"""
MATCH (n:`{BASE_LABEL}`)
WHERE n.id = $node_id
RETURN COUNT(n) > 0 AS node_exists
""",
@ -129,7 +136,7 @@ class Neo4jAdapter(GraphDBInterface):
serialized_properties = self.serialize_properties(node.model_dump())
query = dedent(
"""MERGE (node {id: $node_id})
f"""MERGE (node: `{BASE_LABEL}`{{id: $node_id}})
ON CREATE SET node += $properties, node.updated_at = timestamp()
ON MATCH SET node += $properties, node.updated_at = timestamp()
WITH node, $node_label AS label
@ -161,9 +168,9 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $nodes AS node
MERGE (n {id: node.node_id})
MERGE (n: `{BASE_LABEL}`{{id: node.node_id}})
ON CREATE SET n += node.properties, n.updated_at = timestamp()
ON MATCH SET n += node.properties, n.updated_at = timestamp()
WITH n, node.label AS label
@ -215,9 +222,9 @@ class Neo4jAdapter(GraphDBInterface):
A list of nodes represented as dictionaries.
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node: `{BASE_LABEL}`{{id: id}})
RETURN node"""
params = {"node_ids": node_ids}
@ -240,7 +247,7 @@ class Neo4jAdapter(GraphDBInterface):
The result of the query execution, typically indicating success or failure.
"""
query = "MATCH (node {id: $node_id}) DETACH DELETE node"
query = f"MATCH (node: `{BASE_LABEL}`{{id: $node_id}}) DETACH DELETE node"
params = {"node_id": node_id}
return await self.query(query, params)
@ -259,9 +266,9 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node: `{BASE_LABEL}`{{id: id}})
DETACH DELETE node"""
params = {"node_ids": node_ids}
@ -284,16 +291,15 @@ class Neo4jAdapter(GraphDBInterface):
- bool: True if the edge exists, otherwise False.
"""
query = """
MATCH (from_node)-[relationship]->(to_node)
WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id AND type(relationship) = $edge_label
query = f"""
MATCH (from_node: `{BASE_LABEL}`)-[:`{edge_label}`]->(to_node: `{BASE_LABEL}`)
WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id
RETURN COUNT(relationship) > 0 AS edge_exists
"""
params = {
"from_node_id": str(from_node),
"to_node_id": str(to_node),
"edge_label": edge_label,
}
edge_exists = await self.query(query, params)
@ -366,9 +372,9 @@ class Neo4jAdapter(GraphDBInterface):
query = dedent(
f"""\
MATCH (from_node {{id: $from_node}}),
(to_node {{id: $to_node}})
MERGE (from_node)-[r:{relationship_name}]->(to_node)
MATCH (from_node :`{BASE_LABEL}`{{id: $from_node}}),
(to_node :`{BASE_LABEL}`{{id: $to_node}})
MERGE (from_node)-[r:`{relationship_name}`]->(to_node)
ON CREATE SET r += $properties, r.updated_at = timestamp()
ON MATCH SET r += $properties, r.updated_at = timestamp()
RETURN r
@ -400,17 +406,17 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
query = """
query = f"""
UNWIND $edges AS edge
MATCH (from_node {id: edge.from_node})
MATCH (to_node {id: edge.to_node})
MATCH (from_node: `{BASE_LABEL}`{{id: edge.from_node}})
MATCH (to_node: `{BASE_LABEL}`{{id: edge.to_node}})
CALL apoc.merge.relationship(
from_node,
edge.relationship_name,
{
{{
source_node_id: edge.from_node,
target_node_id: edge.to_node
},
}},
edge.properties,
to_node
) YIELD rel
@ -451,8 +457,8 @@ class Neo4jAdapter(GraphDBInterface):
A list of edges connecting to the specified node, represented as tuples of details.
"""
query = """
MATCH (n {id: $node_id})-[r]-(m)
query = f"""
MATCH (n: `{BASE_LABEL}`{{id: $node_id}})-[r]-(m)
RETURN n, r, m
"""
@ -525,9 +531,9 @@ class Neo4jAdapter(GraphDBInterface):
- list[str]: A list of predecessor node IDs.
"""
if edge_label is not None:
query = """
MATCH (node)<-[r]-(predecessor)
WHERE node.id = $node_id AND type(r) = $edge_label
query = f"""
MATCH (node: `{BASE_LABEL}`)<-[r:`{edge_label}`]-(predecessor)
WHERE node.id = $node_id
RETURN predecessor
"""
@ -535,14 +541,13 @@ class Neo4jAdapter(GraphDBInterface):
query,
dict(
node_id=node_id,
edge_label=edge_label,
),
)
return [result["predecessor"] for result in results]
else:
query = """
MATCH (node)<-[r]-(predecessor)
query = f"""
MATCH (node: `{BASE_LABEL}`)<-[r]-(predecessor)
WHERE node.id = $node_id
RETURN predecessor
"""
@ -572,9 +577,9 @@ class Neo4jAdapter(GraphDBInterface):
- list[str]: A list of successor node IDs.
"""
if edge_label is not None:
query = """
MATCH (node)-[r]->(successor)
WHERE node.id = $node_id AND type(r) = $edge_label
query = f"""
MATCH (node: `{BASE_LABEL}`)-[r:`{edge_label}`]->(successor)
WHERE node.id = $node_id
RETURN successor
"""
@ -588,8 +593,8 @@ class Neo4jAdapter(GraphDBInterface):
return [result["successor"] for result in results]
else:
query = """
MATCH (node)-[r]->(successor)
query = f"""
MATCH (node: `{BASE_LABEL}`)-[r]->(successor)
WHERE node.id = $node_id
RETURN successor
"""
@ -634,8 +639,8 @@ class Neo4jAdapter(GraphDBInterface):
- Optional[Dict[str, Any]]: The requested node as a dictionary, or None if it does
not exist.
"""
query = """
MATCH (node {id: $node_id})
query = f"""
MATCH (node: `{BASE_LABEL}`{{id: $node_id}})
RETURN node
"""
results = await self.query(query, {"node_id": node_id})
@ -655,9 +660,9 @@ class Neo4jAdapter(GraphDBInterface):
- List[Dict[str, Any]]: A list of nodes represented as dictionaries.
"""
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (node {id: id})
MATCH (node:`{BASE_LABEL}` {{id: id}})
RETURN node
"""
results = await self.query(query, {"node_ids": node_ids})
@ -677,13 +682,13 @@ class Neo4jAdapter(GraphDBInterface):
- list: A list of connections represented as tuples of details.
"""
predecessors_query = """
MATCH (node)<-[relation]-(neighbour)
predecessors_query = f"""
MATCH (node:`{BASE_LABEL}`)<-[relation]-(neighbour)
WHERE node.id = $node_id
RETURN neighbour, relation, node
"""
successors_query = """
MATCH (node)-[relation]->(neighbour)
successors_query = f"""
MATCH (node:`{BASE_LABEL}`)-[relation]->(neighbour)
WHERE node.id = $node_id
RETURN node, relation, neighbour
"""
@ -723,6 +728,7 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
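# NOTE(review): intent of this query is unclear — inside the f-string, `{id}` is interpolated by Python as a node label rather than referring to the UNWIND variable `id`; confirm before changing.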
query = f"""
UNWIND $node_ids AS id
MATCH (node:`{id}`)-[r:{edge_label}]->(predecessor)
@ -751,6 +757,7 @@ class Neo4jAdapter(GraphDBInterface):
- None: None
"""
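# NOTE(review): intent of this query is unclear — inside the f-string, `{id}` is interpolated by Python as a node label rather than referring to the UNWIND variable `id`; confirm before changing.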
query = f"""
UNWIND $node_ids AS id
MATCH (node:`{id}`)<-[r:{edge_label}]-(successor)

View file

@ -57,9 +57,9 @@ async def get_num_connected_components(adapter: Neo4jAdapter, graph_name: str):
found.
"""
query = f"""
CALL gds.wcc.stream('{graph_name}')
YIELD componentId
RETURN count(DISTINCT componentId) AS num_connected_components;
CALL gds.wcc.stats('{graph_name}')
YIELD componentCount
RETURN componentCount AS num_connected_components;
"""
result = await adapter.query(query)
@ -181,9 +181,9 @@ async def get_avg_clustering(adapter: Neo4jAdapter, graph_name: str):
The average clustering coefficient as a float, or 0 if no results are available.
"""
query = f"""
CALL gds.localClusteringCoefficient.stream('{graph_name}')
YIELD localClusteringCoefficient
RETURN avg(localClusteringCoefficient) AS avg_clustering;
CALL gds.localClusteringCoefficient.stats('{graph_name}')
YIELD averageClusteringCoefficient
RETURN averageClusteringCoefficient AS avg_clustering;
"""
result = await adapter.query(query)

View file

@ -21,7 +21,7 @@ async def get_authenticated_user(authorization: str = Header(...)) -> SimpleName
token, os.getenv("FASTAPI_USERS_JWT_SECRET", "super_secret"), algorithms=["HS256"]
)
if payload["tenant_id"]:
if payload.get("tenant_id"):
# SimpleNamespace lets us access dictionary elements like attributes
auth_data = SimpleNamespace(
id=UUID(payload["user_id"]),
@ -29,9 +29,7 @@ async def get_authenticated_user(authorization: str = Header(...)) -> SimpleName
roles=payload["roles"],
)
else:
auth_data = SimpleNamespace(
id=UUID(payload["user_id"]), tenant_id=None, roles=payload["roles"]
)
auth_data = SimpleNamespace(id=UUID(payload["user_id"]), tenant_id=None, roles=[])
return auth_data