diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 429aff96..1b35697b 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -231,6 +231,7 @@ class Neo4JStorage(BaseGraphStorage): ): """Create a full-text index on the entity_id property with Chinese tokenizer support.""" index_name = f"entity_id_fulltext_idx_{workspace_label}" + legacy_index_name = "entity_id_fulltext_idx" try: async with driver.session(database=database) as session: # Check if the full-text index exists and get its configuration @@ -240,11 +241,34 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() existing_index = None + legacy_index = None for idx in indexes: if idx["name"] == index_name: existing_index = idx + elif idx["name"] == legacy_index_name: + legacy_index = idx + # Break early if we found both indexes + if existing_index and legacy_index: break + # Handle legacy index migration + if legacy_index and not existing_index: + logger.info( + f"[{self.workspace}] Found legacy index '{legacy_index_name}'. Migrating to '{index_name}'." + ) + try: + # Drop the legacy index + drop_query = f"DROP INDEX {legacy_index_name}" + result = await session.run(drop_query) + await result.consume() + logger.info( + f"[{self.workspace}] Dropped legacy index '{legacy_index_name}'" + ) + except Exception as drop_error: + logger.warning( + f"[{self.workspace}] Failed to drop legacy index: {str(drop_error)}" + ) + # Check if index exists and is online if existing_index: index_state = existing_index.get("state", "UNKNOWN") @@ -1641,7 +1665,7 @@ class Neo4JStorage(BaseGraphStorage): query_lower = query_strip.lower() is_chinese = self._is_chinese_text(query_strip) - index_name = "entity_id_fulltext_idx" + index_name = f"entity_id_fulltext_idx_{workspace_label}" # Attempt to use the full-text index first try: diff --git a/tests/test_neo4j_fulltext_index.py b/tests/test_neo4j_fulltext_index.py new file mode 100644 index 00000000..3b10dd47 --- /dev/null +++ b/tests/test_neo4j_fulltext_index.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python +""" +Test Neo4j full-text index functionality, specifically: +1. Workspace-specific index naming +2. Legacy index migration +3. search_labels functionality with workspace-specific indexes +""" + +import asyncio +import os +import sys +import pytest +import numpy as np + +# Add the project root directory to the Python path +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from lightrag.kg.shared_storage import initialize_share_data + + +# Mock embedding function that returns random vectors +async def mock_embedding_func(texts): + return np.random.rand(len(texts), 10) # Return 10-dimensional random vectors + + +@pytest.fixture +async def neo4j_storage(): + """ + Initialize Neo4j storage for testing. + Requires Neo4j to be running and configured via environment variables. + """ + # Check if Neo4j is configured + if not os.getenv("NEO4J_URI"): + pytest.skip("Neo4j not configured (NEO4J_URI not set)") + + from lightrag.kg.neo4j_impl import Neo4JStorage + + # Initialize shared_storage for locks + initialize_share_data() + + global_config = { + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5}, + "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"), + } + + storage = Neo4JStorage( + namespace="test_fulltext_index", + workspace="test_workspace", + global_config=global_config, + embedding_func=mock_embedding_func, + ) + + # Initialize the connection + await storage.initialize() + + # Clean up any existing data + await storage.drop() + + yield storage + + # Cleanup + await storage.drop() + await storage.finalize() + + +@pytest.mark.integration +@pytest.mark.requires_db +async def test_fulltext_index_creation(neo4j_storage): + """ + Test that the full-text index is created with the workspace-specific name. + """ + storage = neo4j_storage + workspace_label = storage._get_workspace_label() + expected_index_name = f"entity_id_fulltext_idx_{workspace_label}" + + # Query Neo4j to check if the index exists + async with storage._driver.session(database=storage._DATABASE) as session: + result = await session.run("SHOW FULLTEXT INDEXES") + indexes = await result.data() + await result.consume() + + # Check if the workspace-specific index exists + index_names = [idx["name"] for idx in indexes] + assert ( + expected_index_name in index_names + ), f"Expected index '{expected_index_name}' not found. Found indexes: {index_names}" + + # Check if the legacy index doesn't exist (should be migrated if it was there) + legacy_index_name = "entity_id_fulltext_idx" + if legacy_index_name in index_names: + # If legacy index exists, it should be for a different workspace + # or it means migration didn't happen + print( + f"Warning: Legacy index '{legacy_index_name}' still exists alongside '{expected_index_name}'" + ) + + +@pytest.mark.integration +@pytest.mark.requires_db +async def test_search_labels_with_workspace_index(neo4j_storage): + """ + Test that search_labels uses the workspace-specific index and returns results. + """ + storage = neo4j_storage + + # Insert test nodes + test_nodes = [ + { + "entity_id": "Artificial Intelligence", + "description": "AI field", + "keywords": "AI,ML,DL", + "entity_type": "Technology", + }, + { + "entity_id": "Machine Learning", + "description": "ML subfield", + "keywords": "supervised,unsupervised", + "entity_type": "Technology", + }, + { + "entity_id": "Deep Learning", + "description": "DL subfield", + "keywords": "neural networks", + "entity_type": "Technology", + }, + { + "entity_id": "Natural Language Processing", + "description": "NLP field", + "keywords": "text,language", + "entity_type": "Technology", + }, + ] + + for node_data in test_nodes: + await storage.upsert_node(node_data["entity_id"], node_data) + + # Give the index time to become consistent (eventually consistent index) + await asyncio.sleep(2) + + # Test search_labels + results = await storage.search_labels("Learning", limit=10) + + # Should find nodes with "Learning" in them + assert len(results) > 0, "search_labels should return results for 'Learning'" + assert any( + "Learning" in result for result in results + ), "Results should contain 'Learning'" + + # Test case-insensitive search + results_lower = await storage.search_labels("learning", limit=10) + assert len(results_lower) > 0, "search_labels should be case-insensitive" + + # Test partial match + results_partial = await storage.search_labels("Intelli", limit=10) + assert ( + len(results_partial) > 0 + ), "search_labels should support partial matching with wildcard" + assert any( + "Intelligence" in result for result in results_partial + ), "Should find 'Artificial Intelligence'" + + +@pytest.mark.integration +@pytest.mark.requires_db +async def test_search_labels_chinese_text(neo4j_storage): + """ + Test that search_labels works with Chinese text using the CJK analyzer. + """ + storage = neo4j_storage + + # Insert Chinese test nodes + chinese_nodes = [ + { + "entity_id": "人工智能", + "description": "人工智能领域", + "keywords": "AI,机器学习", + "entity_type": "技术", + }, + { + "entity_id": "机器学习", + "description": "机器学习子领域", + "keywords": "监督学习,无监督学习", + "entity_type": "技术", + }, + { + "entity_id": "深度学习", + "description": "深度学习子领域", + "keywords": "神经网络", + "entity_type": "技术", + }, + ] + + for node_data in chinese_nodes: + await storage.upsert_node(node_data["entity_id"], node_data) + + # Give the index time to become consistent + await asyncio.sleep(2) + + # Test Chinese text search + results = await storage.search_labels("学习", limit=10) + + # Should find nodes with "学习" in them + assert len(results) > 0, "search_labels should return results for Chinese text" + assert any( + "学习" in result for result in results + ), "Results should contain Chinese characters '学习'" + + +@pytest.mark.integration +@pytest.mark.requires_db +async def test_search_labels_fallback_to_contains(neo4j_storage): + """ + Test that search_labels falls back to CONTAINS search if the index fails. + This can happen with older Neo4j versions or if the index is not yet available. + """ + storage = neo4j_storage + + # Insert test nodes + test_nodes = [ + { + "entity_id": "Test Node Alpha", + "description": "Test node", + "keywords": "test", + "entity_type": "Test", + }, + { + "entity_id": "Test Node Beta", + "description": "Test node", + "keywords": "test", + "entity_type": "Test", + }, + ] + + for node_data in test_nodes: + await storage.upsert_node(node_data["entity_id"], node_data) + + # Even if the full-text index is not available, CONTAINS should work + results = await storage.search_labels("Alpha", limit=10) + + # Should find the node using fallback CONTAINS search + assert len(results) > 0, "Fallback CONTAINS search should return results" + assert "Test Node Alpha" in results, "Should find 'Test Node Alpha'" + + +@pytest.mark.integration +@pytest.mark.requires_db +async def test_multiple_workspaces_have_separate_indexes(neo4j_storage): + """ + Test that different workspaces have their own separate indexes. + """ + from lightrag.kg.neo4j_impl import Neo4JStorage + + # Create storage for workspace 1 + storage1 = neo4j_storage + + # Create storage for workspace 2 + global_config = { + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.5}, + "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"), + } + + storage2 = Neo4JStorage( + namespace="test_fulltext_index", + workspace="test_workspace_2", + global_config=global_config, + embedding_func=mock_embedding_func, + ) + + await storage2.initialize() + await storage2.drop() + + try: + # Check that both workspaces have their own indexes + async with storage1._driver.session(database=storage1._DATABASE) as session: + result = await session.run("SHOW FULLTEXT INDEXES") + indexes = await result.data() + await result.consume() + + index_names = [idx["name"] for idx in indexes] + workspace1_index = f"entity_id_fulltext_idx_{storage1._get_workspace_label()}" + workspace2_index = f"entity_id_fulltext_idx_{storage2._get_workspace_label()}" + + assert ( + workspace1_index in index_names + ), f"Workspace 1 index '{workspace1_index}' should exist" + assert ( + workspace2_index in index_names + ), f"Workspace 2 index '{workspace2_index}' should exist" + + finally: + await storage2.drop() + await storage2.finalize() + + +if __name__ == "__main__": + # Run tests with pytest + pytest.main([__file__, "-v", "--run-integration"])