Merge pull request #1714 from danielaskdd/fix-mix-query
Fix: Resolving issue with PostgreSQL document chunk KV storage depending on vector storage
This commit is contained in:
commit
0e683a50e8
3 changed files with 117 additions and 13 deletions
|
|
@ -189,6 +189,64 @@ class PostgreSQLDB:
|
||||||
# Log error but don't interrupt the process
|
# Log error but don't interrupt the process
|
||||||
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
|
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
|
||||||
|
|
||||||
|
async def _migrate_doc_chunks_to_vdb_chunks(self):
    """
    One-time copy of legacy chunk rows from LIGHTRAG_DOC_CHUNKS into LIGHTRAG_VDB_CHUNKS.

    Older deployments kept the embedding in a `content_vector` column on
    LIGHTRAG_DOC_CHUNKS. The copy only runs when all three guards pass, checked
    in this order:

    1. LIGHTRAG_VDB_CHUNKS holds no rows yet.
    2. LIGHTRAG_DOC_CHUNKS still has a `content_vector` column.
    3. LIGHTRAG_DOC_CHUNKS holds at least one row.

    Any exception raised along the way is logged and swallowed so that the
    caller can continue its start-up sequence.
    """
    # Statements are prepared up front; they are plain literals and are only
    # sent to the database when the corresponding step is reached.
    count_target_sql = "SELECT COUNT(1) as count FROM LIGHTRAG_VDB_CHUNKS"
    legacy_column_sql = """
        SELECT 1 FROM information_schema.columns
        WHERE table_name = 'lightrag_doc_chunks' AND column_name = 'content_vector'
    """
    count_legacy_sql = "SELECT COUNT(1) as count FROM LIGHTRAG_DOC_CHUNKS"
    copy_rows_sql = """
        INSERT INTO LIGHTRAG_VDB_CHUNKS (
            id, workspace, full_doc_id, chunk_order_index, tokens, content,
            content_vector, file_path, create_time, update_time
        )
        SELECT
            id, workspace, full_doc_id, chunk_order_index, tokens, content,
            content_vector, file_path, create_time, update_time
        FROM LIGHTRAG_DOC_CHUNKS
        ON CONFLICT (workspace, id) DO NOTHING;
    """

    try:
        # Guard 1: never touch a target table that already has content.
        target_row = await self.query(count_target_sql)
        if target_row and target_row["count"] > 0:
            logger.info(
                "Skipping migration: LIGHTRAG_VDB_CHUNKS already contains data."
            )
            return

        # Guard 2: without the legacy vector column there is nothing to carry over.
        legacy_column = await self.query(legacy_column_sql)
        if not legacy_column:
            logger.info(
                "Skipping migration: `content_vector` not found in LIGHTRAG_DOC_CHUNKS"
            )
            return

        # Guard 3: an empty legacy table needs no migration.
        legacy_row = await self.query(count_legacy_sql)
        if not legacy_row or legacy_row["count"] == 0:
            logger.info("Skipping migration: LIGHTRAG_DOC_CHUNKS is empty.")
            return

        # All guards passed: copy the rows; rows whose (workspace, id) already
        # exists in the target are left untouched by ON CONFLICT DO NOTHING.
        logger.info(
            "Starting data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS..."
        )
        await self.execute(copy_rows_sql)
        logger.info("Data migration to LIGHTRAG_VDB_CHUNKS completed successfully.")

    except Exception as e:
        # Deliberately not re-raised, to allow the application to start.
        logger.error(f"Failed during data migration to LIGHTRAG_VDB_CHUNKS: {e}")
