Add: Graph exploration tools for general-purpose PKM

Implemented two new MCP tools and enhanced workflow instructions to enable
effective Personal Knowledge Management across all use cases (architecture
decisions, projects, coaching, research, etc.).

## New Tools

1. **get_entity_connections** - Direct graph traversal showing ALL relationships
   - Returns complete connection data for an entity
   - Guarantees completeness, unlike semantic search (see the sketch after this list)
   - Essential for pattern detection and exploration
   - Leverages EntityEdge.get_by_node_uuid()

2. **get_entity_timeline** - Chronological episode history
   - Shows ALL episodes mentioning an entity
   - Enables temporal tracking and evolution analysis (also sketched below)
   - Critical for understanding how concepts evolved
   - Leverages EpisodicNode.get_by_entity_node_uuid()
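
A minimal usage sketch of both tools from the agent side. The plain-function call
style and the response field names (`nodes`, `facts`, `episodes`, `uuid`, `valid_at`)
are illustrative assumptions; the server's actual response models are defined in its
response types module.

```python
# Hypothetical agent-side sketch; call style and field names are assumptions.
# Explore an entity's complete neighborhood and history before acting on it.
nodes = search_nodes(query='notification service database')

if nodes['nodes']:
    entity_uuid = nodes['nodes'][0]['uuid']

    # Direct graph traversal: EVERY relationship touching the entity,
    # no semantic query required, completeness guaranteed.
    connections = get_entity_connections(entity_uuid=entity_uuid, max_connections=50)
    for fact in connections['facts']:
        print(fact)

    # Chronological history: every episode mentioning the entity, oldest first.
    timeline = get_entity_timeline(entity_uuid=entity_uuid, max_episodes=20)
    for episode in timeline['episodes']:
        print(episode['valid_at'], episode['name'])
```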

## Enhanced Workflow Instructions

Updated GRAPHITI_MCP_INSTRUCTIONS with:
- Clear "SEARCH FIRST, THEN ADD" workflow with decision flowcharts
- Tool selection guide (when to use each tool)
- Distinction between graph traversal and semantic search
- Multiple concrete examples across different domains
- Key principles for effective PKM usage

## Updated add_memory Docstring

Added a prominent warning to search before adding:
- Step-by-step workflow guidance
- Emphasizes creating connections rather than isolated nodes
- References the new exploration tools (see the sketch below)
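
A sketch of the search-first flow the docstring now prescribes. The query string,
entity names, and episode text are illustrative, echoing Example 1 from the updated
instructions:

```python
# Hypothetical search-first flow, following the updated add_memory docstring.
# 1. Search for related entities before adding anything.
nodes = search_nodes(query='database architecture microservices')

# 2. Explore what is already linked to the closest match.
if nodes['nodes']:
    connections = get_entity_connections(entity_uuid=nodes['nodes'][0]['uuid'])

# 3. Add the episode, naming the found entities so extraction links the new
#    information into the existing graph instead of creating an isolated node.
add_memory(
    name='Database choice',
    episode_body=(
        'Chose PostgreSQL for the new notification-service, alongside the '
        'existing MySQL used by user-service and payment-service.'
    ),
)
```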

## Benefits

- Prevents disconnected/duplicate entities
- Enables reliable pattern recognition with complete data
- Cost-effective: a single graph query instead of multiple semantic searches
- Temporal tracking for evolution analysis
- Works equally well for technical and personal knowledge

## Implementation Details

- No changes to graphiti_core (uses existing features only; see the sketch below)
- All new code lives in mcp_server/src/graphiti_mcp_server.py
- Backward compatible (adds new tools without modifying existing ones)
- Follows existing MCP tool patterns and conventions
- Passes all lint and syntax checks
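
For reference, a minimal sketch of the two pre-existing graphiti_core methods the new
tools wrap. The import paths follow graphiti_core's module layout, and passing a bare
`driver` is an assumption; the actual server obtains it from its service wrapper:

```python
# Sketch only: the two existing graphiti_core calls the new tools build on.
from graphiti_core.edges import EntityEdge
from graphiti_core.nodes import EpisodicNode


async def explore_entity(driver, entity_uuid: str):
    # Every edge touching the entity, via direct graph traversal.
    edges = await EntityEdge.get_by_node_uuid(driver, entity_uuid)

    # Every episode mentioning the entity; sort for a chronological timeline.
    episodes = await EpisodicNode.get_by_entity_node_uuid(driver, entity_uuid)
    episodes.sort(key=lambda e: e.valid_at)

    return edges, episodes
```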

Related: DOCS/IMPLEMENTATION-Graph-Exploration-Tools.md
Claude 2025-11-15 09:51:11 +00:00
parent 13af424b35
commit 21c0eae78f
7 changed files with 548 additions and 154 deletions


@@ -1,62 +1,72 @@
#!/usr/bin/env python3
"""Check what's in the source database."""

import os

from neo4j import GraphDatabase

NEO4J_URI = 'bolt://192.168.1.25:7687'
NEO4J_USER = 'neo4j'
NEO4J_PASSWORD = '!"MiTa1205'

SOURCE_DATABASE = 'neo4j'
SOURCE_GROUP_ID = 'lvarming73'

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

print('=' * 70)
print('Checking Source Database')
print('=' * 70)

with driver.session(database=SOURCE_DATABASE) as session:
    # Check total nodes
    result = session.run(
        """
        MATCH (n {group_id: $group_id})
        RETURN count(n) as total
        """,
        group_id=SOURCE_GROUP_ID,
    )
    total = result.single()['total']
    print(f"\n✓ Total nodes with group_id '{SOURCE_GROUP_ID}': {total}")

    # Check date range
    result = session.run(
        """
        MATCH (n:Episodic {group_id: $group_id})
        WHERE n.created_at IS NOT NULL
        RETURN
            min(n.created_at) as earliest,
            max(n.created_at) as latest,
            count(n) as total
        """,
        group_id=SOURCE_GROUP_ID,
    )
    dates = result.single()
    if dates and dates['total'] > 0:
        print(f'\n✓ Episodic date range:')
        print(f'  Earliest: {dates["earliest"]}')
        print(f'  Latest: {dates["latest"]}')
        print(f'  Total episodes: {dates["total"]}')
    else:
        print('\n⚠️  No episodic nodes with dates found')

    # Sample episodic nodes by date
    result = session.run(
        """
        MATCH (n:Episodic {group_id: $group_id})
        RETURN n.name as name, n.created_at as created_at
        ORDER BY n.created_at
        LIMIT 10
        """,
        group_id=SOURCE_GROUP_ID,
    )
    print(f'\n✓ Oldest episodic nodes:')
    for record in result:
        print(f'  - {record["name"]}: {record["created_at"]}')

    # Check for other group_ids in neo4j database
    result = session.run("""
@@ -68,7 +78,7 @@ with driver.session(database=SOURCE_DATABASE) as session:
    print(f"\n✓ All group_ids in '{SOURCE_DATABASE}' database:")
    for record in result:
        print(f'  {record["group_id"]}: {record["count"]} nodes')

driver.close()
print('\n' + '=' * 70)


@@ -115,33 +115,148 @@ config: GraphitiConfig
# MCP server instructions
GRAPHITI_MCP_INSTRUCTIONS = """
Graphiti is a memory service for AI agents built on a knowledge graph. It transforms information
into a richly connected knowledge network of entities and relationships.

The system organizes data as:
- **Episodes**: Content snippets (conversations, notes, documents)
- **Nodes**: Entities (people, projects, concepts, decisions, anything)
- **Facts**: Relationships between entities with temporal metadata

## Core Workflow: SEARCH FIRST, THEN ADD

**Always explore existing knowledge before adding new information.**

### WHEN ADDING INFORMATION:

```
User provides information
1. search_nodes(extract key entities/concepts)
2. IF entities found:
     get_entity_connections(entity_uuid) - See what's already linked
     Optional: get_entity_timeline(entity_uuid) - Understand history
3. add_memory(episode_body with explicit references to found entities)
Result: New episode automatically connects to existing knowledge
```

### WHEN RETRIEVING INFORMATION:

```
User asks question
1. search_nodes(extract keywords from question)
2. For relevant entities:
     get_entity_connections(uuid) - Explore neighborhood
     get_entity_timeline(uuid) - See evolution/history
3. Optional: search_memory_facts(semantic query) - Find specific relationships
Result: Comprehensive answer from complete graph context
```

## Tool Selection Guide

**Finding entities:**
- `search_nodes` - Semantic search for entities by keywords/description

**Exploring connections:**
- `get_entity_connections` - **ALL** relationships for an entity (complete, direct graph traversal)
- `search_memory_facts` - Semantic search for relationships (query-driven, may miss some)

**Understanding history:**
- `get_entity_timeline` - **ALL** episodes mentioning an entity (chronological, complete)

**Adding information:**
- `add_memory` - Store new episodes (AFTER searching existing knowledge)

**Retrieval vs Graph Traversal:**
- Use `get_entity_connections` when you need COMPLETE data (pattern detection, exploration)
- Use `search_memory_facts` when you have a specific semantic query

## Examples

### Example 1: Adding a Technical Decision

```python
# ❌ BAD: Creates a disconnected node
add_memory(
    name="Database choice",
    episode_body="Chose PostgreSQL for new service"
)

# ✅ GOOD: Connects to existing knowledge
nodes = search_nodes(query="database architecture microservices")
# Found: MySQL (main db), Redis (cache), microservices pattern

connections = get_entity_connections(entity_uuid=nodes[0]['uuid'])
# Sees: MySQL connects to user-service, payment-service

add_memory(
    name="Database choice",
    episode_body="Chose PostgreSQL for new notification-service. Different from "
                 "existing MySQL used by user-service and payment-service because "
                 "we need better JSON support for notification templates."
)
# Result: Rich connections created automatically
```

### Example 2: Exploring Project Context

```python
# User asks: "What database considerations do we have?"
nodes = search_nodes(query="database")
# Returns: PostgreSQL, MySQL, Redis entities

for node in nodes:
    connections = get_entity_connections(entity_uuid=node['uuid'])
    # Shows ALL services, constraints, decisions connected to each database

    timeline = get_entity_timeline(entity_uuid=node['uuid'])
    # Shows when each database was discussed, decisions made over time

# Synthesize comprehensive answer from COMPLETE data
```

### Example 3: Pattern Recognition

```python
# User: "I'm feeling stressed today"
nodes = search_nodes(query="stress")
connections = get_entity_connections(entity_uuid=nodes[0]['uuid'])
# Discovers: stress ↔ work, sleep, project-deadline, coffee-intake

timeline = get_entity_timeline(entity_uuid=nodes[0]['uuid'])
# Shows: First mentioned 3 months ago, frequency increasing

# Can now make informed observations based on complete data
add_memory(
    name="Stress discussion",
    episode_body="Discussed stress today. User recognizes connection to "
                 "project deadlines and sleep quality from our past conversations."
)
```

## Key Principles

1. **Always search before adding** - Even for trivial data
2. **Reference found entities by name** - Creates automatic connections
3. **Use complete data for patterns** - Graph traversal, not semantic guessing
4. **Track evolution over time** - Timeline shows how understanding changed

## Technical Notes

- All tools respect `group_id` filtering for namespace isolation
- Temporal metadata tracks both creation time and domain time
- Facts can be invalidated (superseded) without deletion
- Processing is asynchronous - episodes are queued for background processing
"""

# MCP server instance

@@ -384,14 +499,21 @@ async def add_memory(
) -> SuccessResponse | ErrorResponse:
    """Add information to memory. **This is the PRIMARY method for storing information.**

    **IMPORTANT: SEARCH FIRST, THEN ADD**

    Before using this tool, always:
    1. Search for related entities: search_nodes(query="relevant keywords")
    2. Explore their connections: get_entity_connections(entity_uuid=found_uuid)
    3. Then add with context: Reference found entities by name in episode_body

    This creates rich automatic connections instead of isolated nodes.

    Processes content asynchronously, automatically extracting entities, relationships,
    and deduplicating similar information. Returns immediately while processing continues
    in the background.

    WHEN TO USE THIS TOOL:
    - Storing information → add_memory (this tool) **USE AFTER SEARCHING**
    - Searching information → use search_nodes or search_memory_facts
    - Deleting information → use delete_episode or delete_entity_edge

@@ -1426,6 +1548,215 @@ async def clear_graph(
        return ErrorResponse(error=f'Error clearing graph: {error_msg}')


@mcp.tool(
    annotations={
        'title': 'Get Entity Connections',
        'readOnlyHint': True,
        'destructiveHint': False,
        'idempotentHint': True,
        'openWorldHint': True,
    },
)
async def get_entity_connections(
    entity_uuid: str,
    group_ids: list[str] | None = None,
    max_connections: int = 50,
) -> FactSearchResponse | ErrorResponse:
    """Get ALL relationships connected to a specific entity. **Complete graph traversal.**

    **Use this to explore what's already known about an entity before adding new information.**

    Unlike search_memory_facts, which requires a semantic query, this performs direct graph
    traversal to return EVERY relationship where the entity is involved. Guarantees completeness.

    WHEN TO USE THIS TOOL:
    - Exploring entity neighborhood → get_entity_connections **USE THIS**
    - Before adding related info → get_entity_connections **USE THIS**
    - Pattern detection (need complete data) → get_entity_connections **USE THIS**
    - Understanding full context without formulating queries

    WHEN NOT to use:
    - Specific semantic query → use search_memory_facts instead
    - Finding entities → use search_nodes instead

    Use Cases:
    - "Show everything connected to PostgreSQL decision"
    - "What's linked to the authentication service?"
    - "All relationships for Django project"
    - Before adding: "Let me see what's already known about X"

    Args:
        entity_uuid: UUID of entity to explore (from search_nodes or previous results)
        group_ids: Optional list of namespaces to filter connections
        max_connections: Maximum relationships to return (default: 50)

    Returns:
        FactSearchResponse with all connected relationships and temporal metadata

    Examples:
        # After finding an entity
        nodes = search_nodes(query="Django project")
        django_uuid = nodes[0]['uuid']

        # Get all connections
        connections = get_entity_connections(entity_uuid=django_uuid)

        # Limited results for a specific namespace
        connections = get_entity_connections(
            entity_uuid=django_uuid,
            group_ids=["work"],
            max_connections=20
        )
    """
    global graphiti_service

    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()

        # Use existing EntityEdge.get_by_node_uuid() method
        edges = await EntityEdge.get_by_node_uuid(client.driver, entity_uuid)

        # Filter by group_ids if provided
        if group_ids:
            edges = [e for e in edges if e.group_id in group_ids]

        # Limit results
        edges = edges[:max_connections]

        if not edges:
            return FactSearchResponse(
                message=f'No connections found for entity {entity_uuid}', facts=[]
            )

        # Format using existing formatter
        facts = [format_fact_result(edge) for edge in edges]

        return FactSearchResponse(
            message=f'Found {len(facts)} connection(s) for entity', facts=facts
        )

    except Exception as e:
        error_msg = str(e)
        logger.error(f'Error getting entity connections: {error_msg}')
        return ErrorResponse(error=f'Error getting entity connections: {error_msg}')


@mcp.tool(
    annotations={
        'title': 'Get Entity Timeline',
        'readOnlyHint': True,
        'destructiveHint': False,
        'idempotentHint': True,
        'openWorldHint': True,
    },
)
async def get_entity_timeline(
    entity_uuid: str,
    group_ids: list[str] | None = None,
    max_episodes: int = 20,
) -> EpisodeSearchResponse | ErrorResponse:
    """Get chronological episode history for an entity. **Shows evolution over time.**

    **Use this to understand how an entity was discussed across conversations.**

    Returns ALL episodes where this entity was mentioned, in chronological order.
    Shows the full conversational history and temporal evolution of concepts, decisions,
    or any tracked entity.

    WHEN TO USE THIS TOOL:
    - Understanding entity history → get_entity_timeline **USE THIS**
    - "When did we discuss X?" → get_entity_timeline **USE THIS**
    - Tracking evolution over time → get_entity_timeline **USE THIS**
    - Seeing context from original sources

    WHEN NOT to use:
    - Semantic search across all episodes → use search_episodes instead
    - Finding entities → use search_nodes instead

    Use Cases:
    - "When did we first discuss microservices architecture?"
    - "Show all mentions of the deployment pipeline"
    - "Timeline of stress mentions"
    - "How did our understanding of GraphQL evolve?"

    Args:
        entity_uuid: UUID of entity (from search_nodes or previous results)
        group_ids: Optional list of namespaces to filter episodes
        max_episodes: Maximum episodes to return (default: 20, chronological)

    Returns:
        EpisodeSearchResponse with episodes ordered chronologically

    Examples:
        # After finding an entity
        nodes = search_nodes(query="microservices")
        arch_uuid = nodes[0]['uuid']

        # Get conversation history
        timeline = get_entity_timeline(entity_uuid=arch_uuid)

        # Recent episodes only, for a specific namespace
        timeline = get_entity_timeline(
            entity_uuid=arch_uuid,
            group_ids=["architecture"],
            max_episodes=10
        )
    """
    global graphiti_service

    if graphiti_service is None:
        return ErrorResponse(error='Graphiti service not initialized')

    try:
        client = await graphiti_service.get_client()

        # Use existing EpisodicNode.get_by_entity_node_uuid() method
        episodes = await EpisodicNode.get_by_entity_node_uuid(client.driver, entity_uuid)

        # Filter by group_ids if provided
        if group_ids:
            episodes = [e for e in episodes if e.group_id in group_ids]

        # Sort by valid_at (chronological order)
        episodes.sort(key=lambda e: e.valid_at)

        # Limit results
        episodes = episodes[:max_episodes]

        if not episodes:
            return EpisodeSearchResponse(
                message=f'No episodes found mentioning entity {entity_uuid}', episodes=[]
            )

        # Format episodes
        episode_results = []
        for ep in episodes:
            episode_results.append(
                {
                    'uuid': ep.uuid,
                    'name': ep.name,
                    'content': ep.content,
                    'valid_at': ep.valid_at.isoformat() if ep.valid_at else None,
                    'created_at': ep.created_at.isoformat() if ep.created_at else None,
                    'source': ep.source.value if ep.source else None,
                    'group_id': ep.group_id,
                }
            )

        return EpisodeSearchResponse(
            message=f'Found {len(episode_results)} episode(s) mentioning entity',
            episodes=episode_results,
        )

    except Exception as e:
        error_msg = str(e)
        logger.error(f'Error getting entity timeline: {error_msg}')
        return ErrorResponse(error=f'Error getting entity timeline: {error_msg}')


@mcp.tool(
    annotations={
        'title': 'Get Server Status',


@@ -1,6 +1,7 @@
"""Response type definitions for Graphiti MCP Server."""

from typing import Any

from typing_extensions import TypedDict


@@ -8,14 +8,15 @@ from pathlib import Path
# Setup path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))


def test_neo4j_database_parameter():
    """Test that Neo4j database parameter is included in configuration."""
    from src.config.schema import GraphitiConfig
    from src.services.factories import DatabaseDriverFactory

    print('\n' + '=' * 70)
    print('Testing Neo4j Database Parameter Configuration')
    print('=' * 70 + '\n')

    # Test 1: Default database value
    print('Test 1: Default database value')
@@ -25,7 +26,9 @@ def test_neo4j_database_parameter():
    assert 'database' in db_config, 'Database parameter missing from config!'
    print(f'  ✓ Database parameter present in config')
    print(f'  ✓ Default database value: {db_config["database"]}')
    assert db_config['database'] == 'neo4j', (
        f'Expected default "neo4j", got {db_config["database"]}'
    )
    print(f'  ✓ Default value matches expected: neo4j\n')

    # Test 2: Environment variable override
@@ -37,7 +40,9 @@ def test_neo4j_database_parameter():
    assert 'database' in db_config2, 'Database parameter missing from config!'
    print(f'  ✓ Database parameter present in config')
    print(f'  ✓ Overridden database value: {db_config2["database"]}')
    assert db_config2['database'] == 'graphiti', (
        f'Expected "graphiti", got {db_config2["database"]}'
    )
    print(f'  ✓ Environment override works correctly\n')

    # Clean up
@@ -50,9 +55,9 @@ def test_neo4j_database_parameter():
        assert param in db_config, f'Required parameter "{param}" missing!'
        print(f'{param}: present')

    print('\n' + '=' * 70)
    print('✅ All database parameter tests passed!')
    print('=' * 70)
    print('\nSummary:')
    print('  - database parameter is included in Neo4j config')
    print('  - Default value is "neo4j"')
@@ -70,5 +75,6 @@ if __name__ == '__main__':
    except Exception as e:
        print(f'\n❌ Unexpected error: {e}\n')
        import traceback

        traceback.print_exc()
        sys.exit(1)


@@ -10,20 +10,20 @@ This script migrates data from:
Target: graphiti database, group_id='6910959f2128b5c4faa22283'
"""

import os

from neo4j import GraphDatabase

# Configuration
NEO4J_URI = 'bolt://192.168.1.25:7687'
NEO4J_USER = 'neo4j'
NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', '!"MiTa1205')

SOURCE_DATABASE = 'neo4j'
SOURCE_GROUP_ID = 'lvarming73'

TARGET_DATABASE = 'graphiti'
TARGET_GROUP_ID = '6910959f2128b5c4faa22283'


def migrate_data():
@@ -33,41 +33,49 @@ def migrate_data():
    try:
        # Step 1: Export data from source database
        print(
            f'\n📤 Exporting data from {SOURCE_DATABASE} database (group_id: {SOURCE_GROUP_ID})...'
        )
        with driver.session(database=SOURCE_DATABASE) as session:
            # Get all nodes with the source group_id
            nodes_result = session.run(
                """
                MATCH (n {group_id: $group_id})
                RETURN
                    id(n) as old_id,
                    labels(n) as labels,
                    properties(n) as props
                ORDER BY old_id
                """,
                group_id=SOURCE_GROUP_ID,
            )
            nodes = list(nodes_result)
            print(f'  Found {len(nodes)} nodes to migrate')

            if len(nodes) == 0:
                print('  ⚠️  No nodes found. Nothing to migrate.')
                return

            # Get all relationships between nodes with the source group_id
            rels_result = session.run(
                """
                MATCH (n {group_id: $group_id})-[r]->(m {group_id: $group_id})
                RETURN
                    id(startNode(r)) as from_id,
                    id(endNode(r)) as to_id,
                    type(r) as rel_type,
                    properties(r) as props
                """,
                group_id=SOURCE_GROUP_ID,
            )
            relationships = list(rels_result)
            print(f'  Found {len(relationships)} relationships to migrate')

        # Step 2: Create ID mapping (old Neo4j internal ID -> new node UUID)
        print(f'\n📥 Importing data to {TARGET_DATABASE} database (group_id: {TARGET_GROUP_ID})...')

        id_mapping = {}
@@ -88,16 +96,19 @@ def migrate_data():
                labels_str = ':'.join(labels)

                # Create node
                result = session.run(
                    f"""
                    CREATE (n:{labels_str})
                    SET n = $props
                    RETURN id(n) as new_id, n.uuid as uuid
                    """,
                    props=props,
                )
                record = result.single()
                id_mapping[old_id] = record['new_id']

            print(f'  ✅ Created {len(nodes)} nodes')

            # Create relationships
            rel_count = 0
@@ -116,74 +127,87 @@ def migrate_data():
                to_new_id = id_mapping.get(to_old_id)

                if from_new_id is None or to_new_id is None:
                    print(f'  ⚠️  Skipping relationship: node mapping not found')
                    continue

                # Create relationship
                session.run(
                    f"""
                    MATCH (a), (b)
                    WHERE id(a) = $from_id AND id(b) = $to_id
                    CREATE (a)-[r:{rel_type}]->(b)
                    SET r = $props
                    """,
                    from_id=from_new_id,
                    to_id=to_new_id,
                    props=props,
                )
                rel_count += 1

            print(f'  ✅ Created {rel_count} relationships')

        # Step 3: Verify migration
        print(f'\n✅ Migration complete!')
        print(f'\n📊 Verification:')

        with driver.session(database=TARGET_DATABASE) as session:
            # Count nodes in target
            result = session.run(
                """
                MATCH (n {group_id: $group_id})
                RETURN count(n) as node_count
                """,
                group_id=TARGET_GROUP_ID,
            )
            target_count = result.single()['node_count']
            print(
                f'  Target database now has {target_count} nodes with group_id={TARGET_GROUP_ID}'
            )

            # Show node types
            result = session.run(
                """
                MATCH (n {group_id: $group_id})
                RETURN labels(n) as labels, count(*) as count
                ORDER BY count DESC
                """,
                group_id=TARGET_GROUP_ID,
            )
            print(f'\n  Node types:')
            for record in result:
                labels = ':'.join(record['labels'])
                count = record['count']
                print(f'    {labels}: {count}')

        print(f'\n🎉 Done! Your data has been migrated successfully.')
        print(f'\nNext steps:')
        print(f'1. Verify the data in Neo4j Browser:')
        print(f'   :use graphiti')
        print(f"   MATCH (n {{group_id: '{TARGET_GROUP_ID}'}}) RETURN n LIMIT 25")
        print(f'2. Test in LibreChat to ensure everything works')
        print(f'3. Once verified, you can delete the old data:')
        print(f'   :use neo4j')
        print(f"   MATCH (n {{group_id: '{SOURCE_GROUP_ID}'}}) DETACH DELETE n")

    finally:
        driver.close()


if __name__ == '__main__':
    print('=' * 70)
    print('Graphiti Data Migration Script')
    print('=' * 70)
    print(f"\nSource: {SOURCE_DATABASE} database, group_id='{SOURCE_GROUP_ID}'")
    print(f"Target: {TARGET_DATABASE} database, group_id='{TARGET_GROUP_ID}'")
    print(f'\nNeo4j URI: {NEO4J_URI}')
    print('=' * 70)

    response = input("\n⚠️  Ready to migrate? This will copy all data. Type 'yes' to continue: ")

    if response.lower() == 'yes':
        migrate_data()
    else:
        print('\n❌ Migration cancelled.')

uv.lock generated

@@ -782,8 +782,8 @@ wheels = [
]

[[package]]
name = "graphiti-core-varming"
version = "0.23.2"
source = { editable = "." }
dependencies = [
    { name = "diskcache" },

@@ -1,101 +1,118 @@
#!/usr/bin/env python3
"""Verify migration data in Neo4j."""

import json
import os

from neo4j import GraphDatabase

NEO4J_URI = 'bolt://192.168.1.25:7687'
NEO4J_USER = 'neo4j'
NEO4J_PASSWORD = '!"MiTa1205'

TARGET_DATABASE = 'graphiti'
TARGET_GROUP_ID = '6910959f2128b5c4faa22283'

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

print('=' * 70)
print('Verifying Migration Data')
print('=' * 70)

with driver.session(database=TARGET_DATABASE) as session:
    # Check total nodes
    result = session.run(
        """
        MATCH (n {group_id: $group_id})
        RETURN count(n) as total
        """,
        group_id=TARGET_GROUP_ID,
    )
    total = result.single()['total']
    print(f"\n✓ Total nodes with group_id '{TARGET_GROUP_ID}': {total}")

    # Check node labels and properties
    result = session.run(
        """
        MATCH (n {group_id: $group_id})
        RETURN DISTINCT labels(n) as labels, count(*) as count
        ORDER BY count DESC
        """,
        group_id=TARGET_GROUP_ID,
    )
    print(f'\n✓ Node types:')
    for record in result:
        labels = ':'.join(record['labels'])
        count = record['count']
        print(f'  {labels}: {count}')

    # Sample some episodic nodes
    result = session.run(
        """
        MATCH (n:Episodic {group_id: $group_id})
        RETURN n.uuid as uuid, n.name as name, n.content as content, n.created_at as created_at
        LIMIT 5
        """,
        group_id=TARGET_GROUP_ID,
    )
    print(f'\n✓ Sample Episodic nodes:')
    episodes = list(result)
    if episodes:
        for record in episodes:
            print(f'  - {record["name"]}')
            print(f'    UUID: {record["uuid"]}')
            print(f'    Created: {record["created_at"]}')
            print(f'    Content: {record["content"][:100] if record["content"] else "None"}...')
    else:
        print('  ⚠️  No episodic nodes found!')

    # Sample some entity nodes
    result = session.run(
        """
        MATCH (n:Entity {group_id: $group_id})
        RETURN n.uuid as uuid, n.name as name, labels(n) as labels, n.summary as summary
        LIMIT 10
        """,
        group_id=TARGET_GROUP_ID,
    )
    print(f'\n✓ Sample Entity nodes:')
    entities = list(result)
    if entities:
        for record in entities:
            labels = ':'.join(record['labels'])
            print(f'  - {record["name"]} ({labels})')
            print(f'    UUID: {record["uuid"]}')
            if record['summary']:
                print(f'    Summary: {record["summary"][:80]}...')
    else:
        print('  ⚠️  No entity nodes found!')

    # Check relationships
    result = session.run(
        """
        MATCH (n {group_id: $group_id})-[r]->(m {group_id: $group_id})
        RETURN type(r) as rel_type, count(*) as count
        ORDER BY count DESC
        LIMIT 10
        """,
        group_id=TARGET_GROUP_ID,
    )
    print(f'\n✓ Relationship types:')
    rels = list(result)
    if rels:
        for record in rels:
            print(f'  {record["rel_type"]}: {record["count"]}')
    else:
        print('  ⚠️  No relationships found!')

    # Check if nodes have required properties
    result = session.run(
        """
        MATCH (n:Episodic {group_id: $group_id})
        RETURN
            count(n) as total,
@@ -104,19 +121,22 @@ with driver.session(database=TARGET_DATABASE) as session:
            count(n.content) as has_content,
            count(n.created_at) as has_created_at,
            count(n.valid_at) as has_valid_at
        """,
        group_id=TARGET_GROUP_ID,
    )
    props = result.single()
    print(f'\n✓ Episodic node properties:')
    print(f'  Total: {props["total"]}')
    print(f'  Has uuid: {props["has_uuid"]}')
    print(f'  Has name: {props["has_name"]}')
    print(f'  Has content: {props["has_content"]}')
    print(f'  Has created_at: {props["has_created_at"]}')
    print(f'  Has valid_at: {props["has_valid_at"]}')

    # Check Entity properties
    result = session.run(
        """
        MATCH (n:Entity {group_id: $group_id})
        RETURN
            count(n) as total,
@@ -124,15 +144,17 @@ with driver.session(database=TARGET_DATABASE) as session:
            count(n.name) as has_name,
            count(n.summary) as has_summary,
            count(n.created_at) as has_created_at
        """,
        group_id=TARGET_GROUP_ID,
    )
    props = result.single()
    print(f'\n✓ Entity node properties:')
    print(f'  Total: {props["total"]}')
    print(f'  Has uuid: {props["has_uuid"]}')
    print(f'  Has name: {props["has_name"]}')
    print(f'  Has summary: {props["has_summary"]}')
    print(f'  Has created_at: {props["has_created_at"]}')

driver.close()
print('\n' + '=' * 70)