fix: change weaviate batch update to use dynamic batch
This commit is contained in:
parent
c8c2d45cb1
commit
40bb4bc37f
2 changed files with 18 additions and 18 deletions
|
|
@ -63,11 +63,10 @@ class Neo4jAdapter(GraphDBInterface):
|
||||||
async def add_node(self, node: DataPoint):
|
async def add_node(self, node: DataPoint):
|
||||||
serialized_properties = self.serialize_properties(node.model_dump())
|
serialized_properties = self.serialize_properties(node.model_dump())
|
||||||
|
|
||||||
query = """MERGE (node {id: $node_id})
|
query = dedent("""MERGE (node {id: $node_id})
|
||||||
ON CREATE SET node += $properties
|
ON CREATE SET node += $properties, node.updated_at = timestamp()
|
||||||
ON MATCH SET node += $properties
|
ON MATCH SET node += $properties, node.updated_at = timestamp()
|
||||||
ON MATCH SET node.updated_at = timestamp()
|
RETURN ID(node) AS internal_id, node.id AS nodeId""")
|
||||||
RETURN ID(node) AS internal_id, node.id AS nodeId"""
|
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"node_id": str(node.id),
|
"node_id": str(node.id),
|
||||||
|
|
@ -80,9 +79,8 @@ class Neo4jAdapter(GraphDBInterface):
|
||||||
query = """
|
query = """
|
||||||
UNWIND $nodes AS node
|
UNWIND $nodes AS node
|
||||||
MERGE (n {id: node.node_id})
|
MERGE (n {id: node.node_id})
|
||||||
ON CREATE SET n += node.properties
|
ON CREATE SET n += node.properties, n.updated_at = timestamp()
|
||||||
ON MATCH SET n += node.properties
|
ON MATCH SET n += node.properties, n.updated_at = timestamp()
|
||||||
ON MATCH SET n.updated_at = timestamp()
|
|
||||||
WITH n, node.node_id AS label
|
WITH n, node.node_id AS label
|
||||||
CALL apoc.create.addLabels(n, [label]) YIELD node AS labeledNode
|
CALL apoc.create.addLabels(n, [label]) YIELD node AS labeledNode
|
||||||
RETURN ID(labeledNode) AS internal_id, labeledNode.id AS nodeId
|
RETURN ID(labeledNode) AS internal_id, labeledNode.id AS nodeId
|
||||||
|
|
@ -137,8 +135,9 @@ class Neo4jAdapter(GraphDBInterface):
|
||||||
return await self.query(query, params)
|
return await self.query(query, params)
|
||||||
|
|
||||||
async def has_edge(self, from_node: UUID, to_node: UUID, edge_label: str) -> bool:
|
async def has_edge(self, from_node: UUID, to_node: UUID, edge_label: str) -> bool:
|
||||||
query = f"""
|
query = """
|
||||||
MATCH (from_node:`{str(from_node)}`)-[relationship:`{edge_label}`]->(to_node:`{str(to_node)}`)
|
MATCH (from_node)-[relationship]->(to_node)
|
||||||
|
WHERE from_node.id = $from_node_id AND to_node.id = $to_node_id AND type(relationship) = $edge_label
|
||||||
RETURN COUNT(relationship) > 0 AS edge_exists
|
RETURN COUNT(relationship) > 0 AS edge_exists
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -178,17 +177,18 @@ class Neo4jAdapter(GraphDBInterface):
|
||||||
async def add_edge(self, from_node: UUID, to_node: UUID, relationship_name: str, edge_properties: Optional[Dict[str, Any]] = {}):
|
async def add_edge(self, from_node: UUID, to_node: UUID, relationship_name: str, edge_properties: Optional[Dict[str, Any]] = {}):
|
||||||
serialized_properties = self.serialize_properties(edge_properties)
|
serialized_properties = self.serialize_properties(edge_properties)
|
||||||
|
|
||||||
query = f"""MATCH (from_node:`{str(from_node)}`
|
query = dedent("""MATCH (from_node {id: $from_node}),
|
||||||
{{id: $from_node}}),
|
(to_node {id: $to_node})
|
||||||
(to_node:`{str(to_node)}` {{id: $to_node}})
|
MERGE (from_node)-[r]->(to_node)
|
||||||
MERGE (from_node)-[r:`{relationship_name}`]->(to_node)
|
ON CREATE SET r += $properties, r.updated_at = timestamp(), r.type = $relationship_name
|
||||||
ON CREATE SET r += $properties, r.updated_at = timestamp()
|
ON MATCH SET r += $properties, r.updated_at = timestamp()
|
||||||
ON MATCH SET r += $properties, r.updated_at = timestamp()
|
RETURN r
|
||||||
RETURN r"""
|
""")
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"from_node": str(from_node),
|
"from_node": str(from_node),
|
||||||
"to_node": str(to_node),
|
"to_node": str(to_node),
|
||||||
|
"relationship_name": relationship_name,
|
||||||
"properties": serialized_properties
|
"properties": serialized_properties
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ class WeaviateAdapter(VectorDBInterface):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
data_point: DataObject = data_points[0]
|
data_point: DataObject = data_points[0]
|
||||||
return collection.data.insert(
|
return collection.data.update(
|
||||||
uuid = data_point.uuid,
|
uuid = data_point.uuid,
|
||||||
vector = data_point.vector,
|
vector = data_point.vector,
|
||||||
properties = data_point.properties,
|
properties = data_point.properties,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue