From 8e2a1fa59ec4c7cbbe06a3a9d40329dd004cdec4 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 20 Sep 2025 15:19:22 +0800 Subject: [PATCH] Enhance Neo4j fulltext search with Chinese language support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add CJK analyzer for Chinese text • Auto-detect Chinese characters • Recreate index if needed • Separate Chinese/Latin search logic • Improve fallback for Chinese queries --- lightrag/kg/neo4j_impl.py | 238 +++++++++++++++++++++++++++----------- 1 file changed, 169 insertions(+), 69 deletions(-) diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index e034c50b..4c523637 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -70,6 +70,11 @@ class Neo4JStorage(BaseGraphStorage): """Return workspace label (guaranteed non-empty during initialization)""" return self.workspace + def _is_chinese_text(self, text: str) -> bool: + """Check if text contains Chinese characters.""" + chinese_pattern = re.compile(r'[\u4e00-\u9fff]+') + return bool(chinese_pattern.search(text)) + async def initialize(self): async with get_data_init_lock(): URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None)) @@ -225,37 +230,91 @@ class Neo4JStorage(BaseGraphStorage): async def _create_fulltext_index( self, driver: AsyncDriver, database: str, workspace_label: str ): - """Create a full-text index on the entity_id property if it doesn't exist.""" + """Create a full-text index on the entity_id property with Chinese tokenizer support.""" index_name = "entity_id_fulltext_idx" try: async with driver.session(database=database) as session: - # Check if the full-text index exists + # Check if the full-text index exists and get its configuration check_index_query = "SHOW FULLTEXT INDEXES" result = await session.run(check_index_query) indexes = await result.data() await result.consume() - index_exists = any(idx["name"] == index_name for idx in indexes) + existing_index = None + for idx in indexes: + if idx["name"] == index_name: + existing_index = idx + break - if not index_exists: + # Check if we need to recreate the index + needs_recreation = False + if existing_index: + # Check if the existing index has CJK analyzer + index_config = existing_index.get("options", {}) + current_analyzer = index_config.get("indexConfig", {}).get("fulltext.analyzer", "standard") + + if current_analyzer != "cjk": + logger.info( + f"[{self.workspace}] Existing index '{index_name}' uses '{current_analyzer}' analyzer. " + "Recreating with CJK analyzer for Chinese support." + ) + needs_recreation = True + else: + logger.debug( + f"[{self.workspace}] Full-text index '{index_name}' already exists with CJK analyzer." + ) + return + + if not existing_index or needs_recreation: + # Drop existing index if it needs recreation + if needs_recreation: + try: + drop_query = f"DROP INDEX {index_name}" + result = await session.run(drop_query) + await result.consume() + logger.info(f"[{self.workspace}] Dropped existing index '{index_name}'") + except Exception as drop_error: + logger.warning(f"[{self.workspace}] Failed to drop existing index: {str(drop_error)}") + + # Create new index with CJK analyzer logger.info( - f"[{self.workspace}] Full-text index '{index_name}' not found. Creating it now." - ) - # Create the full-text index - create_index_query = f""" - CREATE FULLTEXT INDEX {index_name} FOR (n:`{workspace_label}`) ON EACH [n.entity_id] - """ - result = await session.run(create_index_query) - await result.consume() - logger.info( - f"[{self.workspace}] Successfully created full-text index '{index_name}'." - ) - else: - logger.debug( - f"[{self.workspace}] Full-text index '{index_name}' already exists." + f"[{self.workspace}] Creating full-text index '{index_name}' with Chinese tokenizer support." ) + + try: + create_index_query = f""" + CREATE FULLTEXT INDEX {index_name} + FOR (n:`{workspace_label}`) ON EACH [n.entity_id] + OPTIONS {{ + indexConfig: {{ + `fulltext.analyzer`: 'cjk', + `fulltext.eventually_consistent`: true + }} + }} + """ + result = await session.run(create_index_query) + await result.consume() + logger.info( + f"[{self.workspace}] Successfully created full-text index '{index_name}' with CJK analyzer." + ) + except Exception as cjk_error: + # Fallback to standard analyzer if CJK is not supported + logger.warning( + f"[{self.workspace}] CJK analyzer not supported: {str(cjk_error)}. " + "Falling back to standard analyzer." + ) + create_index_query = f""" + CREATE FULLTEXT INDEX {index_name} + FOR (n:`{workspace_label}`) ON EACH [n.entity_id] + """ + result = await session.run(create_index_query) + await result.consume() + logger.info( + f"[{self.workspace}] Successfully created full-text index '{index_name}' with standard analyzer." + ) + except Exception as e: - # Handle cases where the command might not be supported (e.g., community edition before 5.x) + # Handle cases where the command might not be supported if "Unknown command" in str(e) or "invalid syntax" in str(e).lower(): logger.warning( f"[{self.workspace}] Could not create or verify full-text index '{index_name}'. " @@ -1594,6 +1653,7 @@ class Neo4JStorage(BaseGraphStorage): async def search_labels(self, query: str, limit: int = 50) -> list[str]: """ Search labels with fuzzy matching, using a full-text index for performance if available. + Enhanced with Chinese text support using CJK analyzer. Falls back to a slower CONTAINS search if the index is not available or fails. """ workspace_label = self._get_workspace_label() @@ -1602,7 +1662,7 @@ class Neo4JStorage(BaseGraphStorage): return [] query_lower = query_strip.lower() - + is_chinese = self._is_chinese_text(query_strip) index_name = "entity_id_fulltext_idx" # Attempt to use the full-text index first @@ -1610,77 +1670,117 @@ class Neo4JStorage(BaseGraphStorage): async with self._driver.session( database=self._DATABASE, default_access_mode="READ" ) as session: - # The query uses a full-text index. - # The native score from the index is used as the primary sorting key. - # We add extra scoring for exact and prefix matches to align with NetworkX logic. - cypher_query = f""" - CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score - WITH node, score - WHERE node:`{workspace_label}` - WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score - WITH label, label_lower, score, - CASE - WHEN label_lower = $query_lower THEN score + 1000 - WHEN label_lower STARTS WITH $query_lower THEN score + 500 - WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50 - ELSE score - END AS final_score - RETURN label - ORDER BY final_score DESC, label ASC - LIMIT $limit - """ + if is_chinese: + # For Chinese text, use different search strategies + cypher_query = f""" + CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score + WITH node, score + WHERE node:`{workspace_label}` + WITH node.entity_id AS label, score + WITH label, score, + CASE + WHEN label = $query_strip THEN score + 1000 + WHEN label CONTAINS $query_strip THEN score + 500 + ELSE score + END AS final_score + RETURN label + ORDER BY final_score DESC, label ASC + LIMIT $limit + """ + # For Chinese, don't add wildcard as it may not work properly with CJK analyzer + search_query = query_strip + else: + # For non-Chinese text, use the original logic with wildcard + cypher_query = f""" + CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score + WITH node, score + WHERE node:`{workspace_label}` + WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score + WITH label, label_lower, score, + CASE + WHEN label_lower = $query_lower THEN score + 1000 + WHEN label_lower STARTS WITH $query_lower THEN score + 500 + WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50 + ELSE score + END AS final_score + RETURN label + ORDER BY final_score DESC, label ASC + LIMIT $limit + """ + search_query = f"{query_strip}*" + result = await session.run( cypher_query, index_name=index_name, - search_query=f"{query_strip}*", # Add wildcard for prefix/contains matching + search_query=search_query, query_lower=query_lower, + query_strip=query_strip, limit=limit, ) labels = [record["label"] async for record in result] await result.consume() logger.debug( - f"[{self.workspace}] Full-text search for '{query}' returned {len(labels)} results (limit: {limit})" + f"[{self.workspace}] Full-text search ({'Chinese' if is_chinese else 'Latin'}) for '{query}' returned {len(labels)} results (limit: {limit})" ) return labels + except Exception as e: - # If the full-text search fails (e.g., index doesn't exist, unsupported version), - # fall back to the old, slower method. + # If the full-text search fails, fall back to CONTAINS search logger.warning( f"[{self.workspace}] Full-text search failed with error: {str(e)}. " - "Falling back to slower, non-indexed search. " - "Ensure you are on Neo4j Enterprise or a version supporting full-text indexes." + "Falling back to slower, non-indexed search." ) - logger.warning( - f"[{self.workspace}] Falling back to slower, non-indexed search. " - "Ensure you are on Neo4j Enterprise or a version supporting full-text indexes." - ) - # Fallback implementation + + # Enhanced fallback implementation async with self._driver.session( database=self._DATABASE, default_access_mode="READ" ) as session: - cypher_query = f""" - MATCH (n:`{workspace_label}`) - WHERE n.entity_id IS NOT NULL - WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower - WHERE label_lower CONTAINS $query_lower - WITH label, label_lower, - CASE - WHEN label_lower = $query_lower THEN 1000 - WHEN label_lower STARTS WITH $query_lower THEN 500 - ELSE 100 - size(label) - END AS score - ORDER BY score DESC, label ASC - LIMIT $limit - RETURN label - """ - result = await session.run( - cypher_query, query_lower=query_lower, limit=limit - ) + if is_chinese: + # For Chinese text, use direct CONTAINS without case conversion + cypher_query = f""" + MATCH (n:`{workspace_label}`) + WHERE n.entity_id IS NOT NULL + WITH n.entity_id AS label + WHERE label CONTAINS $query_strip + WITH label, + CASE + WHEN label = $query_strip THEN 1000 + WHEN label STARTS WITH $query_strip THEN 500 + ELSE 100 - size(label) + END AS score + ORDER BY score DESC, label ASC + LIMIT $limit + RETURN label + """ + result = await session.run( + cypher_query, query_strip=query_strip, limit=limit + ) + else: + # For non-Chinese text, use the original fallback logic + cypher_query = f""" + MATCH (n:`{workspace_label}`) + WHERE n.entity_id IS NOT NULL + WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower + WHERE label_lower CONTAINS $query_lower + WITH label, label_lower, + CASE + WHEN label_lower = $query_lower THEN 1000 + WHEN label_lower STARTS WITH $query_lower THEN 500 + ELSE 100 - size(label) + END AS score + ORDER BY score DESC, label ASC + LIMIT $limit + RETURN label + """ + result = await session.run( + cypher_query, query_lower=query_lower, limit=limit + ) + labels = [record["label"] async for record in result] await result.consume() logger.debug( - f"[{self.workspace}] Fallback search for '{query}' returned {len(labels)} results (limit: {limit})" + f"[{self.workspace}] Fallback search ({'Chinese' if is_chinese else 'Latin'}) for '{query}' returned {len(labels)} results (limit: {limit})" ) return labels