From 3a8a99b73d870fdac0e8060e0f016bf25541bcb3 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 12:11:53 +0800
Subject: [PATCH 1/6] feat(postgres): Implement text_chunks upsert for
 PGKVStorage

---
 lightrag/kg/postgres_impl.py | 52 +++++++++++++++++++++++++++++++-----
 lightrag/lightrag.py         |  2 +-
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 0ddc7948..ef3184a6 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -520,7 +520,21 @@ class PGKVStorage(BaseKVStorage):
             return
 
         if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
-            pass
+            current_time = datetime.datetime.now(timezone.utc)
+            for k, v in data.items():
+                upsert_sql = SQL_TEMPLATES["upsert_text_chunk"]
+                _data = {
+                    "workspace": self.db.workspace,
+                    "id": k,
+                    "tokens": v["tokens"],
+                    "chunk_order_index": v["chunk_order_index"],
+                    "full_doc_id": v["full_doc_id"],
+                    "content": v["content"],
+                    "file_path": v["file_path"],
+                    "create_time": current_time,
+                    "update_time": current_time,
+                }
+                await self.db.execute(upsert_sql, _data)
         elif is_namespace(self.namespace, NameSpace.KV_STORE_FULL_DOCS):
             for k, v in data.items():
                 upsert_sql = SQL_TEMPLATES["upsert_doc_full"]
@@ -2409,7 +2423,7 @@ class PGGraphStorage(BaseGraphStorage):
 NAMESPACE_TABLE_MAP = {
     NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
     NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
-    NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
+    NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_VDB_CHUNKS",
     NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
     NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_VDB_RELATION",
     NameSpace.DOC_STATUS: "LIGHTRAG_DOC_STATUS",
@@ -2444,13 +2458,27 @@ TABLES = {
                     chunk_order_index INTEGER,
                     tokens INTEGER,
                     content TEXT,
-                    content_vector VECTOR,
                     file_path VARCHAR(256),
                     create_time TIMESTAMP(0) WITH TIME ZONE,
                     update_time TIMESTAMP(0) WITH TIME ZONE,
                     CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
                     )"""
     },
+    "LIGHTRAG_VDB_CHUNKS": {
+        "ddl": """CREATE TABLE LIGHTRAG_VDB_CHUNKS (
+                    id VARCHAR(255),
+                    workspace VARCHAR(255),
+                    full_doc_id VARCHAR(256),
+                    chunk_order_index INTEGER,
+                    tokens INTEGER,
+                    content TEXT,
+                    content_vector VECTOR,
+                    file_path VARCHAR(256),
+                    create_time TIMESTAMP(0) WITH TIME ZONE,
+                    update_time TIMESTAMP(0) WITH TIME ZONE,
+                    CONSTRAINT LIGHTRAG_VDB_CHUNKS_PK PRIMARY KEY (workspace, id)
+                    )"""
+    },
     "LIGHTRAG_VDB_ENTITY": {
         "ddl": """CREATE TABLE LIGHTRAG_VDB_ENTITY (
                     id VARCHAR(255),
@@ -2551,19 +2579,31 @@ SQL_TEMPLATES = {
                       chunk_id=EXCLUDED.chunk_id,
                       update_time = CURRENT_TIMESTAMP
                      """,
-    "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
+    "upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
+                      chunk_order_index, full_doc_id, content, file_path,
+                      create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+                      ON CONFLICT (workspace,id) DO UPDATE
+                      SET tokens=EXCLUDED.tokens,
+                      chunk_order_index=EXCLUDED.chunk_order_index,
+                      full_doc_id=EXCLUDED.full_doc_id,
+                      content = EXCLUDED.content,
+                      file_path=EXCLUDED.file_path,
+                      update_time = EXCLUDED.update_time
+                     """,
+    # SQL for VectorStorage
+    "upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
                       chunk_order_index, full_doc_id, content, content_vector, file_path, create_time, update_time)
                       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET tokens=EXCLUDED.tokens,
                       chunk_order_index=EXCLUDED.chunk_order_index,
                       full_doc_id=EXCLUDED.full_doc_id,
                       content = EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
                       file_path=EXCLUDED.file_path,
                       update_time = EXCLUDED.update_time
                      """,
-    # SQL for VectorStorage
     "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
                       content_vector, chunk_ids, file_path, create_time, update_time)
                       VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
@@ -2625,7 +2665,7 @@ SQL_TEMPLATES = {
     "chunks": """
             WITH relevant_chunks AS (
                 SELECT id as chunk_id
-                FROM LIGHTRAG_DOC_CHUNKS
+                FROM LIGHTRAG_VDB_CHUNKS
                 WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
             )
             SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 748b9ef8..0a878ac3 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -394,13 +394,13 @@ class LightRAG:
             embedding_func=self.embedding_func,
         )
 
-        # TODO: deprecating, text_chunks is redundant with chunks_vdb
         self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls(  # type: ignore
             namespace=make_namespace(
                 self.namespace_prefix, NameSpace.KV_STORE_TEXT_CHUNKS
            ),
             embedding_func=self.embedding_func,
         )
+
         self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls(  # type: ignore
             namespace=make_namespace(
                 self.namespace_prefix, NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION
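
Note on the new KV_STORE_TEXT_CHUNKS branch: it reads a fixed set of fields from
each value it is given, and fills in the timestamps itself. A minimal sketch of
the payload shape (IDs, counts, and content below are hypothetical, and the
`text_chunks` storage instance is assumed to be wired up as in lightrag.py):

    # Hypothetical payload; the keys mirror the _data dict built in
    # PGKVStorage.upsert for the text_chunks namespace.
    chunk_payload = {
        "chunk-abc123": {
            "tokens": 120,
            "chunk_order_index": 0,
            "full_doc_id": "doc-xyz789",
            "content": "First chunk of the source document...",
            "file_path": "docs/example.txt",
        }
    }

    # Assuming an initialized PGKVStorage bound to the text_chunks namespace:
    # await text_chunks.upsert(chunk_payload)
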
From 95c7a7d038e351623b683399fd4391539b22f719 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 14:31:53 +0800
Subject: [PATCH 2/6] feat(db): Add data migration from LIGHTRAG_DOC_CHUNKS to
 LIGHTRAG_VDB_CHUNKS

---
 lightrag/kg/postgres_impl.py | 62 ++++++++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index ef3184a6..0cbbb285 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -189,6 +189,62 @@ class PostgreSQLDB:
                 # Log error but don't interrupt the process
                 logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
 
+    async def _migrate_doc_chunks_to_vdb_chunks(self):
+        """
+        Migrate data from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS if specific conditions are met.
+        This migration is intended for users who are upgrading and have an older table structure
+        where LIGHTRAG_DOC_CHUNKS contained a `content_vector` column.
+
+        """
+        try:
+            # 1. Check if the new table LIGHTRAG_VDB_CHUNKS is empty
+            vdb_chunks_count_sql = "SELECT COUNT(1) as count FROM LIGHTRAG_VDB_CHUNKS"
+            vdb_chunks_count_result = await self.query(vdb_chunks_count_sql)
+            if vdb_chunks_count_result and vdb_chunks_count_result["count"] > 0:
+                logger.info(
+                    "Skipping migration: LIGHTRAG_VDB_CHUNKS already contains data."
+                )
+                return
+
+            # 2. Check if `content_vector` column exists in the old table
+            check_column_sql = """
+            SELECT 1 FROM information_schema.columns
+            WHERE table_name = 'lightrag_doc_chunks' AND column_name = 'content_vector'
+            """
+            column_exists = await self.query(check_column_sql)
+            if not column_exists:
+                logger.info(
+                    "Skipping migration: `content_vector` not found in LIGHTRAG_DOC_CHUNKS"
+                )
+                return
+
+            # 3. Check if the old table LIGHTRAG_DOC_CHUNKS has data
+            doc_chunks_count_sql = "SELECT COUNT(1) as count FROM LIGHTRAG_DOC_CHUNKS"
+            doc_chunks_count_result = await self.query(doc_chunks_count_sql)
+            if not doc_chunks_count_result or doc_chunks_count_result["count"] == 0:
+                logger.info("Skipping migration: LIGHTRAG_DOC_CHUNKS is empty.")
+                return
+
+            # 4. Perform the migration
+            logger.info("Starting data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS...")
+            migration_sql = """
+            INSERT INTO LIGHTRAG_VDB_CHUNKS (
+                id, workspace, full_doc_id, chunk_order_index, tokens, content,
+                content_vector, file_path, create_time, update_time
+            )
+            SELECT
+                id, workspace, full_doc_id, chunk_order_index, tokens, content,
+                content_vector, file_path, create_time, update_time
+            FROM LIGHTRAG_DOC_CHUNKS
+            ON CONFLICT (workspace, id) DO NOTHING;
+            """
+            await self.execute(migration_sql)
+            logger.info("Data migration to LIGHTRAG_VDB_CHUNKS completed successfully.")
+
+        except Exception as e:
+            logger.error(f"Failed during data migration to LIGHTRAG_VDB_CHUNKS: {e}")
+            # Do not re-raise, to allow the application to start
+
     async def check_tables(self):
         # First create all tables
         for k, v in TABLES.items():
@@ -240,6 +296,12 @@ class PostgreSQLDB:
             logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
             # Don't throw an exception, allow the initialization process to continue
 
+        # Finally, attempt to migrate old doc chunks data if needed
+        try:
+            await self._migrate_doc_chunks_to_vdb_chunks()
+        except Exception as e:
+            logger.error(f"PostgreSQL, Failed to migrate doc_chunks to vdb_chunks: {e}")
+
     async def query(
         self,
         sql: str,
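
Note on the migration guards: the copy only runs when all three conditions
hold (new table empty, old table still carries content_vector, old table
non-empty). A standalone sketch for checking those conditions by hand before
upgrading; the DSN is a placeholder, and asyncpg (which LightRAG already
depends on) is assumed to be installed:

    import asyncio
    import asyncpg

    async def check_migration_preconditions(dsn: str) -> None:
        conn = await asyncpg.connect(dsn)
        try:
            # 1. LIGHTRAG_VDB_CHUNKS must be empty
            vdb_count = await conn.fetchval("SELECT COUNT(1) FROM LIGHTRAG_VDB_CHUNKS")
            # 2. LIGHTRAG_DOC_CHUNKS must still carry the content_vector column
            has_vector = await conn.fetchval(
                """SELECT 1 FROM information_schema.columns
                   WHERE table_name = 'lightrag_doc_chunks'
                   AND column_name = 'content_vector'"""
            )
            # 3. LIGHTRAG_DOC_CHUNKS must contain rows to copy
            doc_count = await conn.fetchval("SELECT COUNT(1) FROM LIGHTRAG_DOC_CHUNKS")
            will_migrate = vdb_count == 0 and bool(has_vector) and doc_count > 0
            print(f"vdb_chunks={vdb_count}, has_vector={bool(has_vector)}, "
                  f"doc_chunks={doc_count} -> migration would run: {will_migrate}")
        finally:
            await conn.close()

    # asyncio.run(check_migration_preconditions("postgresql://user:pass@localhost:5432/lightrag"))
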
From 2c473679751ee1254f5b4e797050231a5a16d59f Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 14:32:26 +0800
Subject: [PATCH 3/6] Fix linting

---
 lightrag/kg/postgres_impl.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 0cbbb285..e77429d3 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -226,7 +226,9 @@
                 return
 
             # 4. Perform the migration
-            logger.info("Starting data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS...")
+            logger.info(
+                "Starting data migration from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS..."
+            )
             migration_sql = """
             INSERT INTO LIGHTRAG_VDB_CHUNKS (
                 id, workspace, full_doc_id, chunk_order_index, tokens, content,

From b7f8c20e616fde677dbb3403efd106015b0e1410 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 15:36:54 +0800
Subject: [PATCH 4/6] fix(postgres): use correct table for vector queries

Change SQL templates from LIGHTRAG_DOC_CHUNKS to LIGHTRAG_VDB_CHUNKS
to fix the "content_vector does not exist" error in vector operations.
---
 lightrag/kg/postgres_impl.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index e77429d3..a79ca448 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -2695,7 +2695,7 @@ SQL_TEMPLATES = {
     "relationships": """
             WITH relevant_chunks AS (
                 SELECT id as chunk_id
-                FROM LIGHTRAG_DOC_CHUNKS
+                FROM LIGHTRAG_VDB_CHUNKS
                 WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
             )
             SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
@@ -2712,7 +2712,7 @@ SQL_TEMPLATES = {
     "entities": """
             WITH relevant_chunks AS (
                 SELECT id as chunk_id
-                FROM LIGHTRAG_DOC_CHUNKS
+                FROM LIGHTRAG_VDB_CHUNKS
                 WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
             )
             SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
@@ -2735,7 +2735,7 @@ SQL_TEMPLATES = {
             SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
                 (
                     SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-                    FROM LIGHTRAG_DOC_CHUNKS
+                    FROM LIGHTRAG_VDB_CHUNKS
                     WHERE workspace=$1
                     AND id IN (SELECT chunk_id FROM relevant_chunks)
                 ) as chunk_distances
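
Note on the corrected templates: similarity comes from pgvector's `<=>`
cosine-distance operator, with the query embedding interpolated as a vector
literal (the `{embedding_string}` placeholder above). A minimal sketch of
issuing the corrected chunk query outside LightRAG; the DSN, workspace, and
embedding are placeholders, and LightRAG itself goes through its own
connection pool rather than a helper like this:

    import asyncio
    import asyncpg

    async def query_chunks(dsn: str, embedding: list[float], workspace: str,
                           doc_ids: list[str] | None, top_k: int = 5):
        # Build the '[0.1,0.2,...]' literal that the SQL templates interpolate.
        embedding_string = ",".join(str(x) for x in embedding)
        sql = f"""
        WITH relevant_chunks AS (
            SELECT id AS chunk_id FROM LIGHTRAG_VDB_CHUNKS
            WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
        )
        SELECT id, content, file_path,
               1 - (content_vector <=> '[{embedding_string}]'::vector) AS similarity
        FROM LIGHTRAG_VDB_CHUNKS
        WHERE workspace = $1 AND id IN (SELECT chunk_id FROM relevant_chunks)
        ORDER BY similarity DESC
        LIMIT {top_k}
        """
        conn = await asyncpg.connect(dsn)
        try:
            return await conn.fetch(sql, workspace, doc_ids)
        finally:
            await conn.close()

    # rows = asyncio.run(query_chunks("postgresql://user:pass@localhost:5432/lightrag",
    #                                 [0.1] * 1536, "default", None))
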
From 9d5c49e4cb2084be394d20d6bd622468143a34f6 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 19:08:26 +0800
Subject: [PATCH 5/6] Optimized logger info

---
 lightrag/operate.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index ad26f857..703ff0a5 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -1871,7 +1871,7 @@
     )
 
     logger.info(
-        f"Local query uses {len(node_datas)} entites, {len(use_relations)} relations, {len(use_text_units)} chunks"
+        f"Local query: {len(node_datas)} entities, {len(use_relations)} relations, {len(use_text_units)} chunks"
     )
 
     # build prompt
@@ -2180,7 +2180,7 @@
         ),
     )
     logger.info(
-        f"Global query uses {len(use_entities)} entites, {len(edge_datas)} relations, {len(use_text_units)} chunks"
+        f"Global query: {len(use_entities)} entities, {len(edge_datas)} relations, {len(use_text_units)} chunks"
     )
 
     relations_context = []

From 8522bfc9dc4a717b28a939d842821f9e82c34e34 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sat, 28 Jun 2025 19:27:36 +0800
Subject: [PATCH 6/6] Optimized logger info

---
 lightrag/operate.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index ad26f857..d0f5f7bb 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -1647,7 +1647,7 @@
             f"Truncate chunks from {len(valid_chunks)} to {len(maybe_trun_chunks)} (max tokens:{query_param.max_token_for_text_unit})"
         )
         logger.info(
-            f"Vector query: {len(maybe_trun_chunks)} chunks, top_k: {query_param.top_k}"
+            f"Query chunks: {len(maybe_trun_chunks)} chunks, top_k: {query_param.top_k}"
         )
 
         if not maybe_trun_chunks:
@@ -1871,7 +1871,7 @@
     )
 
     logger.info(
-        f"Local query uses {len(node_datas)} entites, {len(use_relations)} relations, {len(use_text_units)} chunks"
+        f"Local query: {len(node_datas)} entities, {len(use_relations)} relations, {len(use_text_units)} chunks"
     )
 
     # build prompt
@@ -2180,7 +2180,7 @@
         ),
     )
     logger.info(
-        f"Global query uses {len(use_entities)} entites, {len(edge_datas)} relations, {len(use_text_units)} chunks"
+        f"Global query: {len(use_entities)} entities, {len(edge_datas)} relations, {len(use_text_units)} chunks"
     )
 
     relations_context = []