Enhance Neo4j fulltext search with Chinese language support

• Add CJK analyzer for Chinese text
• Auto-detect Chinese characters
• Recreate index if needed
• Separate Chinese/Latin search logic
• Improve fallback for Chinese queries
This commit is contained in:
yangdx 2025-09-20 15:19:22 +08:00
parent 3b502af858
commit 8e2a1fa59e

View file

@ -70,6 +70,11 @@ class Neo4JStorage(BaseGraphStorage):
"""Return workspace label (guaranteed non-empty during initialization)"""
return self.workspace
def _is_chinese_text(self, text: str) -> bool:
"""Check if text contains Chinese characters."""
chinese_pattern = re.compile(r'[\u4e00-\u9fff]+')
return bool(chinese_pattern.search(text))
async def initialize(self):
async with get_data_init_lock():
URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
@ -225,37 +230,91 @@ class Neo4JStorage(BaseGraphStorage):
async def _create_fulltext_index(
self, driver: AsyncDriver, database: str, workspace_label: str
):
"""Create a full-text index on the entity_id property if it doesn't exist."""
"""Create a full-text index on the entity_id property with Chinese tokenizer support."""
index_name = "entity_id_fulltext_idx"
try:
async with driver.session(database=database) as session:
# Check if the full-text index exists
# Check if the full-text index exists and get its configuration
check_index_query = "SHOW FULLTEXT INDEXES"
result = await session.run(check_index_query)
indexes = await result.data()
await result.consume()
index_exists = any(idx["name"] == index_name for idx in indexes)
existing_index = None
for idx in indexes:
if idx["name"] == index_name:
existing_index = idx
break
if not index_exists:
# Check if we need to recreate the index
needs_recreation = False
if existing_index:
# Check if the existing index has CJK analyzer
index_config = existing_index.get("options", {})
current_analyzer = index_config.get("indexConfig", {}).get("fulltext.analyzer", "standard")
if current_analyzer != "cjk":
logger.info(
f"[{self.workspace}] Existing index '{index_name}' uses '{current_analyzer}' analyzer. "
"Recreating with CJK analyzer for Chinese support."
)
needs_recreation = True
else:
logger.debug(
f"[{self.workspace}] Full-text index '{index_name}' already exists with CJK analyzer."
)
return
if not existing_index or needs_recreation:
# Drop existing index if it needs recreation
if needs_recreation:
try:
drop_query = f"DROP INDEX {index_name}"
result = await session.run(drop_query)
await result.consume()
logger.info(f"[{self.workspace}] Dropped existing index '{index_name}'")
except Exception as drop_error:
logger.warning(f"[{self.workspace}] Failed to drop existing index: {str(drop_error)}")
# Create new index with CJK analyzer
logger.info(
f"[{self.workspace}] Full-text index '{index_name}' not found. Creating it now."
)
# Create the full-text index
create_index_query = f"""
CREATE FULLTEXT INDEX {index_name} FOR (n:`{workspace_label}`) ON EACH [n.entity_id]
"""
result = await session.run(create_index_query)
await result.consume()
logger.info(
f"[{self.workspace}] Successfully created full-text index '{index_name}'."
)
else:
logger.debug(
f"[{self.workspace}] Full-text index '{index_name}' already exists."
f"[{self.workspace}] Creating full-text index '{index_name}' with Chinese tokenizer support."
)
try:
create_index_query = f"""
CREATE FULLTEXT INDEX {index_name}
FOR (n:`{workspace_label}`) ON EACH [n.entity_id]
OPTIONS {{
indexConfig: {{
`fulltext.analyzer`: 'cjk',
`fulltext.eventually_consistent`: true
}}
}}
"""
result = await session.run(create_index_query)
await result.consume()
logger.info(
f"[{self.workspace}] Successfully created full-text index '{index_name}' with CJK analyzer."
)
except Exception as cjk_error:
# Fallback to standard analyzer if CJK is not supported
logger.warning(
f"[{self.workspace}] CJK analyzer not supported: {str(cjk_error)}. "
"Falling back to standard analyzer."
)
create_index_query = f"""
CREATE FULLTEXT INDEX {index_name}
FOR (n:`{workspace_label}`) ON EACH [n.entity_id]
"""
result = await session.run(create_index_query)
await result.consume()
logger.info(
f"[{self.workspace}] Successfully created full-text index '{index_name}' with standard analyzer."
)
except Exception as e:
# Handle cases where the command might not be supported (e.g., community edition before 5.x)
# Handle cases where the command might not be supported
if "Unknown command" in str(e) or "invalid syntax" in str(e).lower():
logger.warning(
f"[{self.workspace}] Could not create or verify full-text index '{index_name}'. "
@ -1594,6 +1653,7 @@ class Neo4JStorage(BaseGraphStorage):
async def search_labels(self, query: str, limit: int = 50) -> list[str]:
"""
Search labels with fuzzy matching, using a full-text index for performance if available.
Enhanced with Chinese text support using CJK analyzer.
Falls back to a slower CONTAINS search if the index is not available or fails.
"""
workspace_label = self._get_workspace_label()
@ -1602,7 +1662,7 @@ class Neo4JStorage(BaseGraphStorage):
return []
query_lower = query_strip.lower()
is_chinese = self._is_chinese_text(query_strip)
index_name = "entity_id_fulltext_idx"
# Attempt to use the full-text index first
@ -1610,77 +1670,117 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
# The query uses a full-text index.
# The native score from the index is used as the primary sorting key.
# We add extra scoring for exact and prefix matches to align with NetworkX logic.
cypher_query = f"""
CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score
WITH node, score
WHERE node:`{workspace_label}`
WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score
WITH label, label_lower, score,
CASE
WHEN label_lower = $query_lower THEN score + 1000
WHEN label_lower STARTS WITH $query_lower THEN score + 500
WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50
ELSE score
END AS final_score
RETURN label
ORDER BY final_score DESC, label ASC
LIMIT $limit
"""
if is_chinese:
# For Chinese text, use different search strategies
cypher_query = f"""
CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score
WITH node, score
WHERE node:`{workspace_label}`
WITH node.entity_id AS label, score
WITH label, score,
CASE
WHEN label = $query_strip THEN score + 1000
WHEN label CONTAINS $query_strip THEN score + 500
ELSE score
END AS final_score
RETURN label
ORDER BY final_score DESC, label ASC
LIMIT $limit
"""
# For Chinese, don't add wildcard as it may not work properly with CJK analyzer
search_query = query_strip
else:
# For non-Chinese text, use the original logic with wildcard
cypher_query = f"""
CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score
WITH node, score
WHERE node:`{workspace_label}`
WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score
WITH label, label_lower, score,
CASE
WHEN label_lower = $query_lower THEN score + 1000
WHEN label_lower STARTS WITH $query_lower THEN score + 500
WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50
ELSE score
END AS final_score
RETURN label
ORDER BY final_score DESC, label ASC
LIMIT $limit
"""
search_query = f"{query_strip}*"
result = await session.run(
cypher_query,
index_name=index_name,
search_query=f"{query_strip}*", # Add wildcard for prefix/contains matching
search_query=search_query,
query_lower=query_lower,
query_strip=query_strip,
limit=limit,
)
labels = [record["label"] async for record in result]
await result.consume()
logger.debug(
f"[{self.workspace}] Full-text search for '{query}' returned {len(labels)} results (limit: {limit})"
f"[{self.workspace}] Full-text search ({'Chinese' if is_chinese else 'Latin'}) for '{query}' returned {len(labels)} results (limit: {limit})"
)
return labels
except Exception as e:
# If the full-text search fails (e.g., index doesn't exist, unsupported version),
# fall back to the old, slower method.
# If the full-text search fails, fall back to CONTAINS search
logger.warning(
f"[{self.workspace}] Full-text search failed with error: {str(e)}. "
"Falling back to slower, non-indexed search. "
"Ensure you are on Neo4j Enterprise or a version supporting full-text indexes."
"Falling back to slower, non-indexed search."
)
logger.warning(
f"[{self.workspace}] Falling back to slower, non-indexed search. "
"Ensure you are on Neo4j Enterprise or a version supporting full-text indexes."
)
# Fallback implementation
# Enhanced fallback implementation
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
cypher_query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower
WHERE label_lower CONTAINS $query_lower
WITH label, label_lower,
CASE
WHEN label_lower = $query_lower THEN 1000
WHEN label_lower STARTS WITH $query_lower THEN 500
ELSE 100 - size(label)
END AS score
ORDER BY score DESC, label ASC
LIMIT $limit
RETURN label
"""
result = await session.run(
cypher_query, query_lower=query_lower, limit=limit
)
if is_chinese:
# For Chinese text, use direct CONTAINS without case conversion
cypher_query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
WITH n.entity_id AS label
WHERE label CONTAINS $query_strip
WITH label,
CASE
WHEN label = $query_strip THEN 1000
WHEN label STARTS WITH $query_strip THEN 500
ELSE 100 - size(label)
END AS score
ORDER BY score DESC, label ASC
LIMIT $limit
RETURN label
"""
result = await session.run(
cypher_query, query_strip=query_strip, limit=limit
)
else:
# For non-Chinese text, use the original fallback logic
cypher_query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower
WHERE label_lower CONTAINS $query_lower
WITH label, label_lower,
CASE
WHEN label_lower = $query_lower THEN 1000
WHEN label_lower STARTS WITH $query_lower THEN 500
ELSE 100 - size(label)
END AS score
ORDER BY score DESC, label ASC
LIMIT $limit
RETURN label
"""
result = await session.run(
cypher_query, query_lower=query_lower, limit=limit
)
labels = [record["label"] async for record in result]
await result.consume()
logger.debug(
f"[{self.workspace}] Fallback search for '{query}' returned {len(labels)} results (limit: {limit})"
f"[{self.workspace}] Fallback search ({'Chinese' if is_chinese else 'Latin'}) for '{query}' returned {len(labels)} results (limit: {limit})"
)
return labels