Fix: Correct pagination and early termination bugs in chunk_list()

- Fix pagination offset calculation (use page_num * bs instead of p)
- Fix early loop termination (break only when zero chunks are returned, not when fewer than bs)
- Add max_count parameter to GraphRAG chunk loading

These bugs prevented processing large documents (more than 128 chunks) in GraphRAG and other workflows. Validated by testing with a 3,207-chunk document.

Fixes #11687
This commit is contained in:
parent
e0e1d04da5
commit
a3f871a144
2 changed files with 13 additions and 6 deletions
|
|
@@ -57,7 +57,7 @@ async def run_graphrag(
|
||||||
start = trio.current_time()
|
start = trio.current_time()
|
||||||
tenant_id, kb_id, doc_id = row["tenant_id"], str(row["kb_id"]), row["doc_id"]
|
tenant_id, kb_id, doc_id = row["tenant_id"], str(row["kb_id"]), row["doc_id"]
|
||||||
chunks = []
|
chunks = []
|
||||||
for d in settings.retriever.chunk_list(doc_id, tenant_id, [kb_id], fields=["content_with_weight", "doc_id"], sort_by_position=True):
|
for d in settings.retriever.chunk_list(doc_id, tenant_id, [kb_id], max_count=10000, fields=["content_with_weight", "doc_id"], sort_by_position=True):
|
||||||
chunks.append(d["content_with_weight"])
|
chunks.append(d["content_with_weight"])
|
||||||
|
|
||||||
with trio.fail_after(max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000):
|
with trio.fail_after(max(120, len(chunks) * 60 * 10) if enable_timeout_assertion else 10000000000):
|
||||||
|
|
@@ -174,13 +174,19 @@ async def run_graphrag_for_kb(
|
||||||
chunks = []
|
chunks = []
|
||||||
current_chunk = ""
|
current_chunk = ""
|
||||||
|
|
||||||
for d in settings.retriever.chunk_list(
|
# DEBUG: Obtener todos los chunks primero
|
||||||
|
raw_chunks = list(settings.retriever.chunk_list(
|
||||||
doc_id,
|
doc_id,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
[kb_id],
|
[kb_id],
|
||||||
|
max_count=10000, # FIX: Aumentar límite para procesar todos los chunks
|
||||||
fields=fields_for_chunks,
|
fields=fields_for_chunks,
|
||||||
sort_by_position=True,
|
sort_by_position=True,
|
||||||
):
|
))
|
||||||
|
|
||||||
|
callback(msg=f"[DEBUG] chunk_list() returned {len(raw_chunks)} raw chunks for doc {doc_id}")
|
||||||
|
|
||||||
|
for d in raw_chunks:
|
||||||
content = d["content_with_weight"]
|
content = d["content_with_weight"]
|
||||||
if num_tokens_from_string(current_chunk + content) < 1024:
|
if num_tokens_from_string(current_chunk + content) < 1024:
|
||||||
current_chunk += content
|
current_chunk += content
|
||||||
|
|
|
||||||
|
|
@@ -527,15 +527,16 @@ class Dealer:
|
||||||
|
|
||||||
res = []
|
res = []
|
||||||
bs = 128
|
bs = 128
|
||||||
for p in range(offset, max_count, bs):
|
for page_num, p in enumerate(range(offset, max_count, bs)):
|
||||||
es_res = self.dataStore.search(fields, [], condition, [], orderBy, p, bs, index_name(tenant_id),
|
es_res = self.dataStore.search(fields, [], condition, [], orderBy, page_num * bs, bs, index_name(tenant_id),
|
||||||
kb_ids)
|
kb_ids)
|
||||||
dict_chunks = self.dataStore.get_fields(es_res, fields)
|
dict_chunks = self.dataStore.get_fields(es_res, fields)
|
||||||
for id, doc in dict_chunks.items():
|
for id, doc in dict_chunks.items():
|
||||||
doc["id"] = id
|
doc["id"] = id
|
||||||
if dict_chunks:
|
if dict_chunks:
|
||||||
res.extend(dict_chunks.values())
|
res.extend(dict_chunks.values())
|
||||||
if len(dict_chunks.values()) < bs:
|
# FIX: Solo terminar si no hay chunks, no si hay menos de bs
|
||||||
|
if len(dict_chunks.values()) == 0:
|
||||||
break
|
break
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue