Fix Neo4j typo and add fulltext search with performance optimizations
- Fix NEO4J_DATABASE typo in env.example - Add fulltext index for entity searches - Implement get_popular_labels method - Add search_labels with fuzzy matching - Simplify B-Tree index creation logic
This commit is contained in:
parent
9db8f2fce5
commit
e14cee69a3
2 changed files with 193 additions and 32 deletions
|
|
@ -313,7 +313,7 @@ POSTGRES_IVFFLAT_LISTS=100
|
||||||
NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
|
NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
|
||||||
NEO4J_USERNAME=neo4j
|
NEO4J_USERNAME=neo4j
|
||||||
NEO4J_PASSWORD='your_password'
|
NEO4J_PASSWORD='your_password'
|
||||||
NEO4J_DATABASE=noe4j
|
NEO4J_DATABASE=neo4j
|
||||||
NEO4J_MAX_CONNECTION_POOL_SIZE=100
|
NEO4J_MAX_CONNECTION_POOL_SIZE=100
|
||||||
NEO4J_CONNECTION_TIMEOUT=30
|
NEO4J_CONNECTION_TIMEOUT=30
|
||||||
NEO4J_CONNECTION_ACQUISITION_TIMEOUT=30
|
NEO4J_CONNECTION_ACQUISITION_TIMEOUT=30
|
||||||
|
|
|
||||||
|
|
@ -201,44 +201,72 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
if connected:
|
if connected:
|
||||||
# Create index for workspace nodes on entity_id if it doesn't exist
|
|
||||||
workspace_label = self._get_workspace_label()
|
workspace_label = self._get_workspace_label()
|
||||||
|
# Create B-Tree index for entity_id for faster lookups
|
||||||
try:
|
try:
|
||||||
async with self._driver.session(database=database) as session:
|
async with self._driver.session(database=database) as session:
|
||||||
# Check if index exists first
|
await session.run(
|
||||||
check_query = f"""
|
f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)"
|
||||||
CALL db.indexes() YIELD name, labelsOrTypes, properties
|
)
|
||||||
WHERE labelsOrTypes = ['{workspace_label}'] AND properties = ['entity_id']
|
logger.info(
|
||||||
RETURN count(*) > 0 AS exists
|
f"[{self.workspace}] Ensured B-Tree index on entity_id for {workspace_label} in {database}"
|
||||||
"""
|
)
|
||||||
try:
|
|
||||||
check_result = await session.run(check_query)
|
|
||||||
record = await check_result.single()
|
|
||||||
await check_result.consume()
|
|
||||||
|
|
||||||
index_exists = record and record.get("exists", False)
|
|
||||||
|
|
||||||
if not index_exists:
|
|
||||||
# Create index only if it doesn't exist
|
|
||||||
result = await session.run(
|
|
||||||
f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)"
|
|
||||||
)
|
|
||||||
await result.consume()
|
|
||||||
logger.info(
|
|
||||||
f"[{self.workspace}] Created index for {workspace_label} nodes on entity_id in {database}"
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
# Fallback if db.indexes() is not supported in this Neo4j version
|
|
||||||
result = await session.run(
|
|
||||||
f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)"
|
|
||||||
)
|
|
||||||
await result.consume()
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[{self.workspace}] Failed to create index: {str(e)}"
|
f"[{self.workspace}] Failed to create B-Tree index: {str(e)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Create full-text index for entity_id for faster text searches
|
||||||
|
await self._create_fulltext_index(
|
||||||
|
self._driver, self._DATABASE, workspace_label
|
||||||
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
async def _create_fulltext_index(
|
||||||
|
self, driver: AsyncDriver, database: str, workspace_label: str
|
||||||
|
):
|
||||||
|
"""Create a full-text index on the entity_id property if it doesn't exist."""
|
||||||
|
index_name = "entity_id_fulltext_idx"
|
||||||
|
try:
|
||||||
|
async with driver.session(database=database) as session:
|
||||||
|
# Check if the full-text index exists
|
||||||
|
check_index_query = "SHOW FULLTEXT INDEXES"
|
||||||
|
result = await session.run(check_index_query)
|
||||||
|
indexes = await result.data()
|
||||||
|
await result.consume()
|
||||||
|
|
||||||
|
index_exists = any(idx["name"] == index_name for idx in indexes)
|
||||||
|
|
||||||
|
if not index_exists:
|
||||||
|
logger.info(
|
||||||
|
f"[{self.workspace}] Full-text index '{index_name}' not found. Creating it now."
|
||||||
|
)
|
||||||
|
# Create the full-text index
|
||||||
|
create_index_query = f"""
|
||||||
|
CREATE FULLTEXT INDEX {index_name} FOR (n:`{workspace_label}`) ON EACH [n.entity_id]
|
||||||
|
"""
|
||||||
|
result = await session.run(create_index_query)
|
||||||
|
await result.consume()
|
||||||
|
logger.info(
|
||||||
|
f"[{self.workspace}] Successfully created full-text index '{index_name}'."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
f"[{self.workspace}] Full-text index '{index_name}' already exists."
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
# Handle cases where the command might not be supported (e.g., community edition before 5.x)
|
||||||
|
if "Unknown command" in str(e) or "invalid syntax" in str(e).lower():
|
||||||
|
logger.warning(
|
||||||
|
f"[{self.workspace}] Could not create or verify full-text index '{index_name}'. "
|
||||||
|
"This might be because you are using a Neo4j version that does not support it. "
|
||||||
|
"Search functionality will fall back to slower, non-indexed queries."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
f"[{self.workspace}] Failed to create or verify full-text index '{index_name}': {str(e)}"
|
||||||
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
"""Close the Neo4j driver and release all resources"""
|
"""Close the Neo4j driver and release all resources"""
|
||||||
async with get_graph_db_lock():
|
async with get_graph_db_lock():
|
||||||
|
|
@ -251,7 +279,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
await self.finalize()
|
await self.finalize()
|
||||||
|
|
||||||
async def index_done_callback(self) -> None:
|
async def index_done_callback(self) -> None:
|
||||||
# Noe4J handles persistence automatically
|
# Neo4J handles persistence automatically
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def has_node(self, node_id: str) -> bool:
|
async def has_node(self, node_id: str) -> bool:
|
||||||
|
|
@ -1523,6 +1551,139 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
await result.consume()
|
await result.consume()
|
||||||
return edges
|
return edges
|
||||||
|
|
||||||
|
async def get_popular_labels(self, limit: int = 300) -> list[str]:
|
||||||
|
"""Get popular labels by node degree (most connected entities)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
limit: Maximum number of labels to return
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of labels sorted by degree (highest first)
|
||||||
|
"""
|
||||||
|
workspace_label = self._get_workspace_label()
|
||||||
|
async with self._driver.session(
|
||||||
|
database=self._DATABASE, default_access_mode="READ"
|
||||||
|
) as session:
|
||||||
|
try:
|
||||||
|
query = f"""
|
||||||
|
MATCH (n:`{workspace_label}`)
|
||||||
|
WHERE n.entity_id IS NOT NULL
|
||||||
|
OPTIONAL MATCH (n)-[r]-()
|
||||||
|
WITH n.entity_id AS label, count(r) AS degree
|
||||||
|
ORDER BY degree DESC, label ASC
|
||||||
|
LIMIT $limit
|
||||||
|
RETURN label
|
||||||
|
"""
|
||||||
|
result = await session.run(query, limit=limit)
|
||||||
|
labels = []
|
||||||
|
async for record in result:
|
||||||
|
labels.append(record["label"])
|
||||||
|
await result.consume()
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})"
|
||||||
|
)
|
||||||
|
return labels
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
f"[{self.workspace}] Error getting popular labels: {str(e)}"
|
||||||
|
)
|
||||||
|
await result.consume()
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def search_labels(self, query: str, limit: int = 50) -> list[str]:
|
||||||
|
"""
|
||||||
|
Search labels with fuzzy matching, using a full-text index for performance if available.
|
||||||
|
Falls back to a slower CONTAINS search if the index is not available or fails.
|
||||||
|
"""
|
||||||
|
workspace_label = self._get_workspace_label()
|
||||||
|
query_strip = query.strip()
|
||||||
|
if not query_strip:
|
||||||
|
return []
|
||||||
|
|
||||||
|
query_lower = query_strip.lower()
|
||||||
|
|
||||||
|
index_name = "entity_id_fulltext_idx"
|
||||||
|
|
||||||
|
# Attempt to use the full-text index first
|
||||||
|
try:
|
||||||
|
async with self._driver.session(
|
||||||
|
database=self._DATABASE, default_access_mode="READ"
|
||||||
|
) as session:
|
||||||
|
# The query uses a full-text index.
|
||||||
|
# The native score from the index is used as the primary sorting key.
|
||||||
|
# We add extra scoring for exact and prefix matches to align with NetworkX logic.
|
||||||
|
cypher_query = f"""
|
||||||
|
CALL db.index.fulltext.queryNodes($index_name, $search_query) YIELD node, score
|
||||||
|
WITH node, score
|
||||||
|
WHERE node:`{workspace_label}`
|
||||||
|
WITH node.entity_id AS label, toLower(node.entity_id) AS label_lower, score
|
||||||
|
WITH label, label_lower, score,
|
||||||
|
CASE
|
||||||
|
WHEN label_lower = $query_lower THEN score + 1000
|
||||||
|
WHEN label_lower STARTS WITH $query_lower THEN score + 500
|
||||||
|
WHEN label_lower CONTAINS ' ' + $query_lower OR label_lower CONTAINS '_' + $query_lower THEN score + 50
|
||||||
|
ELSE score
|
||||||
|
END AS final_score
|
||||||
|
RETURN label
|
||||||
|
ORDER BY final_score DESC, label ASC
|
||||||
|
LIMIT $limit
|
||||||
|
"""
|
||||||
|
result = await session.run(
|
||||||
|
cypher_query,
|
||||||
|
index_name=index_name,
|
||||||
|
search_query=f"{query_strip}*", # Add wildcard for prefix/contains matching
|
||||||
|
query_lower=query_lower,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
labels = [record["label"] async for record in result]
|
||||||
|
await result.consume()
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
f"[{self.workspace}] Full-text search for '{query}' returned {len(labels)} results (limit: {limit})"
|
||||||
|
)
|
||||||
|
return labels
|
||||||
|
except Exception as e:
|
||||||
|
# If the full-text search fails (e.g., index doesn't exist, unsupported version),
|
||||||
|
# fall back to the old, slower method.
|
||||||
|
logger.warning(
|
||||||
|
f"[{self.workspace}] Full-text search failed with error: {str(e)}. "
|
||||||
|
"Falling back to slower, non-indexed search. "
|
||||||
|
"Ensure you are on Neo4j Enterprise or a version supporting full-text indexes."
|
||||||
|
)
|
||||||
|
logger.warning(
|
||||||
|
f"[{self.workspace}] Falling back to slower, non-indexed search. "
|
||||||
|
"Ensure you are on Neo4j Enterprise or a version supporting full-text indexes."
|
||||||
|
)
|
||||||
|
# Fallback implementation
|
||||||
|
async with self._driver.session(
|
||||||
|
database=self._DATABASE, default_access_mode="READ"
|
||||||
|
) as session:
|
||||||
|
cypher_query = f"""
|
||||||
|
MATCH (n:`{workspace_label}`)
|
||||||
|
WHERE n.entity_id IS NOT NULL
|
||||||
|
WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower
|
||||||
|
WHERE label_lower CONTAINS $query_lower
|
||||||
|
WITH label, label_lower,
|
||||||
|
CASE
|
||||||
|
WHEN label_lower = $query_lower THEN 1000
|
||||||
|
WHEN label_lower STARTS WITH $query_lower THEN 500
|
||||||
|
ELSE 100 - size(label)
|
||||||
|
END AS score
|
||||||
|
ORDER BY score DESC, label ASC
|
||||||
|
LIMIT $limit
|
||||||
|
RETURN label
|
||||||
|
"""
|
||||||
|
result = await session.run(
|
||||||
|
cypher_query, query_lower=query_lower, limit=limit
|
||||||
|
)
|
||||||
|
labels = [record["label"] async for record in result]
|
||||||
|
await result.consume()
|
||||||
|
logger.debug(
|
||||||
|
f"[{self.workspace}] Fallback search for '{query}' returned {len(labels)} results (limit: {limit})"
|
||||||
|
)
|
||||||
|
return labels
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop all data from current workspace storage and clean up resources
|
"""Drop all data from current workspace storage and clean up resources
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue