diff --git a/age_adapter/adapter.py b/age_adapter/adapter.py
index c5405e828..352a077b8 100644
--- a/age_adapter/adapter.py
+++ b/age_adapter/adapter.py
@@ -272,48 +272,55 @@ class ApacheAGEAdapter:
 
         if not nodes:
             return
 
-        # Process in batches of 100
+        # Group nodes by label for efficient batch processing
+        nodes_by_label = {}
+        for node_id, labels, properties in nodes:
+            label_key = ':'.join(labels) if labels else "Node"
+            if label_key not in nodes_by_label:
+                nodes_by_label[label_key] = []
+            nodes_by_label[label_key].append((node_id, labels, properties))
+
+        # Process each label group in batches
         BATCH_SIZE = 100
-        for i in range(0, len(nodes), BATCH_SIZE):
-            batch = nodes[i:i + BATCH_SIZE]
-
-            node_data_list = []
-            for node_id, labels, properties in batch:
-                props = {"id": node_id, **properties}
-                props_parts = []
-                for k, v in props.items():
-                    if isinstance(v, str):
-                        props_parts.append(f'{k}: "{v}"')
-                    elif isinstance(v, bool):
-                        props_parts.append(f'{k}: {str(v).lower()}')
-                    elif isinstance(v, (int, float)):
-                        props_parts.append(f'{k}: {v}')
-                    elif v is None:
-                        props_parts.append(f'{k}: null')
-                    else:
-                        props_parts.append(f'{k}: "{json.dumps(v)}"')
-                props_str = '{' + ', '.join(props_parts) + '}'
-                label_str = ':'.join(labels) if labels else "Node"
-                node_data_list.append(f'{{id: "{node_id}", props: {props_str}, label: "{label_str}"}}')
-
-            unwind_data = '[' + ', '.join(node_data_list) + ']'
-
-            all_prop_keys = set()
-            for node_id, labels, properties in batch:
-                all_prop_keys.update(properties.keys())
-            all_prop_keys.add('id')
-
-            set_clauses = [f"n.{key} = node_data.props.{key}" for key in sorted(all_prop_keys)]
-            set_clause = "SET " + ", ".join(set_clauses)
-
-            common_label = batch[0][1][0] if batch[0][1] else "Node"
-            query = f"""
-            UNWIND {unwind_data} AS node_data
-            MERGE (n {{id: node_data.id}})
-            {set_clause}
-            """
-            await self.execute_cypher(query)
+        for label_key, label_nodes in nodes_by_label.items():
+            for i in range(0, len(label_nodes), BATCH_SIZE):
+                batch = label_nodes[i:i + BATCH_SIZE]
+
+                node_data_list = []
+                for node_id, labels, properties in batch:
+                    props = {"id": node_id, **properties}
+                    props_parts = []
+                    for k, v in props.items():
+                        if isinstance(v, str):
+                            props_parts.append(f'{k}: "{v}"')
+                        elif isinstance(v, bool):
+                            props_parts.append(f'{k}: {str(v).lower()}')
+                        elif isinstance(v, (int, float)):
+                            props_parts.append(f'{k}: {v}')
+                        elif v is None:
+                            props_parts.append(f'{k}: null')
+                        else:
+                            props_parts.append(f'{k}: "{json.dumps(v)}"')
+                    props_str = '{' + ', '.join(props_parts) + '}'
+                    node_data_list.append(f'{{id: "{node_id}", props: {props_str}}}')
+
+                unwind_data = '[' + ', '.join(node_data_list) + ']'
+
+                all_prop_keys = set()
+                for node_id, labels, properties in batch:
+                    all_prop_keys.update(properties.keys())
+                all_prop_keys.add('id')
+
+                set_clauses = [f"n.{key} = node_data.props.{key}" for key in sorted(all_prop_keys)]
+                set_clause = "SET " + ", ".join(set_clauses) if set_clauses else ""
+
+                query = f"""
+                UNWIND {unwind_data} AS node_data
+                MERGE (n:{label_key} {{id: node_data.id}})
+                {set_clause}
+                """
+                await self.execute_cypher(query)
 
     async def get_node(self, node_id: str) -> Optional[NodeData]:
         """
@@ -398,16 +405,14 @@ class ApacheAGEAdapter:
 
             # Use MERGE to avoid duplicate edges
             query = f"""
