Add label search and popularity methods to MemgraphStorage

• Get popular labels by node degree
• Search labels with fuzzy matching
• Sort by relevance and connection count
This commit is contained in:
yangdx 2025-09-20 12:38:04 +08:00
parent e14cee69a3
commit 223397a247

View file

@ -1089,3 +1089,100 @@ class MemgraphStorage(BaseGraphStorage):
edges.append(edge_properties)
await result.consume()
return edges
async def get_popular_labels(self, limit: int = 300) -> list[str]:
"""Get popular labels by node degree (most connected entities)
Args:
limit: Maximum number of labels to return
Returns:
List of labels sorted by degree (highest first)
"""
if self._driver is None:
raise RuntimeError(
"Memgraph driver is not initialized. Call 'await initialize()' first."
)
try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
OPTIONAL MATCH (n)-[r]-()
WITH n.entity_id AS label, count(r) AS degree
ORDER BY degree DESC, label ASC
LIMIT {limit}
RETURN label
"""
result = await session.run(query)
labels = []
async for record in result:
labels.append(record["label"])
await result.consume()
logger.debug(
f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}")
return []
async def search_labels(self, query: str, limit: int = 50) -> list[str]:
"""Search labels with fuzzy matching
Args:
query: Search query string
limit: Maximum number of results to return
Returns:
List of matching labels sorted by relevance
"""
if self._driver is None:
raise RuntimeError(
"Memgraph driver is not initialized. Call 'await initialize()' first."
)
query_lower = query.lower().strip()
if not query_lower:
return []
try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
cypher_query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower
WHERE label_lower CONTAINS $query_lower
WITH label, label_lower,
CASE
WHEN label_lower = $query_lower THEN 1000
WHEN label_lower STARTS WITH $query_lower THEN 500
ELSE 100 - size(label)
END AS score
ORDER BY score DESC, label ASC
LIMIT {limit}
RETURN label
"""
result = await session.run(cypher_query, query_lower=query_lower)
labels = []
async for record in result:
labels.append(record["label"])
await result.consume()
logger.debug(
f"[{self.workspace}] Search query '{query}' returned {len(labels)} results (limit: {limit})"
)
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error searching labels: {str(e)}")
return []