Merge branch 'main' into pick-trunk-by-vector
This commit is contained in:
commit
f85e2aa4bf
7 changed files with 72 additions and 70 deletions
|
|
@ -9,7 +9,7 @@ from ..utils import logger
|
||||||
from ..base import BaseGraphStorage
|
from ..base import BaseGraphStorage
|
||||||
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
||||||
from ..constants import GRAPH_FIELD_SEP
|
from ..constants import GRAPH_FIELD_SEP
|
||||||
from ..kg.shared_storage import get_graph_db_lock
|
from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
if not pm.is_installed("neo4j"):
|
if not pm.is_installed("neo4j"):
|
||||||
|
|
@ -56,7 +56,7 @@ class MemgraphStorage(BaseGraphStorage):
|
||||||
return self.workspace
|
return self.workspace
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
URI = os.environ.get(
|
URI = os.environ.get(
|
||||||
"MEMGRAPH_URI",
|
"MEMGRAPH_URI",
|
||||||
config.get("memgraph", "uri", fallback="bolt://localhost:7687"),
|
config.get("memgraph", "uri", fallback="bolt://localhost:7687"),
|
||||||
|
|
@ -102,7 +102,7 @@ class MemgraphStorage(BaseGraphStorage):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
if self._driver is not None:
|
if self._driver is not None:
|
||||||
await self._driver.close()
|
await self._driver.close()
|
||||||
self._driver = None
|
self._driver = None
|
||||||
|
|
@ -743,7 +743,7 @@ class MemgraphStorage(BaseGraphStorage):
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Memgraph driver is not initialized. Call 'await initialize()' first."
|
"Memgraph driver is not initialized. Call 'await initialize()' first."
|
||||||
)
|
)
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
try:
|
try:
|
||||||
async with self._driver.session(database=self._DATABASE) as session:
|
async with self._driver.session(database=self._DATABASE) as session:
|
||||||
workspace_label = self._get_workspace_label()
|
workspace_label = self._get_workspace_label()
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import numpy as np
|
||||||
from lightrag.utils import logger, compute_mdhash_id
|
from lightrag.utils import logger, compute_mdhash_id
|
||||||
from ..base import BaseVectorStorage
|
from ..base import BaseVectorStorage
|
||||||
from ..constants import DEFAULT_MAX_FILE_PATH_LENGTH
|
from ..constants import DEFAULT_MAX_FILE_PATH_LENGTH
|
||||||
from ..kg.shared_storage import get_storage_lock
|
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
if not pm.is_installed("pymilvus"):
|
if not pm.is_installed("pymilvus"):
|
||||||
|
|
@ -723,7 +723,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize Milvus collection"""
|
"""Initialize Milvus collection"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock(enable_logging=True):
|
||||||
if self._initialized:
|
if self._initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -1028,7 +1028,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
|
||||||
- On success: {"status": "success", "message": "data dropped"}
|
- On success: {"status": "success", "message": "data dropped"}
|
||||||
- On failure: {"status": "error", "message": "<error details>"}
|
- On failure: {"status": "error", "message": "<error details>"}
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
# Drop the collection and recreate it
|
# Drop the collection and recreate it
|
||||||
if self._client.has_collection(self.final_namespace):
|
if self._client.has_collection(self.final_namespace):
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ from ..base import (
|
||||||
from ..utils import logger, compute_mdhash_id
|
from ..utils import logger, compute_mdhash_id
|
||||||
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
||||||
from ..constants import GRAPH_FIELD_SEP
|
from ..constants import GRAPH_FIELD_SEP
|
||||||
from ..kg.shared_storage import get_storage_lock, get_graph_db_lock
|
from ..kg.shared_storage import get_data_init_lock, get_storage_lock, get_graph_db_lock
|
||||||
|
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
|
|
@ -126,7 +126,7 @@ class MongoKVStorage(BaseKVStorage):
|
||||||
self._collection_name = self.final_namespace
|
self._collection_name = self.final_namespace
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -136,7 +136,7 @@ class MongoKVStorage(BaseKVStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -255,7 +255,7 @@ class MongoKVStorage(BaseKVStorage):
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
result = await self._data.delete_many({})
|
result = await self._data.delete_many({})
|
||||||
deleted_count = result.deleted_count
|
deleted_count = result.deleted_count
|
||||||
|
|
@ -323,7 +323,7 @@ class MongoDocStatusStorage(DocStatusStorage):
|
||||||
self._collection_name = self.final_namespace
|
self._collection_name = self.final_namespace
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -340,7 +340,7 @@ class MongoDocStatusStorage(DocStatusStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -455,7 +455,7 @@ class MongoDocStatusStorage(DocStatusStorage):
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
result = await self._data.delete_many({})
|
result = await self._data.delete_many({})
|
||||||
deleted_count = result.deleted_count
|
deleted_count = result.deleted_count
|
||||||
|
|
@ -708,7 +708,7 @@ class MongoGraphStorage(BaseGraphStorage):
|
||||||
self._edge_collection_name = f"{self._collection_name}_edges"
|
self._edge_collection_name = f"{self._collection_name}_edges"
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -723,7 +723,7 @@ class MongoGraphStorage(BaseGraphStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -1579,7 +1579,7 @@ class MongoGraphStorage(BaseGraphStorage):
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
||||||
"""
|
"""
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
try:
|
try:
|
||||||
result = await self.collection.delete_many({})
|
result = await self.collection.delete_many({})
|
||||||
deleted_count = result.deleted_count
|
deleted_count = result.deleted_count
|
||||||
|
|
@ -1674,7 +1674,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
|
||||||
self._max_batch_size = self.global_config["embedding_batch_num"]
|
self._max_batch_size = self.global_config["embedding_batch_num"]
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -1688,7 +1688,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -1973,7 +1973,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
# Delete all documents
|
# Delete all documents
|
||||||
result = await self._data.delete_many({})
|
result = await self._data.delete_many({})
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ from ..utils import logger
|
||||||
from ..base import BaseGraphStorage
|
from ..base import BaseGraphStorage
|
||||||
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
|
||||||
from ..constants import GRAPH_FIELD_SEP
|
from ..constants import GRAPH_FIELD_SEP
|
||||||
from ..kg.shared_storage import get_graph_db_lock
|
from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
if not pm.is_installed("neo4j"):
|
if not pm.is_installed("neo4j"):
|
||||||
|
|
@ -71,7 +71,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
return self.workspace
|
return self.workspace
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
|
URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
|
||||||
USERNAME = os.environ.get(
|
USERNAME = os.environ.get(
|
||||||
"NEO4J_USERNAME", config.get("neo4j", "username", fallback=None)
|
"NEO4J_USERNAME", config.get("neo4j", "username", fallback=None)
|
||||||
|
|
@ -241,7 +241,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
"""Close the Neo4j driver and release all resources"""
|
"""Close the Neo4j driver and release all resources"""
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
if self._driver:
|
if self._driver:
|
||||||
await self._driver.close()
|
await self._driver.close()
|
||||||
self._driver = None
|
self._driver = None
|
||||||
|
|
@ -1533,7 +1533,7 @@ class Neo4JStorage(BaseGraphStorage):
|
||||||
- On success: {"status": "success", "message": "workspace data dropped"}
|
- On success: {"status": "success", "message": "workspace data dropped"}
|
||||||
- On failure: {"status": "error", "message": "<error details>"}
|
- On failure: {"status": "error", "message": "<error details>"}
|
||||||
"""
|
"""
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
workspace_label = self._get_workspace_label()
|
workspace_label = self._get_workspace_label()
|
||||||
try:
|
try:
|
||||||
async with self._driver.session(database=self._DATABASE) as session:
|
async with self._driver.session(database=self._DATABASE) as session:
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ from ..base import (
|
||||||
from ..namespace import NameSpace, is_namespace
|
from ..namespace import NameSpace, is_namespace
|
||||||
from ..utils import logger
|
from ..utils import logger
|
||||||
from ..constants import GRAPH_FIELD_SEP
|
from ..constants import GRAPH_FIELD_SEP
|
||||||
from ..kg.shared_storage import get_graph_db_lock, get_storage_lock
|
from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock, get_storage_lock
|
||||||
|
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
|
|
@ -1406,7 +1406,7 @@ class PGKVStorage(BaseKVStorage):
|
||||||
self._max_batch_size = self.global_config["embedding_batch_num"]
|
self._max_batch_size = self.global_config["embedding_batch_num"]
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -1422,7 +1422,7 @@ class PGKVStorage(BaseKVStorage):
|
||||||
self.workspace = "default"
|
self.workspace = "default"
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -1837,7 +1837,7 @@ class PGKVStorage(BaseKVStorage):
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop the storage"""
|
"""Drop the storage"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
table_name = namespace_to_table_name(self.namespace)
|
table_name = namespace_to_table_name(self.namespace)
|
||||||
if not table_name:
|
if not table_name:
|
||||||
|
|
@ -1871,7 +1871,7 @@ class PGVectorStorage(BaseVectorStorage):
|
||||||
self.cosine_better_than_threshold = cosine_threshold
|
self.cosine_better_than_threshold = cosine_threshold
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -1887,7 +1887,7 @@ class PGVectorStorage(BaseVectorStorage):
|
||||||
self.workspace = "default"
|
self.workspace = "default"
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -2160,7 +2160,7 @@ class PGVectorStorage(BaseVectorStorage):
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop the storage"""
|
"""Drop the storage"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
table_name = namespace_to_table_name(self.namespace)
|
table_name = namespace_to_table_name(self.namespace)
|
||||||
if not table_name:
|
if not table_name:
|
||||||
|
|
@ -2194,7 +2194,7 @@ class PGDocStatusStorage(DocStatusStorage):
|
||||||
return dt.isoformat()
|
return dt.isoformat()
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -2210,7 +2210,7 @@ class PGDocStatusStorage(DocStatusStorage):
|
||||||
self.workspace = "default"
|
self.workspace = "default"
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -2704,21 +2704,22 @@ class PGDocStatusStorage(DocStatusStorage):
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop the storage"""
|
"""Drop the storage"""
|
||||||
try:
|
async with get_storage_lock():
|
||||||
table_name = namespace_to_table_name(self.namespace)
|
try:
|
||||||
if not table_name:
|
table_name = namespace_to_table_name(self.namespace)
|
||||||
return {
|
if not table_name:
|
||||||
"status": "error",
|
return {
|
||||||
"message": f"Unknown namespace: {self.namespace}",
|
"status": "error",
|
||||||
}
|
"message": f"Unknown namespace: {self.namespace}",
|
||||||
|
}
|
||||||
|
|
||||||
drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
|
drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
|
||||||
table_name=table_name
|
table_name=table_name
|
||||||
)
|
)
|
||||||
await self.db.execute(drop_sql, {"workspace": self.workspace})
|
await self.db.execute(drop_sql, {"workspace": self.workspace})
|
||||||
return {"status": "success", "message": "data dropped"}
|
return {"status": "success", "message": "data dropped"}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"status": "error", "message": str(e)}
|
return {"status": "error", "message": str(e)}
|
||||||
|
|
||||||
|
|
||||||
class PGGraphQueryException(Exception):
|
class PGGraphQueryException(Exception):
|
||||||
|
|
@ -2789,7 +2790,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
return normalized_id
|
return normalized_id
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self.db is None:
|
if self.db is None:
|
||||||
self.db = await ClientManager.get_client()
|
self.db = await ClientManager.get_client()
|
||||||
|
|
||||||
|
|
@ -2850,7 +2851,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def finalize(self):
|
async def finalize(self):
|
||||||
async with get_graph_db_lock(enable_logging=True):
|
async with get_graph_db_lock():
|
||||||
if self.db is not None:
|
if self.db is not None:
|
||||||
await ClientManager.release_client(self.db)
|
await ClientManager.release_client(self.db)
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
@ -4141,20 +4142,21 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop the storage"""
|
"""Drop the storage"""
|
||||||
try:
|
async with get_graph_db_lock():
|
||||||
drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
|
try:
|
||||||
MATCH (n)
|
drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
|
||||||
DETACH DELETE n
|
MATCH (n)
|
||||||
$$) AS (result agtype)"""
|
DETACH DELETE n
|
||||||
|
$$) AS (result agtype)"""
|
||||||
|
|
||||||
await self._query(drop_query, readonly=False)
|
await self._query(drop_query, readonly=False)
|
||||||
return {
|
return {
|
||||||
"status": "success",
|
"status": "success",
|
||||||
"message": f"workspace '{self.workspace}' graph data dropped",
|
"message": f"workspace '{self.workspace}' graph data dropped",
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[{self.workspace}] Error dropping graph: {e}")
|
logger.error(f"[{self.workspace}] Error dropping graph: {e}")
|
||||||
return {"status": "error", "message": str(e)}
|
return {"status": "error", "message": str(e)}
|
||||||
|
|
||||||
|
|
||||||
# Note: Order matters! More specific namespaces (e.g., "full_entities") must come before
|
# Note: Order matters! More specific namespaces (e.g., "full_entities") must come before
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import hashlib
|
||||||
import uuid
|
import uuid
|
||||||
from ..utils import logger
|
from ..utils import logger
|
||||||
from ..base import BaseVectorStorage
|
from ..base import BaseVectorStorage
|
||||||
from ..kg.shared_storage import get_storage_lock
|
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||||
import configparser
|
import configparser
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
||||||
|
|
@ -117,7 +117,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize Qdrant collection"""
|
"""Initialize Qdrant collection"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self._initialized:
|
if self._initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -412,7 +412,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
|
||||||
- On success: {"status": "success", "message": "data dropped"}
|
- On success: {"status": "success", "message": "data dropped"}
|
||||||
- On failure: {"status": "error", "message": "<error details>"}
|
- On failure: {"status": "error", "message": "<error details>"}
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
# Delete the collection and recreate it
|
# Delete the collection and recreate it
|
||||||
if self._client.collection_exists(self.final_namespace):
|
if self._client.collection_exists(self.final_namespace):
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ from lightrag.base import (
|
||||||
DocStatus,
|
DocStatus,
|
||||||
DocProcessingStatus,
|
DocProcessingStatus,
|
||||||
)
|
)
|
||||||
from ..kg.shared_storage import get_storage_lock
|
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
|
||||||
import json
|
import json
|
||||||
|
|
||||||
# Import tenacity for retry logic
|
# Import tenacity for retry logic
|
||||||
|
|
@ -180,7 +180,7 @@ class RedisKVStorage(BaseKVStorage):
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize Redis connection and migrate legacy cache structure if needed"""
|
"""Initialize Redis connection and migrate legacy cache structure if needed"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self._initialized:
|
if self._initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -428,7 +428,7 @@ class RedisKVStorage(BaseKVStorage):
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
dict[str, str]: Status of the operation with keys 'status' and 'message'
|
||||||
"""
|
"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
async with self._get_redis_connection() as redis:
|
async with self._get_redis_connection() as redis:
|
||||||
try:
|
try:
|
||||||
# Use SCAN to find all keys with the namespace prefix
|
# Use SCAN to find all keys with the namespace prefix
|
||||||
|
|
@ -606,7 +606,7 @@ class RedisDocStatusStorage(DocStatusStorage):
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
"""Initialize Redis connection"""
|
"""Initialize Redis connection"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_data_init_lock():
|
||||||
if self._initialized:
|
if self._initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -1049,7 +1049,7 @@ class RedisDocStatusStorage(DocStatusStorage):
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop all document status data from storage and clean up resources"""
|
"""Drop all document status data from storage and clean up resources"""
|
||||||
async with get_storage_lock(enable_logging=True):
|
async with get_storage_lock():
|
||||||
try:
|
try:
|
||||||
async with self._get_redis_connection() as redis:
|
async with self._get_redis_connection() as redis:
|
||||||
# Use SCAN to find all keys with the namespace prefix
|
# Use SCAN to find all keys with the namespace prefix
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue