From 2440939fa1ed1c3606d9ba92cc077e4dd90eb428 Mon Sep 17 00:00:00 2001 From: buua436 Date: Tue, 9 Dec 2025 18:01:34 +0800 Subject: [PATCH] update --- graphrag/general/community_reports_extractor.py | 1 + graphrag/general/extractor.py | 1 + graphrag/general/index.py | 1 + graphrag/utils.py | 2 ++ rag/flow/hierarchical_merger/hierarchical_merger.py | 2 ++ rag/prompts/generator.py | 1 + rag/raptor.py | 1 + rag/svr/sync_data_source.py | 1 + rag/svr/task_executor.py | 2 ++ 9 files changed, 12 insertions(+) diff --git a/graphrag/general/community_reports_extractor.py b/graphrag/general/community_reports_extractor.py index 734a5401d..a9b5026d8 100644 --- a/graphrag/general/community_reports_extractor.py +++ b/graphrag/general/community_reports_extractor.py @@ -150,6 +150,7 @@ class CommunityReportsExtractor(Extractor): try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in community processing: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/graphrag/general/extractor.py b/graphrag/general/extractor.py index 2d2f2b33d..86c971c4c 100644 --- a/graphrag/general/extractor.py +++ b/graphrag/general/extractor.py @@ -145,6 +145,7 @@ class Extractor: try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in worker: {str(e)}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/graphrag/general/index.py b/graphrag/general/index.py index 813027e38..1bc9790d9 100644 --- a/graphrag/general/index.py +++ b/graphrag/general/index.py @@ -286,6 +286,7 @@ async def run_graphrag_for_kb( try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in asyncio.gather: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/graphrag/utils.py b/graphrag/utils.py index 923fe8b53..9b3dc2c2b 100644 --- 
a/graphrag/utils.py +++ b/graphrag/utils.py @@ -531,6 +531,7 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in get_embedding_of_nodes: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) @@ -549,6 +550,7 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in get_embedding_of_edges: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/rag/flow/hierarchical_merger/hierarchical_merger.py b/rag/flow/hierarchical_merger/hierarchical_merger.py index 5b8e3483a..34e20ed0e 100644 --- a/rag/flow/hierarchical_merger/hierarchical_merger.py +++ b/rag/flow/hierarchical_merger/hierarchical_merger.py @@ -14,6 +14,7 @@ # limitations under the License. 
import asyncio +import logging import random import re from copy import deepcopy @@ -183,6 +184,7 @@ class HierarchicalMerger(ProcessBase): try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in image2id: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/rag/prompts/generator.py b/rag/prompts/generator.py index 9abaa8a94..3a0caae00 100644 --- a/rag/prompts/generator.py +++ b/rag/prompts/generator.py @@ -752,6 +752,7 @@ async def run_toc_from_text(chunks, chat_mdl, callback=None): try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error generating TOC: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/rag/raptor.py b/rag/raptor.py index 201e67560..20ad8638b 100644 --- a/rag/raptor.py +++ b/rag/raptor.py @@ -209,6 +209,7 @@ class RecursiveAbstractiveProcessing4TreeOrganizedRetrieval: try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in RAPTOR cluster processing: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index bd9c42f85..400f90370 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -664,6 +664,7 @@ async def dispatch_tasks(): try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error(f"Error in dispatch_tasks: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 3283b0c62..cae5bd52e 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -305,6 +305,7 @@ async def build_chunks(task, progress_callback): try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: + logging.error("Saving chunk {}/{}/{} 
got exception: {}".format(task["location"], task["name"], d["id"], e)) for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) @@ -830,6 +831,7 @@ async def insert_es(task_id, task_tenant_id, task_dataset_id, chunks, progress_c try: await asyncio.gather(*tasks, return_exceptions=False) except Exception as e: +            logging.error(f"Error in insert_es for task {task_id}: {e}") for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True)