-            MATCH (a {{id: '{source_id}'}}), (b {{id: '{target_id}'}})
-            MERGE (a)-[r:{relationship_type}]->(b)
+            MERGE (a {{id: '{source_id}'}})-[r:{relationship_type}]->(b {{id: '{target_id}'}})
            SET r = {{{props_str}}}
             RETURN r
             """
         else:
             # Use MERGE without properties
             query = f"""
-            MATCH (a {{id: '{source_id}'}}), (b {{id: '{target_id}'}})
-            MERGE (a)-[r:{relationship_type}]->(b)
+            MERGE (a {{id: '{source_id}'}})-[r:{relationship_type}]->(b {{id: '{target_id}'}})
             RETURN r
             """
 
@@ -497,8 +502,7 @@ class ApacheAGEAdapter:
 
             query = f"""
             UNWIND {values_list} AS edge
-            MATCH (a {{id: edge.src}}), (b {{id: edge.tgt}})
-            MERGE (a)-[r:{rel_type}]->(b)
+            MERGE (a {{id: edge.src}})-[r:{rel_type}]->(b {{id: edge.tgt}})
             {set_clause}
             """
 
diff --git a/age_adapter/benchmark.py b/age_adapter/benchmark.py
index ef0947c23..cf0b5ebbf 100644
--- a/age_adapter/benchmark.py
+++ b/age_adapter/benchmark.py
@@ -89,6 +89,55 @@ async def main():
 
     print(f"Node Ingestion Single (Merge - {half} existing, {len(new_nodes)} new): AGE={age_time_single_merge:.4f}s, Neo4j={neo4j_time_single_merge:.4f}s")
 
+    edges = [(f"node_{i}", f"node_{(i+1) % batch_size}", "CONNECTS", {"weight": 1.0})
+             for i in range(batch_size)]
+
+    start = time.perf_counter()
+    for source_id, target_id, rel_type, props in edges:
+        await age_adapter.add_edge(source_id, target_id, rel_type, props)
+    age_time_edge_single_new = time.perf_counter() - start
+
+    start = time.perf_counter()
+    for source_id, target_id, rel_type, props in edges:
+        try:
+            src_uuid = UUID(source_id) if '-' in source_id else UUID(int=hash(source_id) & ((1 << 128) - 1))
+        except:
+            src_uuid = UUID(int=hash(source_id) & ((1 << 128) - 1))
+        try:
+            tgt_uuid = UUID(target_id) if '-' in target_id else UUID(int=hash(target_id) & ((1 << 128) - 1))
+        except:
+            tgt_uuid = UUID(int=hash(target_id) & ((1 << 128) - 1))
+        await neo4j_adapter.add_edge(src_uuid, tgt_uuid, rel_type, props)
+    neo4j_time_edge_single_new = time.perf_counter() - start
+
+    print(f"Edge Ingestion Single (New): AGE={age_time_edge_single_new:.4f}s, Neo4j={neo4j_time_edge_single_new:.4f}s")
+
+    half_edges = batch_size // 2
+    existing_edges = edges[:half_edges]
+    new_edge_ids = [(f"node_{i}", f"node_{(i+1) % batch_size}", "CONNECTS", {"weight": 1.0})
+                    for i in range(batch_size, batch_size + half_edges)]
+    merge_edges = existing_edges + new_edge_ids
+
+    start = time.perf_counter()
+    for source_id, target_id, rel_type, props in merge_edges:
+        await age_adapter.add_edge(source_id, target_id, rel_type, props)
+    age_time_edge_single_merge = time.perf_counter() - start
+
+    start = time.perf_counter()
+    for source_id, target_id, rel_type, props in merge_edges:
+        try:
+            src_uuid = UUID(source_id) if '-' in source_id else UUID(int=hash(source_id) & ((1 << 128) - 1))
+        except:
+            src_uuid = UUID(int=hash(source_id) & ((1 << 128) - 1))
+        try:
+            tgt_uuid = UUID(target_id) if '-' in target_id else UUID(int=hash(target_id) & ((1 << 128) - 1))
+        except:
+            tgt_uuid = UUID(int=hash(target_id) & ((1 << 128) - 1))
+        await neo4j_adapter.add_edge(src_uuid, tgt_uuid, rel_type, props)
+    neo4j_time_edge_single_merge = time.perf_counter() - start
+
+    print(f"Edge Ingestion Single (Merge - {half_edges} existing, {len(new_edge_ids)} new): AGE={age_time_edge_single_merge:.4f}s, Neo4j={neo4j_time_edge_single_merge:.4f}s")
+
     await age_adapter.drop_graph(recreate=True)
     await neo4j_adapter.delete_graph()
 
@@ -126,6 +175,58 @@ async def main():
 
     print(f"Node Ingestion Batch (Merge - {half} existing, {len(new_nodes)} new): AGE={age_time_batch_merge:.4f}s, Neo4j={neo4j_time_batch_merge:.4f}s")
 
+    start = time.perf_counter()
+    for i in range(0, len(edges), 100):
+        await age_adapter.add_edges(edges[i:i+100])
+    age_time_edge_batch_new = time.perf_counter() - start
+
+    start = time.perf_counter()
+    for i in range(0, len(edges), 100):
+        batch = edges[i:i+100]
+        def to_uuid(s):
+            try:
+                return UUID(s) if '-' in s else UUID(int=hash(s) & ((1 << 128) - 1))
+            except:
+                return UUID(int=hash(s) & ((1 << 128) - 1))
+        edge_tuples = [(to_uuid(src), to_uuid(tgt), rel_type, props)
+                       for src, tgt, rel_type, props in batch]
+        await neo4j_adapter.add_edges(edge_tuples)
+    neo4j_time_edge_batch_new = time.perf_counter() - start
+
+    print(f"Edge Ingestion Batch (New): AGE={age_time_edge_batch_new:.4f}s, Neo4j={neo4j_time_edge_batch_new:.4f}s")
+
+    for i in range(0, len(existing_edges), 100):
+        await age_adapter.add_edges(existing_edges[i:i+100])
+        batch = existing_edges[i:i+100]
+        def to_uuid(s):
+            try:
+                return UUID(s) if '-' in s else UUID(int=hash(s) & ((1 << 128) - 1))
+            except:
+                return UUID(int=hash(s) & ((1 << 128) - 1))
+        edge_tuples = [(to_uuid(src), to_uuid(tgt), rel_type, props)
+                       for src, tgt, rel_type, props in batch]
+        await neo4j_adapter.add_edges(edge_tuples)
+
+    start = time.perf_counter()
+    for i in range(0, len(merge_edges), 100):
+        await age_adapter.add_edges(merge_edges[i:i+100])
+    age_time_edge_batch_merge = time.perf_counter() - start
+
+    start = time.perf_counter()
+    for i in range(0, len(merge_edges), 100):
+        batch = merge_edges[i:i+100]
+        def to_uuid(s):
+            try:
+                return UUID(s) if '-' in s else UUID(int=hash(s) & ((1 << 128) - 1))
+            except:
+                return UUID(int=hash(s) & ((1 << 128) - 1))
+        edge_tuples = [(to_uuid(src), to_uuid(tgt), rel_type, props)
+                       for src, tgt, rel_type, props in batch]
+        await neo4j_adapter.add_edges(edge_tuples)
+    neo4j_time_edge_batch_merge = time.perf_counter() - start
+
+    print(f"Edge Ingestion Batch (Merge - {half_edges} existing, {len(new_edge_ids)} new): AGE={age_time_edge_batch_merge:.4f}s, Neo4j={neo4j_time_edge_batch_merge:.4f}s")
+
     await age_adapter.close()
     await neo4j_adapter.driver.close()
 