|
||||||
|
|
||||||
async def check_tables(self):
|
async def check_tables(self):
|
||||||
# First create all tables
|
# First create all tables
|
||||||
for k, v in TABLES.items():
|
for k, v in TABLES.items():
|
||||||
|
|
@ -240,6 +298,12 @@ class PostgreSQLDB:
|
||||||
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
|
logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
|
||||||
# Don't throw an exception, allow the initialization process to continue
|
# Don't throw an exception, allow the initialization process to continue
|
||||||
|
|
||||||
|
# Finally, attempt to migrate old doc chunks data if needed
|
||||||
|
try:
|
||||||
|
await self._migrate_doc_chunks_to_vdb_chunks()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"PostgreSQL, Failed to migrate doc_chunks to vdb_chunks: {e}")
|
||||||
|
|
||||||
async def query(
|
async def query(
|
||||||
self,
|
self,
|
||||||
sql: str,
|
sql: str,
|
||||||
|
|
@ -520,7 +584,21 @@ class PGKVStorage(BaseKVStorage):
|
||||||
return
|
return
|
||||||
|
|
||||||
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
|
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
|
||||||
pass
|
current_time = datetime.datetime.now(timezone.utc)
|
||||||
|
for k, v in data.items():
|
||||||
|
upsert_sql = SQL_TEMPLATES["upsert_text_chunk"]
|
||||||
|
_data = {
|
||||||
|
"workspace": self.db.workspace,
|
||||||
|
"id": k,
|
||||||
|
"tokens": v["tokens"],
|
||||||
|
"chunk_order_index": v["chunk_order_index"],
|
||||||
|
"full_doc_id": v["full_doc_id"],
|
||||||
|
"content": v["content"],
|
||||||
|
"file_path": v["file_path"],
|
||||||
|
"create_time": current_time,
|
||||||
|
"update_time": current_time,
|
||||||
|
}
|
||||||
|
await self.db.execute(upsert_sql, _data)
|
||||||
elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_DOCS):
|
elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_DOCS):
|
||||||
for k, v in data.items():
|
for k, v in data.items():
|
||||||
upsert_sql = SQL_TEMPLATES["upsert_doc_full"]
|
upsert_sql = SQL_TEMPLATES["upsert_doc_full"]
|
||||||
|
|
@ -2409,7 +2487,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||||
NAMESPACE_TABLE_MAP = {
|
NAMESPACE_TABLE_MAP = {
|
||||||
NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
|
NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
|
||||||
NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
|
NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
|
||||||
NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
|
NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_VDB_CHUNKS",
|
||||||
NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
|
NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
|
||||||
NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
|
NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
|
||||||
NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
|
NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
|
||||||
|
|
@ -2444,13 +2522,27 @@ TABLES = {
|
||||||
chunk_order_index INTEGER,
|
chunk_order_index INTEGER,
|
||||||
tokens INTEGER,
|
tokens INTEGER,
|
||||||
content TEXT,
|
content TEXT,
|
||||||
content_vector VECTOR,
|
|
||||||
file_path VARCHAR(256),
|
file_path VARCHAR(256),
|
||||||
create_time TIMESTAMP(0) WITH TIME ZONE,
|
create_time TIMESTAMP(0) WITH TIME ZONE,
|
||||||
update_time TIMESTAMP(0) WITH TIME ZONE,
|
update_time TIMESTAMP(0) WITH TIME ZONE,
|
||||||
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
|
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
|
||||||
)"""
|
)"""
|
||||||
},
|
},
|
||||||
|
"LIGHTRAG_VDB_CHUNKS": {
|
||||||
|
"ddl": """CREATE TABLE LIGHTRAG_VDB_CHUNKS (
|
||||||
|
id VARCHAR(255),
|
||||||
|
workspace VARCHAR(255),
|
||||||
|
full_doc_id VARCHAR(256),
|
||||||
|
chunk_order_index INTEGER,
|
||||||
|
tokens INTEGER,
|
||||||
|
content TEXT,
|
||||||
|
content_vector VECTOR,
|
||||||
|
file_path VARCHAR(256),
|
||||||
|
create_time TIMESTAMP(0) WITH TIME ZONE,
|
||||||
|
update_time TIMESTAMP(0) WITH TIME ZONE,
|
||||||
|
CONSTRAINT LIGHTRAG_VDB_CHUNKS_PK PRIMARY KEY (workspace, id)
|
||||||
|
)"""
|
||||||
|
},
|
||||||
"LIGHTRAG_VDB_ENTITY": {
|
"LIGHTRAG_VDB_ENTITY": {
|
||||||
"ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
|
"ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
|
||||||
id VARCHAR(255),
|
id VARCHAR(255),
|
||||||
|
|
@ -2551,7 +2643,20 @@ SQL_TEMPLATES = {
|
||||||
chunk_id=EXCLUDED.chunk_id,
|
chunk_id=EXCLUDED.chunk_id,
|
||||||
update_time = CURRENT_TIMESTAMP
|
update_time = CURRENT_TIMESTAMP
|
||||||
""",
|
""",
|
||||||
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
|
"upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
|
||||||
|
chunk_order_index, full_doc_id, content, file_path,
|
||||||
|
create_time, update_time)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||||
|
ON CONFLICT (workspace,id) DO UPDATE
|
||||||
|
SET tokens=EXCLUDED.tokens,
|
||||||
|
chunk_order_index=EXCLUDED.chunk_order_index,
|
||||||
|
full_doc_id=EXCLUDED.full_doc_id,
|
||||||
|
content = EXCLUDED.content,
|
||||||
|
file_path=EXCLUDED.file_path,
|
||||||
|
update_time = EXCLUDED.update_time
|
||||||
|
""",
|
||||||
|
# SQL for VectorStorage
|
||||||
|
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
|
||||||
chunk_order_index, full_doc_id, content, content_vector, file_path,
|
chunk_order_index, full_doc_id, content, content_vector, file_path,
|
||||||
create_time, update_time)
|
create_time, update_time)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||||
|
|
@ -2564,7 +2669,6 @@ SQL_TEMPLATES = {
|
||||||
file_path=EXCLUDED.file_path,
|
file_path=EXCLUDED.file_path,
|
||||||
update_time = EXCLUDED.update_time
|
update_time = EXCLUDED.update_time
|
||||||
""",
|
""",
|
||||||
# SQL for VectorStorage
|
|
||||||
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
|
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
|
||||||
content_vector, chunk_ids, file_path, create_time, update_time)
|
content_vector, chunk_ids, file_path, create_time, update_time)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
|
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
|
||||||
|
|
@ -2591,7 +2695,7 @@ SQL_TEMPLATES = {
|
||||||
"relationships": """
|
"relationships": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
SELECT id as chunk_id
|
SELECT id as chunk_id
|
||||||
FROM LIGHTRAG_DOC_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
|
SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
|
||||||
|
|
@ -2608,7 +2712,7 @@ SQL_TEMPLATES = {
|
||||||
"entities": """
|
"entities": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
SELECT id as chunk_id
|
SELECT id as chunk_id
|
||||||
FROM LIGHTRAG_DOC_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
||||||
|
|
@ -2625,13 +2729,13 @@ SQL_TEMPLATES = {
|
||||||
"chunks": """
|
"chunks": """
|
||||||
WITH relevant_chunks AS (
|
WITH relevant_chunks AS (
|
||||||
SELECT id as chunk_id
|
SELECT id as chunk_id
|
||||||
FROM LIGHTRAG_DOC_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
|
||||||
)
|
)
|
||||||
SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
|
||||||
(
|
(
|
||||||
SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
|
SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
|
||||||
FROM LIGHTRAG_DOC_CHUNKS
|
FROM LIGHTRAG_VDB_CHUNKS
|
||||||
WHERE workspace=$1
|
WHERE workspace=$1
|
||||||
AND id IN (SELECT chunk_id FROM relevant_chunks)
|
AND id IN (SELECT chunk_id FROM relevant_chunks)
|
||||||
) as chunk_distances
|
) as chunk_distances
|
||||||
|
|
|
||||||
|
|
@ -394,13 +394,13 @@ class LightRAG:
|
||||||
embedding_func=self.embedding_func,
|
embedding_func=self.embedding_func,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: deprecating, text_chunks is redundant with chunks_vdb
|
|
||||||
self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
|
self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
|
||||||
namespace=make_namespace(
|
namespace=make_namespace(
|
||||||
self.namespace_prefix, NameSpace.KV_STORE_TEXT_CHUNKS
|
self.namespace_prefix, NameSpace.KV_STORE_TEXT_CHUNKS
|
||||||
),
|
),
|
||||||
embedding_func=self.embedding_func,
|
embedding_func=self.embedding_func,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
|
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
|
||||||
namespace=make_namespace(
|
namespace=make_namespace(
|
||||||
self.namespace_prefix, NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION
|
self.namespace_prefix, NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION
|
||||||
|
|
|
||||||
|
|
@ -1647,7 +1647,7 @@ async def _get_vector_context(
|
||||||
f"Truncate chunks from {len(valid_chunks)} to {len(maybe_trun_chunks)} (max tokens:{query_param.max_token_for_text_unit})"
|
f"Truncate chunks from {len(valid_chunks)} to {len(maybe_trun_chunks)} (max tokens:{query_param.max_token_for_text_unit})"
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Vector query: {len(maybe_trun_chunks)} chunks, top_k: {query_param.top_k}"
|
f"Query chunks: {len(maybe_trun_chunks)} chunks, top_k: {query_param.top_k}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not maybe_trun_chunks:
|
if not maybe_trun_chunks:
|
||||||
|
|
@ -1871,7 +1871,7 @@ async def _get_node_data(
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Local query uses {len(node_datas)} entites, {len(use_relations)} relations, {len(use_text_units)} chunks"
|
f"Local query: {len(node_datas)} entites, {len(use_relations)} relations, {len(use_text_units)} chunks"
|
||||||
)
|
)
|
||||||
|
|
||||||
# build prompt
|
# build prompt
|
||||||
|
|
@ -2180,7 +2180,7 @@ async def _get_edge_data(
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Global query uses {len(use_entities)} entites, {len(edge_datas)} relations, {len(use_text_units)} chunks"
|
f"Global query: {len(use_entities)} entites, {len(edge_datas)} relations, {len(use_text_units)} chunks"
|
||||||
)
|
)
|
||||||
|
|
||||||
relations_context = []
|
relations_context = []
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue