Fix default workspace name for PostgreSQL AGE graph storage
This commit is contained in:
parent
bab2803953
commit
80f7e37168
3 changed files with 64 additions and 45 deletions
|
|
@ -882,7 +882,7 @@ rag = LightRAG(
|
||||||
|
|
||||||
* **对于Neo4j图数据库,通过label来实现数据的逻辑隔离**:Neo4JStorage
|
* **对于Neo4j图数据库,通过label来实现数据的逻辑隔离**:Neo4JStorage
|
||||||
|
|
||||||
为了保持对遗留数据的兼容,在未配置工作空间时PostgreSQL的默认工作空间为`default`,Neo4j的默认工作空间为`base`。对于所有的外部存储,系统都提供了专用的工作空间环境变量,用于覆盖公共的 `WORKSPACE`环境变量配置。这些适用于指定存储类型的工作空间环境变量为:`REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`。
|
为了保持对遗留数据的兼容,在未配置工作空间时PostgreSQL非图存储的工作空间为`default`,PostgreSQL AGE图存储的工作空间为空,Neo4j图存储的默认工作空间为`base`。对于所有的外部存储,系统都提供了专用的工作空间环境变量,用于覆盖公共的 `WORKSPACE`环境变量配置。这些适用于指定存储类型的工作空间环境变量为:`REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`。
|
||||||
|
|
||||||
## 编辑实体和关系
|
## 编辑实体和关系
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -928,7 +928,7 @@ The `workspace` parameter ensures data isolation between different LightRAG inst
|
||||||
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
|
- **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
|
||||||
- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
|
- **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
|
||||||
|
|
||||||
To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
|
To maintain compatibility with legacy data, when no workspace is configured the default workspace for PostgreSQL non-graph storage is `default`, for PostgreSQL AGE graph storage it is empty, and for Neo4j graph storage it is `base`. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
|
||||||
|
|
||||||
## Edit Entities and Relations
|
## Edit Entities and Relations
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -115,34 +115,46 @@ class PostgreSQLDB:
|
||||||
WHERE table_name = 'lightrag_llm_cache'
|
WHERE table_name = 'lightrag_llm_cache'
|
||||||
AND column_name IN ('chunk_id', 'cache_type')
|
AND column_name IN ('chunk_id', 'cache_type')
|
||||||
"""
|
"""
|
||||||
|
|
||||||
existing_columns = await self.query(check_columns_sql, multirows=True)
|
existing_columns = await self.query(check_columns_sql, multirows=True)
|
||||||
existing_column_names = {col['column_name'] for col in existing_columns} if existing_columns else set()
|
existing_column_names = (
|
||||||
|
{col["column_name"] for col in existing_columns}
|
||||||
|
if existing_columns
|
||||||
|
else set()
|
||||||
|
)
|
||||||
|
|
||||||
# Add missing chunk_id column
|
# Add missing chunk_id column
|
||||||
if 'chunk_id' not in existing_column_names:
|
if "chunk_id" not in existing_column_names:
|
||||||
logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
|
logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
|
||||||
add_chunk_id_sql = """
|
add_chunk_id_sql = """
|
||||||
ALTER TABLE LIGHTRAG_LLM_CACHE
|
ALTER TABLE LIGHTRAG_LLM_CACHE
|
||||||
ADD COLUMN chunk_id VARCHAR(255) NULL
|
ADD COLUMN chunk_id VARCHAR(255) NULL
|
||||||
"""
|
"""
|
||||||
await self.execute(add_chunk_id_sql)
|
await self.execute(add_chunk_id_sql)
|
||||||
logger.info("Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table")
|
logger.info(
|
||||||
|
"Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.info("chunk_id column already exists in LIGHTRAG_LLM_CACHE table")
|
logger.info(
|
||||||
|
"chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
|
||||||
|
)
|
||||||
|
|
||||||
# Add missing cache_type column
|
# Add missing cache_type column
|
||||||
if 'cache_type' not in existing_column_names:
|
if "cache_type" not in existing_column_names:
|
||||||
logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
|
logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
|
||||||
add_cache_type_sql = """
|
add_cache_type_sql = """
|
||||||
ALTER TABLE LIGHTRAG_LLM_CACHE
|
ALTER TABLE LIGHTRAG_LLM_CACHE
|
||||||
ADD COLUMN cache_type VARCHAR(32) NULL
|
ADD COLUMN cache_type VARCHAR(32) NULL
|
||||||
"""
|
"""
|
||||||
await self.execute(add_cache_type_sql)
|
await self.execute(add_cache_type_sql)
|
||||||
logger.info("Successfully added cache_type column to LIGHTRAG_LLM_CACHE table")
|
logger.info(
|
||||||
|
"Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
|
||||||
|
)
|
||||||
|
|
||||||
# Migrate existing data using optimized regex pattern
|
# Migrate existing data using optimized regex pattern
|
||||||
logger.info("Migrating existing LLM cache data to populate cache_type field (optimized)")
|
logger.info(
|
||||||
|
"Migrating existing LLM cache data to populate cache_type field (optimized)"
|
||||||
|
)
|
||||||
optimized_update_sql = """
|
optimized_update_sql = """
|
||||||
UPDATE LIGHTRAG_LLM_CACHE
|
UPDATE LIGHTRAG_LLM_CACHE
|
||||||
SET cache_type = CASE
|
SET cache_type = CASE
|
||||||
|
|
@ -154,8 +166,10 @@ class PostgreSQLDB:
|
||||||
await self.execute(optimized_update_sql)
|
await self.execute(optimized_update_sql)
|
||||||
logger.info("Successfully migrated existing LLM cache data")
|
logger.info("Successfully migrated existing LLM cache data")
|
||||||
else:
|
else:
|
||||||
logger.info("cache_type column already exists in LIGHTRAG_LLM_CACHE table")
|
logger.info(
|
||||||
|
"cache_type column already exists in LIGHTRAG_LLM_CACHE table"
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}")
|
logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}")
|
||||||
|
|
||||||
|
|
@ -288,80 +302,85 @@ class PostgreSQLDB:
|
||||||
logger.warning(f"Failed to check LLM cache migration status: {e}")
|
logger.warning(f"Failed to check LLM cache migration status: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def _migrate_llm_cache_to_flattened_keys(self):
|
async def _migrate_llm_cache_to_flattened_keys(self):
|
||||||
"""Optimized version: directly execute single UPDATE migration to migrate old format cache keys to flattened format"""
|
"""Optimized version: directly execute single UPDATE migration to migrate old format cache keys to flattened format"""
|
||||||
try:
|
try:
|
||||||
# Check if migration is needed
|
# Check if migration is needed
|
||||||
check_sql = """
|
check_sql = """
|
||||||
SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE
|
SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE
|
||||||
WHERE id NOT LIKE '%:%'
|
WHERE id NOT LIKE '%:%'
|
||||||
"""
|
"""
|
||||||
result = await self.query(check_sql)
|
result = await self.query(check_sql)
|
||||||
|
|
||||||
if not result or result["count"] == 0:
|
if not result or result["count"] == 0:
|
||||||
logger.info("No old format LLM cache data found, skipping migration")
|
logger.info("No old format LLM cache data found, skipping migration")
|
||||||
return
|
return
|
||||||
|
|
||||||
old_count = result["count"]
|
old_count = result["count"]
|
||||||
logger.info(f"Found {old_count} old format cache records")
|
logger.info(f"Found {old_count} old format cache records")
|
||||||
|
|
||||||
# Check potential primary key conflicts (optional but recommended)
|
# Check potential primary key conflicts (optional but recommended)
|
||||||
conflict_check_sql = """
|
conflict_check_sql = """
|
||||||
WITH new_ids AS (
|
WITH new_ids AS (
|
||||||
SELECT
|
SELECT
|
||||||
workspace,
|
workspace,
|
||||||
mode,
|
mode,
|
||||||
id as old_id,
|
id as old_id,
|
||||||
mode || ':' ||
|
mode || ':' ||
|
||||||
CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
|
CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
|
||||||
md5(mode || original_prompt) as new_id
|
md5(mode || original_prompt) as new_id
|
||||||
FROM LIGHTRAG_LLM_CACHE
|
FROM LIGHTRAG_LLM_CACHE
|
||||||
WHERE id NOT LIKE '%:%'
|
WHERE id NOT LIKE '%:%'
|
||||||
)
|
)
|
||||||
SELECT COUNT(*) as conflicts
|
SELECT COUNT(*) as conflicts
|
||||||
FROM new_ids n1
|
FROM new_ids n1
|
||||||
JOIN LIGHTRAG_LLM_CACHE existing
|
JOIN LIGHTRAG_LLM_CACHE existing
|
||||||
ON existing.workspace = n1.workspace
|
ON existing.workspace = n1.workspace
|
||||||
AND existing.mode = n1.mode
|
AND existing.mode = n1.mode
|
||||||
AND existing.id = n1.new_id
|
AND existing.id = n1.new_id
|
||||||
WHERE existing.id LIKE '%:%' -- Only check conflicts with existing new format records
|
WHERE existing.id LIKE '%:%' -- Only check conflicts with existing new format records
|
||||||
"""
|
"""
|
||||||
|
|
||||||
conflict_result = await self.query(conflict_check_sql)
|
conflict_result = await self.query(conflict_check_sql)
|
||||||
if conflict_result and conflict_result["conflicts"] > 0:
|
if conflict_result and conflict_result["conflicts"] > 0:
|
||||||
logger.warning(f"Found {conflict_result['conflicts']} potential ID conflicts with existing records")
|
logger.warning(
|
||||||
|
f"Found {conflict_result['conflicts']} potential ID conflicts with existing records"
|
||||||
|
)
|
||||||
# Can choose to continue or abort, here we choose to continue and log warning
|
# Can choose to continue or abort, here we choose to continue and log warning
|
||||||
|
|
||||||
# Execute single UPDATE migration
|
# Execute single UPDATE migration
|
||||||
logger.info("Starting optimized LLM cache migration...")
|
logger.info("Starting optimized LLM cache migration...")
|
||||||
migration_sql = """
|
migration_sql = """
|
||||||
UPDATE LIGHTRAG_LLM_CACHE
|
UPDATE LIGHTRAG_LLM_CACHE
|
||||||
SET
|
SET
|
||||||
id = mode || ':' ||
|
id = mode || ':' ||
|
||||||
CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
|
CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
|
||||||
md5(mode || original_prompt),
|
md5(mode || original_prompt),
|
||||||
cache_type = CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END,
|
cache_type = CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END,
|
||||||
update_time = CURRENT_TIMESTAMP
|
update_time = CURRENT_TIMESTAMP
|
||||||
WHERE id NOT LIKE '%:%'
|
WHERE id NOT LIKE '%:%'
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Execute migration
|
# Execute migration
|
||||||
await self.execute(migration_sql)
|
await self.execute(migration_sql)
|
||||||
|
|
||||||
# Verify migration results
|
# Verify migration results
|
||||||
verify_sql = """
|
verify_sql = """
|
||||||
SELECT COUNT(*) as remaining_old FROM LIGHTRAG_LLM_CACHE
|
SELECT COUNT(*) as remaining_old FROM LIGHTRAG_LLM_CACHE
|
||||||
WHERE id NOT LIKE '%:%'
|
WHERE id NOT LIKE '%:%'
|
||||||
"""
|
"""
|
||||||
verify_result = await self.query(verify_sql)
|
verify_result = await self.query(verify_sql)
|
||||||
remaining = verify_result["remaining_old"] if verify_result else -1
|
remaining = verify_result["remaining_old"] if verify_result else -1
|
||||||
|
|
||||||
if remaining == 0:
|
if remaining == 0:
|
||||||
logger.info(f"✅ Successfully migrated {old_count} LLM cache records to flattened format")
|
logger.info(
|
||||||
|
f"✅ Successfully migrated {old_count} LLM cache records to flattened format"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"⚠️ Migration completed but {remaining} old format records remain")
|
logger.warning(
|
||||||
|
f"⚠️ Migration completed but {remaining} old format records remain"
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Optimized LLM cache migration failed: {e}")
|
logger.error(f"Optimized LLM cache migration failed: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
@ -1767,8 +1786,8 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
"""
|
"""
|
||||||
Generate graph name based on workspace and namespace for data isolation.
|
Generate graph name based on workspace and namespace for data isolation.
|
||||||
Rules:
|
Rules:
|
||||||
- If workspace is empty: graph_name = namespace
|
- If workspace is empty or "default": graph_name = namespace
|
||||||
- If workspace has value: graph_name = workspace_namespace
|
- If workspace has other value: graph_name = workspace_namespace
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
None
|
None
|
||||||
|
|
@ -1777,15 +1796,15 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
str: The graph name for the current workspace
|
str: The graph name for the current workspace
|
||||||
"""
|
"""
|
||||||
workspace = getattr(self, "workspace", None)
|
workspace = getattr(self, "workspace", None)
|
||||||
namespace = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
|
namespace = self.namespace
|
||||||
|
|
||||||
if workspace and workspace.strip():
|
if workspace and workspace.strip() and workspace.strip().lower() != "default":
|
||||||
# Ensure names comply with PostgreSQL identifier specifications
|
# Ensure names comply with PostgreSQL identifier specifications
|
||||||
safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
|
safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
|
||||||
safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
|
safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
|
||||||
return f"{safe_workspace}_{safe_namespace}"
|
return f"{safe_workspace}_{safe_namespace}"
|
||||||
else:
|
else:
|
||||||
# When workspace is empty, use namespace directly
|
# When workspace is empty or "default", use namespace directly
|
||||||
return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
|
return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue