Merge pull request #1743 from danielaskdd/add-workspace-env

Feat: Implement data isolation for all storage types via WORKSPACE environment variable
This commit is contained in:
Daniel.y 2025-07-07 04:24:22 +08:00 committed by GitHub
commit d7d59ed973
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 672 additions and 206 deletions
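
A minimal usage sketch of the feature (illustrative only: required LLM/embedding arguments are omitted and the workspace names are examples). Isolation can be enabled either through the WORKSPACE environment variable or the new workspace constructor parameter introduced in this commit:

import os
from lightrag import LightRAG

# Option 1: default workspace for every storage type, read by LightRAG at init
os.environ["WORKSPACE"] = "team_a"

# Option 2: pass the workspace explicitly per instance
rag_a = LightRAG(working_dir="./rag_storage", workspace="team_a")
rag_b = LightRAG(working_dir="./rag_storage", workspace="team_b")
# Both instances can share the same storage backends while keeping their data separated.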

View file

@ -111,25 +111,37 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
###########################
### Data storage selection
###########################
### In-memory database with data persistence to local files
# LIGHTRAG_KV_STORAGE=JsonKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=JsonDocStatusStorage
# LIGHTRAG_GRAPH_STORAGE=NetworkXStorage
# LIGHTRAG_VECTOR_STORAGE=NanoVectorDBStorage
# LIGHTRAG_VECTOR_STORAGE=FaissVectorDBStorage
### PostgreSQL
# LIGHTRAG_KV_STORAGE=PGKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
# LIGHTRAG_GRAPH_STORAGE=PGGraphStorage
### MongoDB
# LIGHTRAG_VECTOR_STORAGE=PGVectorStorage
### MongoDB (recommended for production deploy)
# LIGHTRAG_KV_STORAGE=MongoKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=MongoDocStatusStorage
# LIGHTRAG_VECTOR_STORAGE=MongoVectorDBStorage
# LIGHTRAG_GRAPH_STORAGE=MongoGraphStorage
### KV Storage
# LIGHTRAG_VECTOR_STORAGE=MongoVectorDBStorage
### Redis Storage (recommended for production deploy)
# LIGHTRAG_KV_STORAGE=RedisKVStorage
# LIGHTRAG_DOC_STATUS_STORAGE=RedisDocStatusStorage
### Vector Storage
# LIGHTRAG_VECTOR_STORAGE=FaissVectorDBStorage
### Vector Storage (recommended for production deploy)
# LIGHTRAG_VECTOR_STORAGE=MilvusVectorDBStorage
### Graph Storage
# LIGHTRAG_VECTOR_STORAGE=QdrantVectorDBStorage
### Graph Storage (recommended for production deploy)
# LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
# LIGHTRAG_GRAPH_STORAGE=MemgraphStorage
####################################################################
### Default workspace for all storage types
### Used to isolate the data of each LightRAG instance
### Valid characters: a-z, A-Z, 0-9, and _
####################################################################
# WORKSPACE=doc
### PostgreSQL Configuration
POSTGRES_HOST=localhost
@ -138,31 +150,19 @@ POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
### separating all data from different LightRAG instances
# POSTGRES_WORKSPACE=default
# POSTGRES_WORKSPACE=forced_workspace_name
### Neo4j Configuration
NEO4J_URI=neo4j+s://xxxxxxxx.databases.neo4j.io
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD='your_password'
### Independent AGE Configuration (not for AGE embedded in PostgreSQL)
# AGE_POSTGRES_DB=
# AGE_POSTGRES_USER=
# AGE_POSTGRES_PASSWORD=
# AGE_POSTGRES_HOST=
# AGE_POSTGRES_PORT=8529
# AGE Graph Name (applies to PostgreSQL and independent AGE)
### AGE_GRAPH_NAME is deprecated
# AGE_GRAPH_NAME=lightrag
# NEO4J_WORKSPACE=forced_workspace_name
### MongoDB Configuration
MONGO_URI=mongodb://root:root@localhost:27017/
#MONGO_URI=mongodb+srv://root:rooot@cluster0.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0
MONGO_DATABASE=LightRAG
### separating all data from different LightRAG instances (deprecating)
### separating all data from different LightRAG instances
# MONGODB_WORKSPACE=default
# MONGODB_WORKSPACE=forced_workspace_name
### Milvus Configuration
MILVUS_URI=http://localhost:19530
@ -170,10 +170,13 @@ MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=your_password
# MILVUS_TOKEN=your_token
# MILVUS_WORKSPACE=forced_workspace_name
### Qdrant
QDRANT_URL=http://localhost:16333
QDRANT_URL=http://localhost:6333
# QDRANT_API_KEY=your-api-key
# QDRANT_WORKSPACE=forced_workspace_name
### Redis
REDIS_URI=redis://localhost:6379
# REDIS_WORKSPACE=forced_workspace_name
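
As a reading aid for the storage changes below, a sketch of how the workspace is resolved per backend (the helper name and values are illustrative, not part of the codebase): a backend-specific variable such as REDIS_WORKSPACE, MILVUS_WORKSPACE, QDRANT_WORKSPACE or MONGODB_WORKSPACE, when set, overrides the workspace passed down from WORKSPACE or the constructor; an empty result keeps the legacy, non-isolated behavior.

import os

def resolve_workspace(backend_env_var: str, passed_workspace: str) -> str:
    # Backend-specific override (e.g. "REDIS_WORKSPACE") wins over the passed value.
    forced = os.environ.get(backend_env_var, "").strip()
    return forced if forced else passed_workspace

# resolve_workspace("REDIS_WORKSPACE", "team_a") -> "team_a" unless REDIS_WORKSPACE is set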

View file

@ -1 +1 @@
__api_version__ = "0178"
__api_version__ = "0179"

View file

@ -184,10 +184,10 @@ def parse_args() -> argparse.Namespace:
# Namespace
parser.add_argument(
"--namespace-prefix",
"--workspace",
type=str,
default=get_env_value("NAMESPACE_PREFIX", ""),
help="Prefix of the namespace",
default=get_env_value("WORKSPACE", ""),
help="Default workspace for all storage",
)
parser.add_argument(

View file

@ -112,8 +112,8 @@ def create_app(args):
# Check if API key is provided either through env var or args
api_key = os.getenv("LIGHTRAG_API_KEY") or args.key
# Initialize document manager
doc_manager = DocumentManager(args.input_dir)
# Initialize document manager with workspace support for data isolation
doc_manager = DocumentManager(args.input_dir, workspace=args.workspace)
@asynccontextmanager
async def lifespan(app: FastAPI):
@ -295,6 +295,7 @@ def create_app(args):
if args.llm_binding in ["lollms", "ollama", "openai"]:
rag = LightRAG(
working_dir=args.working_dir,
workspace=args.workspace,
llm_model_func=lollms_model_complete
if args.llm_binding == "lollms"
else ollama_model_complete
@ -330,6 +331,7 @@ def create_app(args):
else: # azure_openai
rag = LightRAG(
working_dir=args.working_dir,
workspace=args.workspace,
llm_model_func=azure_openai_model_complete,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
@ -472,6 +474,8 @@ def create_app(args):
"vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache,
"workspace": args.workspace,
"max_graph_nodes": os.getenv("MAX_GRAPH_NODES"),
},
"auth_mode": auth_mode,
"pipeline_busy": pipeline_status.get("busy", False),

View file

@ -475,6 +475,7 @@ class DocumentManager:
def __init__(
self,
input_dir: str,
workspace: str = "", # New parameter for workspace isolation
supported_extensions: tuple = (
".txt",
".md",
@ -515,10 +516,19 @@ class DocumentManager:
".less", # LESS CSS
),
):
self.input_dir = Path(input_dir)
# Store the base input directory and workspace
self.base_input_dir = Path(input_dir)
self.workspace = workspace
self.supported_extensions = supported_extensions
self.indexed_files = set()
# Create workspace-specific input directory
# If workspace is provided, create a subdirectory for data isolation
if workspace:
self.input_dir = self.base_input_dir / workspace
else:
self.input_dir = self.base_input_dir
# Create input directory if it doesn't exist
self.input_dir.mkdir(parents=True, exist_ok=True)
@ -716,7 +726,9 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if content:
# Check if content contains only whitespace characters
if not content.strip():
logger.warning(f"File contains only whitespace characters. file_paths={file_path.name}")
logger.warning(
f"File contains only whitespace characters. file_paths={file_path.name}"
)
await rag.apipeline_enqueue_documents(content, file_paths=file_path.name)
logger.info(f"Successfully fetched and enqueued file: {file_path.name}")
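
The effect on the input directory, as a small sketch (paths and workspace name are examples; this mirrors the DocumentManager change above):

from pathlib import Path

base_input_dir = Path("./inputs")
workspace = "team_a"
# With a workspace, documents are read from a per-workspace subdirectory;
# without one, the base directory is used as before.
input_dir = base_input_dir / workspace if workspace else base_input_dir
input_dir.mkdir(parents=True, exist_ok=True)  # -> ./inputs/team_a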

View file

@ -284,8 +284,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.vector_storage}")
ASCIIColors.white(" ├─ Graph Storage: ", end="")
ASCIIColors.yellow(f"{args.graph_storage}")
ASCIIColors.white(" └─ Document Status Storage: ", end="")
ASCIIColors.white(" ├─ Document Status Storage: ", end="")
ASCIIColors.yellow(f"{args.doc_status_storage}")
ASCIIColors.white(" └─ Workspace: ", end="")
ASCIIColors.yellow(f"{args.workspace if args.workspace else '-'}")
# Server Status
ASCIIColors.green("\n✨ Server starting up...\n")

File diff suppressed because one or more lines are too long

View file

@ -8,7 +8,7 @@
<link rel="icon" type="image/png" href="favicon.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="/webui/assets/index-CRWiopRc.js"></script>
<script type="module" crossorigin src="/webui/assets/index-BlAjHenV.js"></script>
<link rel="modulepreload" crossorigin href="/webui/assets/react-vendor-DEwriMA6.js">
<link rel="modulepreload" crossorigin href="/webui/assets/ui-vendor-CeCm8EER.js">
<link rel="modulepreload" crossorigin href="/webui/assets/graph-vendor-B-X5JegA.js">

View file

@ -103,6 +103,7 @@ class QueryParam:
@dataclass
class StorageNameSpace(ABC):
namespace: str
workspace: str
global_config: dict[str, Any]
async def initialize(self):

View file

@ -38,9 +38,19 @@ class FaissVectorDBStorage(BaseVectorStorage):
self.cosine_better_than_threshold = cosine_threshold
# Where to save index file if you want persistent storage
self._faiss_index_file = os.path.join(
self.global_config["working_dir"], f"faiss_index_{self.namespace}.index"
)
working_dir = self.global_config["working_dir"]
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
os.makedirs(workspace_dir, exist_ok=True)
self._faiss_index_file = os.path.join(
workspace_dir, f"faiss_index_{self.namespace}.index"
)
else:
# Default behavior when workspace is empty
self._faiss_index_file = os.path.join(
working_dir, f"faiss_index_{self.namespace}.index"
)
self._meta_file = self._faiss_index_file + ".meta.json"
self._max_batch_size = self.global_config["embedding_batch_num"]
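
The resulting on-disk layout, for illustration (values are examples; the same working_dir/workspace/file pattern is repeated below for the JSON KV, doc-status, NanoVectorDB and NetworkX backends):

import os

working_dir, workspace, namespace = "./rag_storage", "team_a", "entities"
faiss_index_file = (
    os.path.join(working_dir, workspace, f"faiss_index_{namespace}.index")
    if workspace
    else os.path.join(working_dir, f"faiss_index_{namespace}.index")
)
# -> ./rag_storage/team_a/faiss_index_entities.index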

View file

@ -30,7 +30,18 @@ class JsonDocStatusStorage(DocStatusStorage):
def __post_init__(self):
working_dir = self.global_config["working_dir"]
self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
os.makedirs(workspace_dir, exist_ok=True)
self._file_name = os.path.join(
workspace_dir, f"kv_store_{self.namespace}.json"
)
else:
# Default behavior when workspace is empty
self._file_name = os.path.join(
working_dir, f"kv_store_{self.namespace}.json"
)
self._data = None
self._storage_lock = None
self.storage_updated = None

View file

@ -26,7 +26,18 @@ from .shared_storage import (
class JsonKVStorage(BaseKVStorage):
def __post_init__(self):
working_dir = self.global_config["working_dir"]
self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
os.makedirs(workspace_dir, exist_ok=True)
self._file_name = os.path.join(
workspace_dir, f"kv_store_{self.namespace}.json"
)
else:
# Default behavior when workspace is empty
self._file_name = os.path.join(
working_dir, f"kv_store_{self.namespace}.json"
)
self._data = None
self._storage_lock = None
self.storage_updated = None

View file

@ -7,10 +7,6 @@ from lightrag.utils import logger, compute_mdhash_id
from ..base import BaseVectorStorage
import pipmaster as pm
if not pm.is_installed("configparser"):
pm.install("configparser")
if not pm.is_installed("pymilvus"):
pm.install("pymilvus")
@ -660,6 +656,29 @@ class MilvusVectorDBStorage(BaseVectorStorage):
raise
def __post_init__(self):
# Check for MILVUS_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all Milvus storage instances
milvus_workspace = os.environ.get("MILVUS_WORKSPACE")
if milvus_workspace and milvus_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = milvus_workspace.strip()
logger.info(
f"Using MILVUS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:

View file

@ -25,6 +25,7 @@ if not pm.is_installed("pymongo"):
pm.install("pymongo")
from pymongo import AsyncMongoClient # type: ignore
from pymongo import UpdateOne # type: ignore
from pymongo.asynchronous.database import AsyncDatabase # type: ignore
from pymongo.asynchronous.collection import AsyncCollection # type: ignore
from pymongo.operations import SearchIndexModel # type: ignore
@ -81,7 +82,39 @@ class MongoKVStorage(BaseKVStorage):
db: AsyncDatabase = field(default=None)
_data: AsyncCollection = field(default=None)
def __init__(self, namespace, global_config, embedding_func, workspace=None):
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
)
self.__post_init__()
def __post_init__(self):
# Check for MONGODB_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all MongoDB storage instances
mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
if mongodb_workspace and mongodb_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = mongodb_workspace.strip()
logger.info(
f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
self._collection_name = self.namespace
async def initialize(self):
@ -142,7 +175,6 @@ class MongoKVStorage(BaseKVStorage):
# Unified handling for all namespaces with flattened keys
# Use bulk_write for better performance
from pymongo import UpdateOne
operations = []
current_time = int(time.time()) # Get current Unix timestamp
@ -252,7 +284,39 @@ class MongoDocStatusStorage(DocStatusStorage):
db: AsyncDatabase = field(default=None)
_data: AsyncCollection = field(default=None)
def __init__(self, namespace, global_config, embedding_func, workspace=None):
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
)
self.__post_init__()
def __post_init__(self):
# Check for MONGODB_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all MongoDB storage instances
mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
if mongodb_workspace and mongodb_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = mongodb_workspace.strip()
logger.info(
f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
self._collection_name = self.namespace
async def initialize(self):
@ -367,12 +431,36 @@ class MongoGraphStorage(BaseGraphStorage):
# edge collection storing source_node_id, target_node_id, and edge_properties
edgeCollection: AsyncCollection = field(default=None)
def __init__(self, namespace, global_config, embedding_func):
def __init__(self, namespace, global_config, embedding_func, workspace=None):
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
)
# Check for MONGODB_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all MongoDB storage instances
mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
if mongodb_workspace and mongodb_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = mongodb_workspace.strip()
logger.info(
f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
self._collection_name = self.namespace
self._edge_collection_name = f"{self._collection_name}_edges"
@ -1230,8 +1318,52 @@ class MongoGraphStorage(BaseGraphStorage):
class MongoVectorDBStorage(BaseVectorStorage):
db: AsyncDatabase | None = field(default=None)
_data: AsyncCollection | None = field(default=None)
_index_name: str = field(default="", init=False)
def __init__(
self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
):
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
meta_fields=meta_fields or set(),
)
self.__post_init__()
def __post_init__(self):
# Check for MONGODB_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all MongoDB storage instances
mongodb_workspace = os.environ.get("MONGODB_WORKSPACE")
if mongodb_workspace and mongodb_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = mongodb_workspace.strip()
logger.info(
f"Using MONGODB_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
# Set index name based on workspace for backward compatibility
if effective_workspace:
# Use collection-specific index name for workspaced collections to avoid conflicts
self._index_name = f"vector_knn_index_{self.namespace}"
else:
# Keep original index name for backward compatibility with existing deployments
self._index_name = "vector_knn_index"
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:
@ -1261,13 +1393,11 @@ class MongoVectorDBStorage(BaseVectorStorage):
async def create_vector_index_if_not_exists(self):
"""Creates an Atlas Vector Search index."""
try:
index_name = "vector_knn_index"
indexes_cursor = await self._data.list_search_indexes()
indexes = await indexes_cursor.to_list(length=None)
for index in indexes:
if index["name"] == index_name:
logger.debug("vector index already exist")
if index["name"] == self._index_name:
logger.info(f"vector index {self._index_name} already exists")
return
search_index_model = SearchIndexModel(
@ -1281,15 +1411,15 @@ class MongoVectorDBStorage(BaseVectorStorage):
}
]
},
name=index_name,
name=self._index_name,
type="vectorSearch",
)
await self._data.create_search_index(search_index_model)
logger.info("Vector index created successfully.")
logger.info(f"Vector index {self._index_name} created successfully.")
except PyMongoError as _:
logger.debug("vector index already exist")
except PyMongoError as e:
logger.error(f"Error creating vector index {self._index_name}: {e}")
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"Inserting {len(data)} to {self.namespace}")
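
For reference, the MongoDB naming that results from the rules above, shown as a small sketch (names are examples): workspaced instances get a prefixed collection and a collection-specific Atlas search index, while an empty workspace keeps the original names for existing deployments.

workspace, namespace = "team_a", "chunks"

collection_name = f"{workspace}_{namespace}" if workspace else namespace
index_name = (
    f"vector_knn_index_{collection_name}" if workspace else "vector_knn_index"
)
# -> collection "team_a_chunks", search index "vector_knn_index_team_a_chunks"
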
@ -1344,7 +1474,7 @@ class MongoVectorDBStorage(BaseVectorStorage):
pipeline = [
{
"$vectorSearch": {
"index": "vector_knn_index", # Ensure this matches the created index name
"index": self._index_name, # Use stored index name for consistency
"path": "vector",
"queryVector": query_vector,
"numCandidates": 100, # Adjust for performance

View file

@ -41,9 +41,19 @@ class NanoVectorDBStorage(BaseVectorStorage):
)
self.cosine_better_than_threshold = cosine_threshold
self._client_file_name = os.path.join(
self.global_config["working_dir"], f"vdb_{self.namespace}.json"
)
working_dir = self.global_config["working_dir"]
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
os.makedirs(workspace_dir, exist_ok=True)
self._client_file_name = os.path.join(
workspace_dir, f"vdb_{self.namespace}.json"
)
else:
# Default behavior when workspace is empty
self._client_file_name = os.path.join(
working_dir, f"vdb_{self.namespace}.json"
)
self._max_batch_size = self.global_config["embedding_batch_num"]
self._client = NanoVectorDB(

View file

@ -50,14 +50,25 @@ logging.getLogger("neo4j").setLevel(logging.ERROR)
@final
@dataclass
class Neo4JStorage(BaseGraphStorage):
def __init__(self, namespace, global_config, embedding_func):
def __init__(self, namespace, global_config, embedding_func, workspace=None):
# Check NEO4J_WORKSPACE environment variable and override workspace if set
neo4j_workspace = os.environ.get("NEO4J_WORKSPACE")
if neo4j_workspace and neo4j_workspace.strip():
workspace = neo4j_workspace
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
)
self._driver = None
def _get_workspace_label(self) -> str:
"""Get workspace label, return 'base' for compatibility when workspace is empty"""
workspace = getattr(self, "workspace", None)
return workspace if workspace else "base"
async def initialize(self):
URI = os.environ.get("NEO4J_URI", config.get("neo4j", "uri", fallback=None))
USERNAME = os.environ.get(
@ -153,13 +164,14 @@ class Neo4JStorage(BaseGraphStorage):
raise e
if connected:
# Create index for base nodes on entity_id if it doesn't exist
# Create index for workspace nodes on entity_id if it doesn't exist
workspace_label = self._get_workspace_label()
try:
async with self._driver.session(database=database) as session:
# Check if index exists first
check_query = """
check_query = f"""
CALL db.indexes() YIELD name, labelsOrTypes, properties
WHERE labelsOrTypes = ['base'] AND properties = ['entity_id']
WHERE labelsOrTypes = ['{workspace_label}'] AND properties = ['entity_id']
RETURN count(*) > 0 AS exists
"""
try:
@ -172,16 +184,16 @@ class Neo4JStorage(BaseGraphStorage):
if not index_exists:
# Create index only if it doesn't exist
result = await session.run(
"CREATE INDEX FOR (n:base) ON (n.entity_id)"
f"CREATE INDEX FOR (n:`{workspace_label}`) ON (n.entity_id)"
)
await result.consume()
logger.info(
f"Created index for base nodes on entity_id in {database}"
f"Created index for {workspace_label} nodes on entity_id in {database}"
)
except Exception:
# Fallback if db.indexes() is not supported in this Neo4j version
result = await session.run(
"CREATE INDEX IF NOT EXISTS FOR (n:base) ON (n.entity_id)"
f"CREATE INDEX IF NOT EXISTS FOR (n:`{workspace_label}`) ON (n.entity_id)"
)
await result.consume()
except Exception as e:
@ -216,11 +228,12 @@ class Neo4JStorage(BaseGraphStorage):
ValueError: If node_id is invalid
Exception: If there is an error executing the query
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = "MATCH (n:base {entity_id: $entity_id}) RETURN count(n) > 0 AS node_exists"
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
result = await session.run(query, entity_id=node_id)
single_result = await result.single()
await result.consume() # Ensure result is fully consumed
@ -245,12 +258,13 @@ class Neo4JStorage(BaseGraphStorage):
ValueError: If either node_id is invalid
Exception: If there is an error executing the query
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = (
"MATCH (a:base {entity_id: $source_entity_id})-[r]-(b:base {entity_id: $target_entity_id}) "
f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) "
"RETURN COUNT(r) > 0 AS edgeExists"
)
result = await session.run(
@ -282,11 +296,14 @@ class Neo4JStorage(BaseGraphStorage):
ValueError: If node_id is invalid
Exception: If there is an error executing the query
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = "MATCH (n:base {entity_id: $entity_id}) RETURN n"
query = (
f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN n"
)
result = await session.run(query, entity_id=node_id)
try:
records = await result.fetch(
@ -300,12 +317,12 @@ class Neo4JStorage(BaseGraphStorage):
if records:
node = records[0]["n"]
node_dict = dict(node)
# Remove base label from labels list if it exists
# Remove workspace label from labels list if it exists
if "labels" in node_dict:
node_dict["labels"] = [
label
for label in node_dict["labels"]
if label != "base"
if label != workspace_label
]
# logger.debug(f"Neo4j query node {query} return: {node_dict}")
return node_dict
@ -326,12 +343,13 @@ class Neo4JStorage(BaseGraphStorage):
Returns:
A dictionary mapping each node_id to its node data (or None if not found).
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
MATCH (n:`{workspace_label}` {{entity_id: id}})
RETURN n.entity_id AS entity_id, n
"""
result = await session.run(query, node_ids=node_ids)
@ -340,10 +358,12 @@ class Neo4JStorage(BaseGraphStorage):
entity_id = record["entity_id"]
node = record["n"]
node_dict = dict(node)
# Remove the 'base' label if present in a 'labels' property
# Remove the workspace label if present in a 'labels' property
if "labels" in node_dict:
node_dict["labels"] = [
label for label in node_dict["labels"] if label != "base"
label
for label in node_dict["labels"]
if label != workspace_label
]
nodes[entity_id] = node_dict
await result.consume() # Make sure to consume the result fully
@ -364,12 +384,13 @@ class Neo4JStorage(BaseGraphStorage):
ValueError: If node_id is invalid
Exception: If there is an error executing the query
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = """
MATCH (n:base {entity_id: $entity_id})
query = f"""
MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
OPTIONAL MATCH (n)-[r]-()
RETURN COUNT(r) AS degree
"""
@ -403,13 +424,14 @@ class Neo4JStorage(BaseGraphStorage):
A dictionary mapping each node_id to its degree (number of relationships).
If a node is not found, its degree will be set to 0.
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
query = f"""
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
RETURN n.entity_id AS entity_id, count { (n)--() } AS degree;
MATCH (n:`{workspace_label}` {{entity_id: id}})
RETURN n.entity_id AS entity_id, count {{ (n)--() }} AS degree;
"""
result = await session.run(query, node_ids=node_ids)
degrees = {}
@ -489,12 +511,13 @@ class Neo4JStorage(BaseGraphStorage):
ValueError: If either node_id is invalid
Exception: If there is an error executing the query
"""
workspace_label = self._get_workspace_label()
try:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
MATCH (start:base {entity_id: $source_entity_id})-[r]-(end:base {entity_id: $target_entity_id})
query = f"""
MATCH (start:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(end:`{workspace_label}` {{entity_id: $target_entity_id}})
RETURN properties(r) as edge_properties
"""
result = await session.run(
@ -571,12 +594,13 @@ class Neo4JStorage(BaseGraphStorage):
Returns:
A dictionary mapping (src, tgt) tuples to their edge properties.
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
query = f"""
UNWIND $pairs AS pair
MATCH (start:base {entity_id: pair.src})-[r:DIRECTED]-(end:base {entity_id: pair.tgt})
MATCH (start:`{workspace_label}` {{entity_id: pair.src}})-[r:DIRECTED]-(end:`{workspace_label}` {{entity_id: pair.tgt}})
RETURN pair.src AS src_id, pair.tgt AS tgt_id, collect(properties(r)) AS edges
"""
result = await session.run(query, pairs=pairs)
@ -627,8 +651,9 @@ class Neo4JStorage(BaseGraphStorage):
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = """MATCH (n:base {entity_id: $entity_id})
OPTIONAL MATCH (n)-[r]-(connected:base)
workspace_label = self._get_workspace_label()
query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
OPTIONAL MATCH (n)-[r]-(connected:`{workspace_label}`)
WHERE connected.entity_id IS NOT NULL
RETURN n, r, connected"""
results = await session.run(query, entity_id=source_node_id)
@ -689,10 +714,11 @@ class Neo4JStorage(BaseGraphStorage):
database=self._DATABASE, default_access_mode="READ"
) as session:
# Query to get both outgoing and incoming edges
query = """
workspace_label = self._get_workspace_label()
query = f"""
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
OPTIONAL MATCH (n)-[r]-(connected:base)
MATCH (n:`{workspace_label}` {{entity_id: id}})
OPTIONAL MATCH (n)-[r]-(connected:`{workspace_label}`)
RETURN id AS queried_id, n.entity_id AS node_entity_id,
connected.entity_id AS connected_entity_id,
startNode(r).entity_id AS start_entity_id
@ -727,12 +753,13 @@ class Neo4JStorage(BaseGraphStorage):
return edges_dict
async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
query = f"""
UNWIND $chunk_ids AS chunk_id
MATCH (n:base)
MATCH (n:`{workspace_label}`)
WHERE n.source_id IS NOT NULL AND chunk_id IN split(n.source_id, $sep)
RETURN DISTINCT n
"""
@ -748,12 +775,13 @@ class Neo4JStorage(BaseGraphStorage):
return nodes
async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
query = f"""
UNWIND $chunk_ids AS chunk_id
MATCH (a:base)-[r]-(b:base)
MATCH (a:`{workspace_label}`)-[r]-(b:`{workspace_label}`)
WHERE r.source_id IS NOT NULL AND chunk_id IN split(r.source_id, $sep)
RETURN DISTINCT a.entity_id AS source, b.entity_id AS target, properties(r) AS properties
"""
@ -787,6 +815,7 @@ class Neo4JStorage(BaseGraphStorage):
node_id: The unique identifier for the node (used as label)
node_data: Dictionary of node properties
"""
workspace_label = self._get_workspace_label()
properties = node_data
entity_type = properties["entity_type"]
if "entity_id" not in properties:
@ -796,14 +825,11 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(database=self._DATABASE) as session:
async def execute_upsert(tx: AsyncManagedTransaction):
query = (
"""
MERGE (n:base {entity_id: $entity_id})
query = f"""
MERGE (n:`{workspace_label}` {{entity_id: $entity_id}})
SET n += $properties
SET n:`%s`
SET n:`{entity_type}`
"""
% entity_type
)
result = await tx.run(
query, entity_id=node_id, properties=properties
)
@ -847,10 +873,11 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(database=self._DATABASE) as session:
async def execute_upsert(tx: AsyncManagedTransaction):
query = """
MATCH (source:base {entity_id: $source_entity_id})
workspace_label = self._get_workspace_label()
query = f"""
MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})
WITH source
MATCH (target:base {entity_id: $target_entity_id})
MATCH (target:`{workspace_label}` {{entity_id: $target_entity_id}})
MERGE (source)-[r:DIRECTED]-(target)
SET r += $properties
RETURN r, source, target
@ -889,6 +916,7 @@ class Neo4JStorage(BaseGraphStorage):
KnowledgeGraph object containing nodes and edges, with an is_truncated flag
indicating whether the graph was truncated due to max_nodes limit
"""
workspace_label = self._get_workspace_label()
result = KnowledgeGraph()
seen_nodes = set()
seen_edges = set()
@ -899,7 +927,9 @@ class Neo4JStorage(BaseGraphStorage):
try:
if node_label == "*":
# First check total node count to determine if graph is truncated
count_query = "MATCH (n) RETURN count(n) as total"
count_query = (
f"MATCH (n:`{workspace_label}`) RETURN count(n) as total"
)
count_result = None
try:
count_result = await session.run(count_query)
@ -915,13 +945,13 @@ class Neo4JStorage(BaseGraphStorage):
await count_result.consume()
# Run main query to get nodes with highest degree
main_query = """
MATCH (n)
main_query = f"""
MATCH (n:`{workspace_label}`)
OPTIONAL MATCH (n)-[r]-()
WITH n, COALESCE(count(r), 0) AS degree
ORDER BY degree DESC
LIMIT $max_nodes
WITH collect({node: n}) AS filtered_nodes
WITH collect({{node: n}}) AS filtered_nodes
UNWIND filtered_nodes AS node_info
WITH collect(node_info.node) AS kept_nodes, filtered_nodes
OPTIONAL MATCH (a)-[r]-(b)
@ -943,20 +973,21 @@ class Neo4JStorage(BaseGraphStorage):
else:
# return await self._robust_fallback(node_label, max_depth, max_nodes)
# First try without limit to check if we need to truncate
full_query = """
MATCH (start)
full_query = f"""
MATCH (start:`{workspace_label}`)
WHERE start.entity_id = $entity_id
WITH start
CALL apoc.path.subgraphAll(start, {
CALL apoc.path.subgraphAll(start, {{
relationshipFilter: '',
labelFilter: '{workspace_label}',
minLevel: 0,
maxLevel: $max_depth,
bfs: true
})
}})
YIELD nodes, relationships
WITH nodes, relationships, size(nodes) AS total_nodes
UNWIND nodes AS node
WITH collect({node: node}) AS node_info, relationships, total_nodes
WITH collect({{node: node}}) AS node_info, relationships, total_nodes
RETURN node_info, relationships, total_nodes
"""
@ -994,20 +1025,21 @@ class Neo4JStorage(BaseGraphStorage):
)
# Run limited query
limited_query = """
MATCH (start)
limited_query = f"""
MATCH (start:`{workspace_label}`)
WHERE start.entity_id = $entity_id
WITH start
CALL apoc.path.subgraphAll(start, {
CALL apoc.path.subgraphAll(start, {{
relationshipFilter: '',
labelFilter: '{workspace_label}',
minLevel: 0,
maxLevel: $max_depth,
limit: $max_nodes,
bfs: true
})
}})
YIELD nodes, relationships
UNWIND nodes AS node
WITH collect({node: node}) AS node_info, relationships
WITH collect({{node: node}}) AS node_info, relationships
RETURN node_info, relationships
"""
result_set = None
@ -1094,11 +1126,12 @@ class Neo4JStorage(BaseGraphStorage):
visited_edge_pairs = set()
# Get the starting node's data
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
MATCH (n:base {entity_id: $entity_id})
query = f"""
MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
RETURN id(n) as node_id, n
"""
node_result = await session.run(query, entity_id=node_label)
@ -1156,8 +1189,9 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
MATCH (a:base {entity_id: $entity_id})-[r]-(b)
workspace_label = self._get_workspace_label()
query = f"""
MATCH (a:`{workspace_label}` {{entity_id: $entity_id}})-[r]-(b)
WITH r, b, id(r) as edge_id, id(b) as target_id
RETURN r, b, edge_id, target_id
"""
@ -1241,6 +1275,7 @@ class Neo4JStorage(BaseGraphStorage):
Returns:
["Person", "Company", ...] # Alphabetically sorted label list
"""
workspace_label = self._get_workspace_label()
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
@ -1248,8 +1283,8 @@ class Neo4JStorage(BaseGraphStorage):
# query = "CALL db.labels() YIELD label RETURN label"
# Method 2: Query compatible with older versions
query = """
MATCH (n:base)
query = f"""
MATCH (n:`{workspace_label}`)
WHERE n.entity_id IS NOT NULL
RETURN DISTINCT n.entity_id AS label
ORDER BY label
@ -1285,8 +1320,9 @@ class Neo4JStorage(BaseGraphStorage):
"""
async def _do_delete(tx: AsyncManagedTransaction):
query = """
MATCH (n:base {entity_id: $entity_id})
workspace_label = self._get_workspace_label()
query = f"""
MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
DETACH DELETE n
"""
result = await tx.run(query, entity_id=node_id)
@ -1342,8 +1378,9 @@ class Neo4JStorage(BaseGraphStorage):
for source, target in edges:
async def _do_delete_edge(tx: AsyncManagedTransaction):
query = """
MATCH (source:base {entity_id: $source_entity_id})-[r]-(target:base {entity_id: $target_entity_id})
workspace_label = self._get_workspace_label()
query = f"""
MATCH (source:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(target:`{workspace_label}` {{entity_id: $target_entity_id}})
DELETE r
"""
result = await tx.run(
@ -1360,26 +1397,32 @@ class Neo4JStorage(BaseGraphStorage):
raise
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
"""Drop all data from current workspace storage and clean up resources
This method will delete all nodes and relationships in the Neo4j database.
This method will delete all nodes and relationships in the current workspace only.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On success: {"status": "success", "message": "workspace data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
workspace_label = self._get_workspace_label()
try:
async with self._driver.session(database=self._DATABASE) as session:
# Delete all nodes and relationships
query = "MATCH (n) DETACH DELETE n"
# Delete all nodes and relationships in current workspace only
query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
result = await session.run(query)
await result.consume() # Ensure result is fully consumed
logger.info(
f"Process {os.getpid()} drop Neo4j database {self._DATABASE}"
f"Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}"
)
return {"status": "success", "message": "data dropped"}
return {
"status": "success",
"message": f"workspace '{workspace_label}' data dropped",
}
except Exception as e:
logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")
logger.error(
f"Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}"
)
return {"status": "error", "message": str(e)}
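
In Neo4j the workspace is applied as a node label rather than a name prefix; a brief sketch of the idea (label and query are illustrative, mirroring _get_workspace_label and the rewritten queries above):

def get_workspace_label(workspace: str) -> str:
    # Empty workspace keeps the legacy "base" label, so existing graphs stay readable.
    return workspace if workspace else "base"

label = get_workspace_label("team_a")
query = f"MATCH (n:`{label}` {{entity_id: $entity_id}}) RETURN n"
# -> MATCH (n:`team_a` {entity_id: $entity_id}) RETURN n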

View file

@ -46,9 +46,19 @@ class NetworkXStorage(BaseGraphStorage):
nx.write_graphml(graph, file_name)
def __post_init__(self):
self._graphml_xml_file = os.path.join(
self.global_config["working_dir"], f"graph_{self.namespace}.graphml"
)
working_dir = self.global_config["working_dir"]
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
os.makedirs(workspace_dir, exist_ok=True)
self._graphml_xml_file = os.path.join(
workspace_dir, f"graph_{self.namespace}.graphml"
)
else:
# Default behavior when workspace is empty
self._graphml_xml_file = os.path.join(
working_dir, f"graph_{self.namespace}.graphml"
)
self._storage_lock = None
self.storage_updated = None
self._graph = None

View file

@ -1,6 +1,7 @@
import asyncio
import json
import os
import re
import datetime
from datetime import timezone
from dataclasses import dataclass, field
@ -319,7 +320,7 @@ class PostgreSQLDB:
# Get all old format data
old_data_sql = """
SELECT id, mode, original_prompt, return_value, chunk_id,
create_time, update_time
workspace, create_time, update_time
FROM LIGHTRAG_LLM_CACHE
WHERE id NOT LIKE '%:%'
"""
@ -364,7 +365,9 @@ class PostgreSQLDB:
await self.execute(
insert_sql,
{
"workspace": self.workspace,
"workspace": record[
"workspace"
], # Use original record's workspace
"id": new_key,
"mode": record["mode"],
"original_prompt": record["original_prompt"],
@ -384,7 +387,9 @@ class PostgreSQLDB:
await self.execute(
delete_sql,
{
"workspace": self.workspace,
"workspace": record[
"workspace"
], # Use original record's workspace
"mode": record["mode"],
"id": record["id"], # Old id
},
@ -505,6 +510,29 @@ class PostgreSQLDB:
f"PostgreSQL, Failed to create index on table {k}, Got: {e}"
)
# Create composite index for (workspace, id) columns in each table
try:
composite_index_name = f"idx_{k.lower()}_workspace_id"
check_composite_index_sql = f"""
SELECT 1 FROM pg_indexes
WHERE indexname = '{composite_index_name}'
AND tablename = '{k.lower()}'
"""
composite_index_exists = await self.query(check_composite_index_sql)
if not composite_index_exists:
create_composite_index_sql = (
f"CREATE INDEX {composite_index_name} ON {k}(workspace, id)"
)
logger.info(
f"PostgreSQL, Creating composite index {composite_index_name} on table {k}"
)
await self.execute(create_composite_index_sql)
except Exception as e:
logger.error(
f"PostgreSQL, Failed to create composite index on table {k}, Got: {e}"
)
# After all tables are created, attempt to migrate timestamp fields
try:
await self._migrate_timestamp_columns()
@ -670,7 +698,7 @@ class ClientManager:
),
"workspace": os.environ.get(
"POSTGRES_WORKSPACE",
config.get("postgres", "workspace", fallback="default"),
config.get("postgres", "workspace", fallback=None),
),
"max_connections": os.environ.get(
"POSTGRES_MAX_CONNECTIONS",
@ -716,6 +744,18 @@ class PGKVStorage(BaseKVStorage):
async def initialize(self):
if self.db is None:
self.db = await ClientManager.get_client()
# Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
if self.db.workspace:
# Use PostgreSQLDB's workspace (highest priority)
final_workspace = self.db.workspace
elif hasattr(self, "workspace") and self.workspace:
# Use storage class's workspace (medium priority)
final_workspace = self.workspace
self.db.workspace = final_workspace
else:
# Use "default" for compatibility (lowest priority)
final_workspace = "default"
self.db.workspace = final_workspace
async def finalize(self):
if self.db is not None:
@ -1047,6 +1087,18 @@ class PGVectorStorage(BaseVectorStorage):
async def initialize(self):
if self.db is None:
self.db = await ClientManager.get_client()
# Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
if self.db.workspace:
# Use PostgreSQLDB's workspace (highest priority)
final_workspace = self.db.workspace
elif hasattr(self, "workspace") and self.workspace:
# Use storage class's workspace (medium priority)
final_workspace = self.workspace
self.db.workspace = final_workspace
else:
# Use "default" for compatibility (lowest priority)
final_workspace = "default"
self.db.workspace = final_workspace
async def finalize(self):
if self.db is not None:
@ -1328,6 +1380,18 @@ class PGDocStatusStorage(DocStatusStorage):
async def initialize(self):
if self.db is None:
self.db = await ClientManager.get_client()
# Implement workspace priority: PostgreSQLDB.workspace > self.workspace > "default"
if self.db.workspace:
# Use PostgreSQLDB's workspace (highest priority)
final_workspace = self.db.workspace
elif hasattr(self, "workspace") and self.workspace:
# Use storage class's workspace (medium priority)
final_workspace = self.workspace
self.db.workspace = final_workspace
else:
# Use "default" for compatibility (lowest priority)
final_workspace = "default"
self.db.workspace = final_workspace
async def finalize(self):
if self.db is not None:
@ -1606,9 +1670,34 @@ class PGGraphQueryException(Exception):
@dataclass
class PGGraphStorage(BaseGraphStorage):
def __post_init__(self):
self.graph_name = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
# Graph name will be dynamically generated in initialize() based on workspace
self.db: PostgreSQLDB | None = None
def _get_workspace_graph_name(self) -> str:
"""
Generate graph name based on workspace and namespace for data isolation.
Rules:
- If workspace is empty: graph_name = namespace
- If workspace has value: graph_name = workspace_namespace
Args:
None
Returns:
str: The graph name for the current workspace
"""
workspace = getattr(self, "workspace", None)
namespace = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
if workspace and workspace.strip():
# Ensure names comply with PostgreSQL identifier specifications
safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
return f"{safe_workspace}_{safe_namespace}"
else:
# When workspace is empty, use namespace directly
return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
@staticmethod
def _normalize_node_id(node_id: str) -> str:
"""
@ -1629,6 +1718,27 @@ class PGGraphStorage(BaseGraphStorage):
async def initialize(self):
if self.db is None:
self.db = await ClientManager.get_client()
# Implement workspace priority: PostgreSQLDB.workspace > self.workspace > None
if self.db.workspace:
# Use PostgreSQLDB's workspace (highest priority)
final_workspace = self.db.workspace
elif hasattr(self, "workspace") and self.workspace:
# Use storage class's workspace (medium priority)
final_workspace = self.workspace
self.db.workspace = final_workspace
else:
# Use None for compatibility (lowest priority)
final_workspace = None
self.db.workspace = final_workspace
# Dynamically generate graph name based on workspace
self.workspace = self.db.workspace
self.graph_name = self._get_workspace_graph_name()
# Log the graph initialization for debugging
logger.info(
f"PostgreSQL Graph initialized: workspace='{self.workspace}', graph_name='{self.graph_name}'"
)
# Execute each statement separately and ignore errors
queries = [
@ -2833,7 +2943,10 @@ class PGGraphStorage(BaseGraphStorage):
$$) AS (result agtype)"""
await self._query(drop_query, readonly=False)
return {"status": "success", "message": "graph data dropped"}
return {
"status": "success",
"message": f"workspace '{self.workspace}' graph data dropped",
}
except Exception as e:
logger.error(f"Error dropping graph: {e}")
return {"status": "error", "message": str(e)}
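
For Apache AGE on PostgreSQL, each workspace gets its own graph; a sketch of the name derivation (mirrors _get_workspace_graph_name above, with example values). POSTGRES_WORKSPACE, when set, takes precedence over the workspace passed to the storage class.

import re

def workspace_graph_name(workspace: str, namespace: str) -> str:
    # Sanitize to valid PostgreSQL identifiers; empty workspace falls back to the namespace.
    if workspace and workspace.strip():
        safe_ws = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
        safe_ns = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
        return f"{safe_ws}_{safe_ns}"
    return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)

# workspace_graph_name("team-a", "chunk_entity_relation") -> "team_a_chunk_entity_relation"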

View file

@ -50,6 +50,18 @@ def compute_mdhash_id_for_qdrant(
@final
@dataclass
class QdrantVectorDBStorage(BaseVectorStorage):
def __init__(
self, namespace, global_config, embedding_func, workspace=None, meta_fields=None
):
super().__init__(
namespace=namespace,
workspace=workspace or "",
global_config=global_config,
embedding_func=embedding_func,
meta_fields=meta_fields or set(),
)
self.__post_init__()
@staticmethod
def create_collection_if_not_exist(
client: QdrantClient, collection_name: str, **kwargs
@ -59,6 +71,29 @@ class QdrantVectorDBStorage(BaseVectorStorage):
client.create_collection(collection_name, **kwargs)
def __post_init__(self):
# Check for QDRANT_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all Qdrant storage instances
qdrant_workspace = os.environ.get("QDRANT_WORKSPACE")
if qdrant_workspace and qdrant_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = qdrant_workspace.strip()
logger.info(
f"Using QDRANT_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:

View file

@ -71,6 +71,29 @@ class RedisConnectionManager:
@dataclass
class RedisKVStorage(BaseKVStorage):
def __post_init__(self):
# Check for REDIS_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all Redis storage instances
redis_workspace = os.environ.get("REDIS_WORKSPACE")
if redis_workspace and redis_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = redis_workspace.strip()
logger.info(
f"Using REDIS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
redis_url = os.environ.get(
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
)
@ -461,6 +484,29 @@ class RedisDocStatusStorage(DocStatusStorage):
"""Redis implementation of document status storage"""
def __post_init__(self):
# Check for REDIS_WORKSPACE environment variable first (higher priority)
# This allows administrators to force a specific workspace for all Redis storage instances
redis_workspace = os.environ.get("REDIS_WORKSPACE")
if redis_workspace and redis_workspace.strip():
# Use environment variable value, overriding the passed workspace parameter
effective_workspace = redis_workspace.strip()
logger.info(
f"Using REDIS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')"
)
else:
# Use the workspace parameter passed during initialization
effective_workspace = self.workspace
if effective_workspace:
logger.debug(
f"Using passed workspace parameter: '{effective_workspace}'"
)
# Build namespace with workspace prefix for data isolation
if effective_workspace:
self.namespace = f"{effective_workspace}_{self.namespace}"
logger.debug(f"Final namespace with workspace prefix: '{self.namespace}'")
# When workspace is empty, keep the original namespace unchanged
redis_url = os.environ.get(
"REDIS_URI", config.get("redis", "uri", fallback="redis://localhost:6379")
)

View file

@ -51,7 +51,7 @@ from .base import (
StoragesStatus,
DeletionResult,
)
from .namespace import NameSpace, make_namespace
from .namespace import NameSpace
from .operate import (
chunking_by_token_size,
extract_entities,
@ -97,9 +97,7 @@ class LightRAG:
# Directory
# ---
working_dir: str = field(
default=f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}"
)
working_dir: str = field(default="./rag_storage")
"""Directory where cache and temporary files are stored."""
# Storage
@ -117,6 +115,12 @@ class LightRAG:
doc_status_storage: str = field(default="JsonDocStatusStorage")
"""Storage type for tracking document processing statuses."""
# Workspace
# ---
workspace: str = field(default_factory=lambda: os.getenv("WORKSPACE", ""))
"""Workspace for data isolation. Defaults to empty string if WORKSPACE environment variable is not set."""
# Logging (Deprecated, use setup_logger in utils.py instead)
# ---
log_level: int | None = field(default=None)
@ -242,10 +246,6 @@ class LightRAG:
vector_db_storage_cls_kwargs: dict[str, Any] = field(default_factory=dict)
"""Additional parameters for vector database storage."""
# TODO: deprecated, remove in the future, use WORKSPACE instead
namespace_prefix: str = field(default="")
"""Prefix for namespacing stored data across different environments."""
enable_llm_cache: bool = field(default=True)
"""Enables caching for LLM responses to avoid redundant computations."""
@ -382,61 +382,53 @@ class LightRAG:
self.doc_status_storage_cls = self._get_storage_class(self.doc_status_storage)
self.llm_response_cache: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.KV_STORE_LLM_RESPONSE_CACHE
),
global_config=asdict(
self
), # Add global_config to ensure cache works properly
namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
workspace=self.workspace,
global_config=global_config,
embedding_func=self.embedding_func,
)
self.full_docs: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.KV_STORE_FULL_DOCS
),
namespace=NameSpace.KV_STORE_FULL_DOCS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.KV_STORE_TEXT_CHUNKS
),
namespace=NameSpace.KV_STORE_TEXT_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION
),
namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.entities_vdb: BaseVectorStorage = self.vector_db_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.VECTOR_STORE_ENTITIES
),
namespace=NameSpace.VECTOR_STORE_ENTITIES,
workspace=self.workspace,
embedding_func=self.embedding_func,
meta_fields={"entity_name", "source_id", "content", "file_path"},
)
self.relationships_vdb: BaseVectorStorage = self.vector_db_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.VECTOR_STORE_RELATIONSHIPS
),
namespace=NameSpace.VECTOR_STORE_RELATIONSHIPS,
workspace=self.workspace,
embedding_func=self.embedding_func,
meta_fields={"src_id", "tgt_id", "source_id", "content", "file_path"},
)
self.chunks_vdb: BaseVectorStorage = self.vector_db_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.VECTOR_STORE_CHUNKS
),
namespace=NameSpace.VECTOR_STORE_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
meta_fields={"full_doc_id", "content", "file_path"},
)
# Initialize document status storage
self.doc_status: DocStatusStorage = self.doc_status_storage_cls(
namespace=make_namespace(self.namespace_prefix, NameSpace.DOC_STATUS),
namespace=NameSpace.DOC_STATUS,
workspace=self.workspace,
global_config=global_config,
embedding_func=None,
)
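
For callers migrating off the removed namespace_prefix field, a hedged before/after sketch (argument values are examples; other constructor arguments omitted):

from lightrag import LightRAG

# Before (deprecated): per-instance isolation via a namespace prefix
# rag = LightRAG(working_dir="./rag_storage", namespace_prefix="proj1_")

# After: pass a workspace (or set the WORKSPACE environment variable);
# isolation is then applied uniformly across all configured storage backends.
rag = LightRAG(working_dir="./rag_storage", workspace="proj1")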

View file

@ -17,10 +17,6 @@ class NameSpace:
DOC_STATUS = "doc_status"
def make_namespace(prefix: str, base_namespace: str):
return prefix + base_namespace
def is_namespace(namespace: str, base_namespace: str | Iterable[str]):
if isinstance(base_namespace, str):
return namespace.endswith(base_namespace)

View file

@ -40,6 +40,8 @@ export type LightragStatus = {
doc_status_storage: string
graph_storage: string
vector_storage: string
workspace?: string
max_graph_nodes?: string
}
update_status?: Record<string, any>
core_version?: string

View file

@ -56,6 +56,10 @@ const StatusCard = ({ status }: { status: LightragStatus | null }) => {
<span>{status.configuration.graph_storage}</span>
<span>{t('graphPanel.statusCard.vectorStorage')}:</span>
<span>{status.configuration.vector_storage}</span>
<span>{t('graphPanel.statusCard.workspace')}:</span>
<span>{status.configuration.workspace || '-'}</span>
<span>{t('graphPanel.statusCard.maxGraphNodes')}:</span>
<span>{status.configuration.max_graph_nodes || '-'}</span>
</div>
</div>
</div>

View file

@ -263,7 +263,9 @@
"kvStorage": "تخزين المفتاح-القيمة",
"docStatusStorage": "تخزين حالة المستند",
"graphStorage": "تخزين الرسم البياني",
"vectorStorage": "تخزين المتجهات"
"vectorStorage": "تخزين المتجهات",
"workspace": "مساحة العمل",
"maxGraphNodes": "الحد الأقصى لعقد الرسم البياني"
},
"propertiesView": {
"editProperty": "تعديل {{property}}",

View file

@ -263,7 +263,9 @@
"kvStorage": "KV Storage",
"docStatusStorage": "Doc Status Storage",
"graphStorage": "Graph Storage",
"vectorStorage": "Vector Storage"
"vectorStorage": "Vector Storage",
"workspace": "Workspace",
"maxGraphNodes": "Max Graph Nodes"
},
"propertiesView": {
"editProperty": "Edit {{property}}",

View file

@ -263,7 +263,9 @@
"kvStorage": "Stockage clé-valeur",
"docStatusStorage": "Stockage de l'état des documents",
"graphStorage": "Stockage du graphe",
"vectorStorage": "Stockage vectoriel"
"vectorStorage": "Stockage vectoriel",
"workspace": "Espace de travail",
"maxGraphNodes": "Nombre maximum de nœuds du graphe"
},
"propertiesView": {
"editProperty": "Modifier {{property}}",

View file

@ -263,7 +263,9 @@
"kvStorage": "KV存储",
"docStatusStorage": "文档状态存储",
"graphStorage": "图存储",
"vectorStorage": "向量存储"
"vectorStorage": "向量存储",
"workspace": "工作空间",
"maxGraphNodes": "最大图节点数"
},
"propertiesView": {
"editProperty": "编辑{{property}}",

View file

@ -263,7 +263,9 @@
"kvStorage": "KV 儲存",
"docStatusStorage": "文件狀態儲存",
"graphStorage": "圖形儲存",
"vectorStorage": "向量儲存"
"vectorStorage": "向量儲存",
"workspace": "工作空間",
"maxGraphNodes": "最大圖形節點數"
},
"propertiesView": {
"editProperty": "編輯{{property}}",

View file

@ -28,9 +28,10 @@ def run_queries_and_save_to_json(
):
loop = always_get_an_event_loop()
with open(output_file, "a", encoding="utf-8") as result_file, open(
error_file, "a", encoding="utf-8"
) as err_file:
with (
open(output_file, "a", encoding="utf-8") as result_file,
open(error_file, "a", encoding="utf-8") as err_file,
):
result_file.write("[\n")
first_entry = True

View file

@ -59,9 +59,10 @@ def run_queries_and_save_to_json(
):
loop = always_get_an_event_loop()
with open(output_file, "a", encoding="utf-8") as result_file, open(
error_file, "a", encoding="utf-8"
) as err_file:
with (
open(output_file, "a", encoding="utf-8") as result_file,
open(error_file, "a", encoding="utf-8") as err_file,
):
result_file.write("[\n")
first_entry = True