Implemented storage types: PostgreSQL and MongoDB

yangdx 2025-07-03 11:46:24 +08:00
parent e56734cb8b
commit ff1b1c61c7
4 changed files with 289 additions and 67 deletions


@@ -114,15 +114,6 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
# LIGHTRAG_DOC_STATUS_STORAGE=PGDocStatusStorage
# LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
### TiDB Configuration (Deprecated)
# TIDB_HOST=localhost
# TIDB_PORT=4000
# TIDB_USER=your_username
# TIDB_PASSWORD='your_password'
# TIDB_DATABASE=your_database
### separating all data from different LightRAG instances (deprecated)
# TIDB_WORKSPACE=default
### PostgreSQL Configuration
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
@@ -130,7 +121,7 @@ POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
### separating all data from different LightRAG instances (deprecated)
### separating all data from different LightRAG instances
# POSTGRES_WORKSPACE=default
### Neo4j Configuration
@@ -146,14 +137,15 @@ NEO4J_PASSWORD='your_password'
# AGE_POSTGRES_PORT=8529
# AGE Graph Name (applies to PostgreSQL and independent AGM)
### AGE_GRAPH_NAME is precated
### AGE_GRAPH_NAME is deprecated
# AGE_GRAPH_NAME=lightrag
### MongoDB Configuration
MONGO_URI=mongodb://root:root@localhost:27017/
MONGO_DATABASE=LightRAG
### separating all data from different LightRAG instances (deprecated)
# MONGODB_GRAPH=false
### separating all data from different LightRAG instances
# MONGODB_WORKSPACE=default
### Milvus Configuration
MILVUS_URI=http://localhost:19530


@@ -133,6 +133,11 @@ class MongoKVStorage(BaseKVStorage):
operations = []
for k, v in data.items():
# For text_chunks namespace, ensure llm_cache_list field exists
if self.namespace.endswith("text_chunks"):
if "llm_cache_list" not in v:
v["llm_cache_list"] = []
v["_id"] = k # Use flattened key as _id
operations.append(UpdateOne({"_id": k}, {"$set": v}, upsert=True))
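The flattened-key upsert above reduces to a bulk write keyed on _id. A minimal standalone sketch of the same pattern with pymongo (synchronous for brevity; the storage class itself is async, and the URI and collection names here are just the defaults from the env example):

    from pymongo import MongoClient, UpdateOne

    client = MongoClient("mongodb://root:root@localhost:27017/")
    coll = client["LightRAG"]["text_chunks"]

    data = {"doc-1:chunk-0": {"content": "first chunk", "full_doc_id": "doc-1"}}
    operations = []
    for k, v in data.items():
        v.setdefault("llm_cache_list", [])  # same default the commit enforces
        v["_id"] = k  # the flattened key doubles as the Mongo _id
        operations.append(UpdateOne({"_id": k}, {"$set": v}, upsert=True))
    if operations:
        coll.bulk_write(operations)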
@@ -247,6 +252,9 @@ class MongoDocStatusStorage(DocStatusStorage):
return
update_tasks: list[Any] = []
for k, v in data.items():
# Ensure chunks_list field exists and is an array
if "chunks_list" not in v:
v["chunks_list"] = []
data[k]["_id"] = k
update_tasks.append(
self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
@@ -279,6 +287,7 @@ class MongoDocStatusStorage(DocStatusStorage):
updated_at=doc.get("updated_at"),
chunks_count=doc.get("chunks_count", -1),
file_path=doc.get("file_path", doc["_id"]),
chunks_list=doc.get("chunks_list", []),
)
for doc in result
}


@@ -136,6 +136,48 @@ class PostgreSQLDB:
except Exception as e:
logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
async def _migrate_llm_cache_add_cache_type(self):
"""Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
try:
# Check if cache_type column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_llm_cache'
AND column_name = 'cache_type'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
add_column_sql = """
ALTER TABLE LIGHTRAG_LLM_CACHE
ADD COLUMN cache_type VARCHAR(32) NULL
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
)
# Migrate existing data: extract cache_type from flattened keys
logger.info("Migrating existing LLM cache data to populate cache_type field")
update_sql = """
UPDATE LIGHTRAG_LLM_CACHE
SET cache_type = CASE
WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2)
ELSE 'extract'
END
WHERE cache_type IS NULL
"""
await self.execute(update_sql)
logger.info("Successfully migrated existing LLM cache data")
else:
logger.info(
"cache_type column already exists in LIGHTRAG_LLM_CACHE table"
)
except Exception as e:
logger.warning(f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}")
async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
# Tables and columns that need migration
@@ -301,15 +343,17 @@ class PostgreSQLDB:
record["mode"], record["original_prompt"]
)
# Determine cache_type based on mode
cache_type = "extract" if record["mode"] == "default" else "unknown"
# Generate new flattened key
cache_type = "extract" # Default type
new_key = f"{record['mode']}:{cache_type}:{new_hash}"
# Insert new format data
# Insert new format data with cache_type field
insert_sql = """
INSERT INTO LIGHTRAG_LLM_CACHE
(workspace, id, mode, original_prompt, return_value, chunk_id, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
(workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (workspace, mode, id) DO NOTHING
"""
@@ -322,6 +366,7 @@ class PostgreSQLDB:
"original_prompt": record["original_prompt"],
"return_value": record["return_value"],
"chunk_id": record["chunk_id"],
"cache_type": cache_type, # Add cache_type field
"create_time": record["create_time"],
"update_time": record["update_time"],
},
@@ -357,6 +402,68 @@ class PostgreSQLDB:
logger.error(f"LLM cache migration failed: {e}")
# Don't raise exception, allow system to continue startup
async def _migrate_doc_status_add_chunks_list(self):
"""Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
try:
# Check if chunks_list column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_status'
AND column_name = 'chunks_list'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding chunks_list column to LIGHTRAG_DOC_STATUS table")
add_column_sql = """
ALTER TABLE LIGHTRAG_DOC_STATUS
ADD COLUMN chunks_list JSONB NULL DEFAULT '[]'::jsonb
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added chunks_list column to LIGHTRAG_DOC_STATUS table"
)
else:
logger.info(
"chunks_list column already exists in LIGHTRAG_DOC_STATUS table"
)
except Exception as e:
logger.warning(
f"Failed to add chunks_list column to LIGHTRAG_DOC_STATUS: {e}"
)
async def _migrate_text_chunks_add_llm_cache_list(self):
"""Add llm_cache_list column to LIGHTRAG_DOC_CHUNKS table if it doesn't exist"""
try:
# Check if llm_cache_list column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_chunks'
AND column_name = 'llm_cache_list'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding llm_cache_list column to LIGHTRAG_DOC_CHUNKS table")
add_column_sql = """
ALTER TABLE LIGHTRAG_DOC_CHUNKS
ADD COLUMN llm_cache_list JSONB NULL DEFAULT '[]'::jsonb
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added llm_cache_list column to LIGHTRAG_DOC_CHUNKS table"
)
else:
logger.info(
"llm_cache_list column already exists in LIGHTRAG_DOC_CHUNKS table"
)
except Exception as e:
logger.warning(
f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
)
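Both column migrations use the same check-then-ALTER pattern against information_schema. On PostgreSQL 9.6+ the equivalent idempotent effect is available in a single statement; a sketch of the alternative (not what this commit does):

    await self.execute(
        """
        ALTER TABLE LIGHTRAG_DOC_STATUS
        ADD COLUMN IF NOT EXISTS chunks_list JSONB NULL DEFAULT '[]'::jsonb
        """
    )

The explicit information_schema check does buy the distinct "adding" vs "already exists" log messages, though.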
async def check_tables(self):
# First create all tables
for k, v in TABLES.items():
@@ -408,6 +515,13 @@ class PostgreSQLDB:
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
# Don't throw an exception, allow the initialization process to continue
# Migrate LLM cache table to add cache_type field if needed
try:
await self._migrate_llm_cache_add_cache_type()
except Exception as e:
logger.error(f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}")
# Don't throw an exception, allow the initialization process to continue
# Finally, attempt to migrate old doc chunks data if needed
try:
await self._migrate_doc_chunks_to_vdb_chunks()
@@ -421,6 +535,22 @@ class PostgreSQLDB:
except Exception as e:
logger.error(f"PostgreSQL, LLM cache migration failed: {e}")
# Migrate doc status to add chunks_list field if needed
try:
await self._migrate_doc_status_add_chunks_list()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate doc status chunks_list field: {e}"
)
# Migrate text chunks to add llm_cache_list field if needed
try:
await self._migrate_text_chunks_add_llm_cache_list()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate text chunks llm_cache_list field: {e}"
)
async def query(
self,
sql: str,
@@ -608,17 +738,11 @@ class PGKVStorage(BaseKVStorage):
if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
processed_results = {}
for row in results:
# Parse flattened key to extract cache_type
key_parts = row["id"].split(":")
cache_type = key_parts[1] if len(key_parts) >= 3 else "unknown"
# Map field names and add cache_type for compatibility
processed_row = {
**row,
"return": row.get(
"return_value", ""
), # Map return_value to return
"cache_type": cache_type, # Add cache_type from key
"return": row.get("return_value", ""),
"cache_type": row.get("original_prompt", "unknow"),
"original_prompt": row.get("original_prompt", ""),
"chunk_id": row.get("chunk_id"),
"mode": row.get("mode", "default"),
@@ -626,6 +750,20 @@ class PGKVStorage(BaseKVStorage):
processed_results[row["id"]] = processed_row
return processed_results
# For text_chunks namespace, parse llm_cache_list JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
processed_results = {}
for row in results:
llm_cache_list = row.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
llm_cache_list = json.loads(llm_cache_list)
except json.JSONDecodeError:
llm_cache_list = []
row["llm_cache_list"] = llm_cache_list
processed_results[row["id"]] = row
return processed_results
# For other namespaces, return as-is
return {row["id"]: row for row in results}
except Exception as e:
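The json.loads-with-fallback sequence above recurs for every JSONB-backed list field this commit touches (llm_cache_list here, chunks_list in the doc status storage below). A small hypothetical helper that captures the pattern, not part of the commit:

    import json
    from typing import Any

    def parse_json_list(value: Any) -> list:
        """Return value as a list, decoding JSON text and tolerating bad input."""
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except json.JSONDecodeError:
                return []
        return value if isinstance(value, list) else []

The isinstance check matters because asyncpg-style drivers commonly hand JSONB back as text unless a type codec is registered.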
@@ -637,6 +775,29 @@ class PGKVStorage(BaseKVStorage):
sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
params = {"workspace": self.db.workspace, "id": id}
response = await self.db.query(sql, params)
if response and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
# Parse llm_cache_list JSON string back to list
llm_cache_list = response.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
llm_cache_list = json.loads(llm_cache_list)
except json.JSONDecodeError:
llm_cache_list = []
response["llm_cache_list"] = llm_cache_list
# Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
if response and is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
# Map field names and add cache_type for compatibility
response = {
**response,
"return": response.get("return_value", ""),
"cache_type": response.get("cache_type"),
"original_prompt": response.get("original_prompt", ""),
"chunk_id": response.get("chunk_id"),
"mode": response.get("mode", "default"),
}
return response if response else None
# Query by id
@@ -646,13 +807,36 @@ class PGKVStorage(BaseKVStorage):
ids=",".join([f"'{id}'" for id in ids])
)
params = {"workspace": self.db.workspace}
return await self.db.query(sql, params, multirows=True)
results = await self.db.query(sql, params, multirows=True)
async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
"""Specifically for llm_response_cache."""
SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
params = {"workspace": self.db.workspace, "status": status}
return await self.db.query(SQL, params, multirows=True)
if results and is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
# Parse llm_cache_list JSON string back to list for each result
for result in results:
llm_cache_list = result.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
llm_cache_list = json.loads(llm_cache_list)
except json.JSONDecodeError:
llm_cache_list = []
result["llm_cache_list"] = llm_cache_list
# Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
if results and is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
processed_results = []
for row in results:
# Map field names and add cache_type for compatibility
processed_row = {
**row,
"return": row.get("return_value", ""),
"cache_type": row.get("cache_type"),
"original_prompt": row.get("original_prompt", ""),
"chunk_id": row.get("chunk_id"),
"mode": row.get("mode", "default"),
}
processed_results.append(processed_row)
return processed_results
return results if results else []
async def filter_keys(self, keys: set[str]) -> set[str]:
"""Filter out duplicated content"""
@@ -693,6 +877,7 @@ class PGKVStorage(BaseKVStorage):
"full_doc_id": v["full_doc_id"],
"content": v["content"],
"file_path": v["file_path"],
"llm_cache_list": json.dumps(v.get("llm_cache_list", [])),
"create_time": current_time,
"update_time": current_time,
}
@@ -716,6 +901,7 @@ class PGKVStorage(BaseKVStorage):
"return_value": v["return"],
"mode": v.get("mode", "default"), # Get mode from data
"chunk_id": v.get("chunk_id"),
"cache_type": v.get("cache_type", "extract"), # Get cache_type from data
}
await self.db.execute(upsert_sql, _data)
@@ -1140,6 +1326,14 @@ class PGDocStatusStorage(DocStatusStorage):
if result is None or result == []:
return None
else:
# Parse chunks_list JSON string back to list
chunks_list = result[0].get("chunks_list", [])
if isinstance(chunks_list, str):
try:
chunks_list = json.loads(chunks_list)
except json.JSONDecodeError:
chunks_list = []
return dict(
content=result[0]["content"],
content_length=result[0]["content_length"],
@@ -1149,6 +1343,7 @@ class PGDocStatusStorage(DocStatusStorage):
created_at=result[0]["created_at"],
updated_at=result[0]["updated_at"],
file_path=result[0]["file_path"],
chunks_list=chunks_list,
)
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -1163,19 +1358,32 @@ class PGDocStatusStorage(DocStatusStorage):
if not results:
return []
return [
{
"content": row["content"],
"content_length": row["content_length"],
"content_summary": row["content_summary"],
"status": row["status"],
"chunks_count": row["chunks_count"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"file_path": row["file_path"],
}
for row in results
]
processed_results = []
for row in results:
# Parse chunks_list JSON string back to list
chunks_list = row.get("chunks_list", [])
if isinstance(chunks_list, str):
try:
chunks_list = json.loads(chunks_list)
except json.JSONDecodeError:
chunks_list = []
processed_results.append(
{
"content": row["content"],
"content_length": row["content_length"],
"content_summary": row["content_summary"],
"status": row["status"],
"chunks_count": row["chunks_count"],
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"file_path": row["file_path"],
"chunks_list": chunks_list,
}
)
return processed_results
async def get_status_counts(self) -> dict[str, int]:
"""Get counts of documents in each status"""
@@ -1196,8 +1404,18 @@ class PGDocStatusStorage(DocStatusStorage):
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
params = {"workspace": self.db.workspace, "status": status.value}
result = await self.db.query(sql, params, True)
docs_by_status = {
element["id"]: DocProcessingStatus(
docs_by_status = {}
for element in result:
# Parse chunks_list JSON string back to list
chunks_list = element.get("chunks_list", [])
if isinstance(chunks_list, str):
try:
chunks_list = json.loads(chunks_list)
except json.JSONDecodeError:
chunks_list = []
docs_by_status[element["id"]] = DocProcessingStatus(
content=element["content"],
content_summary=element["content_summary"],
content_length=element["content_length"],
@@ -1206,9 +1424,9 @@ class PGDocStatusStorage(DocStatusStorage):
updated_at=element["updated_at"],
chunks_count=element["chunks_count"],
file_path=element["file_path"],
chunks_list=chunks_list,
)
for element in result
}
return docs_by_status
async def index_done_callback(self) -> None:
@@ -1272,10 +1490,10 @@ class PGDocStatusStorage(DocStatusStorage):
logger.warning(f"Unable to parse datetime string: {dt_str}")
return None
# Modified SQL to include created_at and updated_at in both INSERT and UPDATE operations
# Both fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
# Modified SQL to include created_at, updated_at, and chunks_list in both INSERT and UPDATE operations
# All fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,chunks_list,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
on conflict(id,workspace) do update set
content = EXCLUDED.content,
content_summary = EXCLUDED.content_summary,
@@ -1283,6 +1501,7 @@ class PGDocStatusStorage(DocStatusStorage):
chunks_count = EXCLUDED.chunks_count,
status = EXCLUDED.status,
file_path = EXCLUDED.file_path,
chunks_list = EXCLUDED.chunks_list,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at"""
for k, v in data.items():
@@ -1290,7 +1509,7 @@ class PGDocStatusStorage(DocStatusStorage):
created_at = parse_datetime(v.get("created_at"))
updated_at = parse_datetime(v.get("updated_at"))
# chunks_count is optional
# chunks_count and chunks_list are optional
await self.db.execute(
sql,
{
@@ -1302,6 +1521,7 @@ class PGDocStatusStorage(DocStatusStorage):
"chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
"status": v["status"],
"file_path": v["file_path"],
"chunks_list": json.dumps(v.get("chunks_list", [])),
"created_at": created_at, # Use the converted datetime object
"updated_at": updated_at, # Use the converted datetime object
},
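List-valued fields are serialized with json.dumps before binding, so the JSONB columns always receive JSON text; on the read side they go back through the fallback parsing shown earlier. A round-trip sketch using the hypothetical parse_json_list helper from above:

    import json

    chunks = ["chunk-0", "chunk-1"]
    bound = json.dumps(chunks)          # what gets bound to the JSONB parameter
    restored = parse_json_list(bound)   # what the read path recovers
    assert restored == chunks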
@@ -2620,6 +2840,7 @@ TABLES = {
tokens INTEGER,
content TEXT,
file_path VARCHAR(256),
llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
create_time TIMESTAMP(0) WITH TIME ZONE,
update_time TIMESTAMP(0) WITH TIME ZONE,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -2692,6 +2913,7 @@ TABLES = {
chunks_count int4 NULL,
status varchar(64) NULL,
file_path TEXT NULL,
chunks_list JSONB NULL DEFAULT '[]'::jsonb,
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
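With chunks_list stored as a JSONB array, documents can also be filtered by chunk membership directly in SQL. A hypothetical query the commit itself does not issue (db stands in for the PostgreSQLDB wrapper):

    rows = await db.query(
        """
        SELECT id FROM LIGHTRAG_DOC_STATUS
        WHERE workspace = $1
          AND chunks_list ? $2  -- jsonb ? tests string membership in an array
        """,
        {"workspace": "default", "chunk_id": "chunk-0"},
        multirows=True,
    )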
@@ -2706,24 +2928,26 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id=$2
""",
"get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path
chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
"get_by_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id=$2
""",
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
"get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode=$2 AND id=$3
""",
"get_by_ids_full_docs": """SELECT id, COALESCE(content, '') as content
FROM LIGHTRAG_DOC_FULL WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path
chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id IN ({ids})
""",
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, COALESCE(return_value, '') as "return", mode, chunk_id
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND mode= IN ({ids})
"get_by_ids_llm_response_cache": """SELECT id, original_prompt, return_value, mode, chunk_id, cache_type
FROM LIGHTRAG_LLM_CACHE WHERE workspace=$1 AND id IN ({ids})
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, workspace)
@@ -2731,25 +2955,27 @@ SQL_TEMPLATES = {
ON CONFLICT (workspace,id) DO UPDATE
SET content = $2, update_time = CURRENT_TIMESTAMP
""",
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id)
VALUES ($1, $2, $3, $4, $5, $6)
"upsert_llm_response_cache": """INSERT INTO LIGHTRAG_LLM_CACHE(workspace,id,original_prompt,return_value,mode,chunk_id,cache_type)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (workspace,mode,id) DO UPDATE
SET original_prompt = EXCLUDED.original_prompt,
return_value=EXCLUDED.return_value,
mode=EXCLUDED.mode,
chunk_id=EXCLUDED.chunk_id,
cache_type=EXCLUDED.cache_type,
update_time = CURRENT_TIMESTAMP
""",
"upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, file_path,
chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
full_doc_id=EXCLUDED.full_doc_id,
content = EXCLUDED.content,
file_path=EXCLUDED.file_path,
llm_cache_list=EXCLUDED.llm_cache_list,
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage


@@ -520,11 +520,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
}
await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)
async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
params = {"workspace": self.db.workspace, "status": status}
return await self.db.query(SQL, params, multirows=True)
async def delete(self, ids: list[str]) -> None:
"""Delete vectors with specified IDs from the storage.