# Label-lookup methods of ``MemgraphStorage`` (lightrag/kg/memgraph_impl.py).
# They are written here as plain ``async def`` blocks; inside the class they
# sit at method indentation.


async def get_popular_labels(self, limit: int = 300) -> list[str]:
    """Get popular labels by node degree (most connected entities).

    Nodes carrying the workspace label and a non-null ``entity_id`` are
    ranked by the number of relationships attached to them (any direction);
    ties are broken alphabetically by ``entity_id``.

    Args:
        limit: Maximum number of labels to return. Values that are zero or
            negative yield an empty list without querying the database.

    Returns:
        List of labels sorted by degree (highest first). An empty list is
        returned when the query fails; the error is logged, not raised.

    Raises:
        RuntimeError: If the Memgraph driver has not been initialized.
    """
    if self._driver is None:
        raise RuntimeError(
            "Memgraph driver is not initialized. Call 'await initialize()' first."
        )

    try:
        # ``limit`` is interpolated into the Cypher text below, so force it
        # to a plain integer first. A value that cannot be converted raises
        # here and is handled by the best-effort ``except`` like any other
        # query failure.
        limit = int(limit)
        if limit <= 0:
            # Nothing can be returned; skip the round-trip entirely.
            return []

        workspace_label = self._get_workspace_label()
        async with self._driver.session(
            database=self._DATABASE, default_access_mode="READ"
        ) as session:
            query = f"""
            MATCH (n:`{workspace_label}`)
            WHERE n.entity_id IS NOT NULL
            OPTIONAL MATCH (n)-[r]-()
            WITH n.entity_id AS label, count(r) AS degree
            ORDER BY degree DESC, label ASC
            LIMIT {limit}
            RETURN label
            """
            result = await session.run(query)
            labels = [record["label"] async for record in result]
            await result.consume()

            logger.debug(
                f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})"
            )
            return labels
    except Exception as e:
        # Deliberate best-effort: callers get an empty list instead of an error.
        logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}")
        return []


async def search_labels(self, query: str, limit: int = 50) -> list[str]:
    """Search labels with case-insensitive substring matching.

    The query is lower-cased and stripped, then matched against the
    lower-cased ``entity_id`` of every node carrying the workspace label.
    Matches are scored in the database: an exact match scores 1000, a prefix
    match 500, and any other substring match ``100 - size(label)`` so that
    shorter labels rank first. Ties are broken alphabetically.

    Args:
        query: Search query string. ``None``, empty, or whitespace-only
            input yields an empty list.
        limit: Maximum number of results to return. Values that are zero or
            negative yield an empty list without querying the database.

    Returns:
        List of matching labels sorted by relevance. An empty list is
        returned when the query fails; the error is logged, not raised.

    Raises:
        RuntimeError: If the Memgraph driver has not been initialized.
    """
    if self._driver is None:
        raise RuntimeError(
            "Memgraph driver is not initialized. Call 'await initialize()' first."
        )

    # Treat a missing query like an empty one instead of failing on ``None``.
    query_lower = (query or "").lower().strip()

    if not query_lower:
        return []

    try:
        # ``limit`` is interpolated into the Cypher text below, so force it
        # to a plain integer first (the search term itself is always passed
        # as a bound parameter).
        limit = int(limit)
        if limit <= 0:
            return []

        workspace_label = self._get_workspace_label()
        async with self._driver.session(
            database=self._DATABASE, default_access_mode="READ"
        ) as session:
            cypher_query = f"""
            MATCH (n:`{workspace_label}`)
            WHERE n.entity_id IS NOT NULL
            WITH n.entity_id AS label, toLower(n.entity_id) AS label_lower
            WHERE label_lower CONTAINS $query_lower
            WITH label, label_lower,
                 CASE
                     WHEN label_lower = $query_lower THEN 1000
                     WHEN label_lower STARTS WITH $query_lower THEN 500
                     ELSE 100 - size(label)
                 END AS score
            ORDER BY score DESC, label ASC
            LIMIT {limit}
            RETURN label
            """

            result = await session.run(cypher_query, query_lower=query_lower)
            labels = [record["label"] async for record in result]
            await result.consume()

            logger.debug(
                f"[{self.workspace}] Search query '{query}' returned {len(labels)} results (limit: {limit})"
            )
            return labels
    except Exception as e:
        # Deliberate best-effort: callers get an empty list instead of an error.
        logger.error(f"[{self.workspace}] Error searching labels: {str(e)}")
        return []