diff --git a/env.example b/env.example index f7db6b05..ce666600 100644 --- a/env.example +++ b/env.example @@ -313,7 +313,7 @@ POSTGRES_IVFFLAT_LISTS=100 NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io NEO4J_USERNAME=neo4j NEO4J_PASSWORD='your_password' -NEO4J_DATABASE=noe4j +NEO4J_DATABASE=neo4j NEO4J_MAX_CONNECTION_POOL_SIZE=100 NEO4J_CONNECTION_TIMEOUT=30 NEO4J_CONNECTION_ACQUISITION_TIMEOUT=30 diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 2a32307a..e034c50b 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -201,44 +201,72 @@ class Neo4JStorage(BaseGraphStorage): raise e if connected: - # Create index for workspace nodes on entity_id if it doesn't exist workspace_label = self._get_workspace_label() + # Create B-Tree index for entity_id for faster lookups try: async with self._driver.session(database=database) as session: - # Check if index exists first - check_query = f""" - CALL db.indexes() YIELD name, labelsOrTypes, properties - WHERE labelsOrTypes = ['{workspace_label}'] AND properties = ['entity_id'] - RETURN count(*) > 0 AS exists - """ - try: - check_result = await session.run(check_query) - record = await check_result.single() - await check_result.consume() - - index_exists = record and record.get("exists", False) - - if not index_exists: - # Create index only if it doesn't exist - result = await session.run( - f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)" - ) - await result.consume() - logger.info( - f"[{self.workspace}] Created index for {workspace_label} nodes on entity_id in {database}" - ) - except Exception: - # Fallback if db.indexes() is not supported in this Neo4j version - result = await session.run( - f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)" - ) - await result.consume() + await session.run( + f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)" + ) + logger.info( + f"[{self.workspace}] Ensured B-Tree index on entity_id for {workspace_label} in {database}" + ) except Exception as e: logger.warning( - f"[{self.workspace}] Failed to create index: {str(e)}" + f"[{self.workspace}] Failed to create B-Tree index: {str(e)}" ) + + # Create full-text index for entity_id for faster text searches + await self._create_fulltext_index( + self._driver, self._DATABASE, workspace_label + ) break + async def _create_fulltext_index( + self, driver: AsyncDriver, database: str, workspace_label: str + ): + """Create a full-text index on the entity_id property if it doesn't exist.""" + index_name = "entity_id_fulltext_idx" + try: + async with driver.session(database=database) as session: + # Check if the full-text index exists + check_index_query = "SHOW FULLTEXT INDEXES" + result = await session.run(check_index_query) + indexes = await result.data() + await result.consume() + + index_exists = any(idx["name"] == index_name for idx in indexes) + + if not index_exists: + logger.info( + f"[{self.workspace}] Full-text index '{index_name}' not found. Creating it now." + ) + # Create the full-text index + create_index_query = f""" + CREATE FULLTEXT INDEX {index_name} FOR (n:`{workspace_label}`) ON EACH [n.entity_id] + """ + result = await session.run(create_index_query) + await result.consume() + logger.info( + f"[{self.workspace}] Successfully created full-text index '{index_name}'." + ) + else: + logger.debug( + f"[{self.workspace}] Full-text index '{index_name}' already exists." + ) + except Exception as e: + # Handle cases where the command might not be supported (e.g., community edition before 5.x) + if "Unknown command" in str(e) or "invalid syntax" in str(e).lower(): + logger.warning( + f"[{self.workspace}] Could not create or verify full-text index '{index_name}'. " + "This might be because you are using a Neo4j version that does not support it. " + "Search functionality will fall back to slower, non-indexed queries." + ) + else: + logger.error( + f"[{self.workspace}] Failed to create or verify full-text index '{index_name}': {str(e)}" + ) + async def finalize(self): """Close the Neo4j driver and release all resources""" async with get_graph_db_lock(): @@ -251,7 +279,7 @@ class Neo4JStorage(BaseGraphStorage): await self.finalize() async def index_done_callback(self) -> None: - # Noe4J handles persistence automatically + # Neo4J handles persistence automatically pass async def has_node(self, node_id: str) -> bool: @@ -1523,6 +1551,139 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() return edges + async def get_popular_labels(self, limit: int = 300) -> list[str]: + """Get popular labels by node degree (most connected entities) + + Args: + limit: Maximum number of labels to return + + Returns: + List of labels sorted by degree (highest first) + """ + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + try: + query = f""" + MATCH (n:`{workspace_label}`) + WHERE n.entity_id IS NOT NULL + OPTIONAL MATCH (n)-[r]-() + WITH n.entity_id AS label, count(r) AS degree + ORDER BY degree DESC, label ASC + LIMIT $limit + RETURN label + """ + result = await session.run(query, limit=limit) + labels = [] + async for record in result: + labels.append(record["label"]) + await result.consume() + + logger.debug( + f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})" + ) + return labels + except Exception as e: + logger.error( + f"[{self.workspace}] Error getting popular labels: {str(e)}" + ) + await result.consume() + raise + + async def search_labels(self, query: str, limit: int = 50) -> list[str]: + """ + Search labels with fuzzy matching, using a full-text index for performance if available. + Falls back to a slower CONTAINS search if the index is not available or fails. + """ + workspace_label = self._get_workspace_label() + query_strip = query.strip() + if not query_strip: + return [] + + query_lower = query_strip.lower() + + index_name = "entity_id_fulltext_idx" + + # Attempt to use the full-text index first + try: + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + # The query uses a full-text index. + # The native score from the index is used as the primary sorting key. + # We add extra scoring for exact and prefix matches to align with NetworkX logic. + cypher_query = f""" + CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score + WITH node, score + WHERE node:`{workspace_label}` + WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score + WITH label, label_lower, score, + CASE + WHEN label_lower = $query_lower THEN score + 1000 + WHEN label_lower STARTS WITH $query_lower THEN score + 500 + WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50 + ELSE score + END AS final_score + RETURN label + ORDER BY final_score DESC, label ASC + LIMIT $limit + """ + result = await session.run( + cypher_query, + index_name=index_name, + search_query=f"{query_strip}*", # Add wildcard for prefix/contains matching + query_lower=query_lower, + limit=limit, + ) + labels = [record["label"] async for record in result] + await result.consume() + + logger.debug( + f"[{self.workspace}] Full-text search for '{query}' returned {len(labels)} results (limit: {limit})" + ) + return labels + except Exception as e: + # If the full-text search fails (e.g., index doesn't exist, unsupported version), + # fall back to the old, slower method. + logger.warning( + f"[{self.workspace}] Full-text search failed with error: {str(e)}. " + "Falling back to slower, non-indexed search. " + "Ensure you are on Neo4j Enterprise or a version supporting full-text indexes." + ) + logger.warning( + f"[{self.workspace}] Falling back to slower, non-indexed search. " + "Ensure you are on Neo4j Enterprise or a version supporting full-text indexes." + ) + # Fallback implementation + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + cypher_query = f""" + MATCH (n:`{workspace_label}`) + WHERE n.entity_id IS NOT NULL + WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower + WHERE label_lower CONTAINS $query_lower + WITH label, label_lower, + CASE + WHEN label_lower = $query_lower THEN 1000 + WHEN label_lower STARTS WITH $query_lower THEN 500 + ELSE 100 - size(label) + END AS score + ORDER BY score DESC, label ASC + LIMIT $limit + RETURN label + """ + result = await session.run( + cypher_query, query_lower=query_lower, limit=limit + ) + labels = [record["label"] async for record in result] + await result.consume() + logger.debug( + f"[{self.workspace}] Fallback search for '{query}' returned {len(labels)} results (limit: {limit})" + ) + return labels + async def drop(self) -> dict[str, str]: """Drop all data from current workspace storage and clean up resources