This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:16 +08:00
parent 78c1c69b0a
commit 0a6e4616b2
10 changed files with 164 additions and 48 deletions

View file

@ -406,7 +406,7 @@ class DocStatusResponse(BaseModel):
"id": "doc_123456",
"content_summary": "Research paper on machine learning",
"content_length": 15240,
"status": "PROCESSED",
"status": "processed",
"created_at": "2025-03-31T12:34:56",
"updated_at": "2025-03-31T12:35:30",
"track_id": "upload_20250729_170612_abc123",
@ -439,7 +439,7 @@ class DocsStatusesResponse(BaseModel):
"id": "doc_123",
"content_summary": "Pending document",
"content_length": 5000,
"status": "PENDING",
"status": "pending",
"created_at": "2025-03-31T10:00:00",
"updated_at": "2025-03-31T10:00:00",
"track_id": "upload_20250331_100000_abc123",
@ -449,12 +449,27 @@ class DocsStatusesResponse(BaseModel):
"file_path": "pending_doc.pdf",
}
],
"PREPROCESSED": [
{
"id": "doc_789",
"content_summary": "Document pending final indexing",
"content_length": 7200,
"status": "multimodal_processed",
"created_at": "2025-03-31T09:30:00",
"updated_at": "2025-03-31T09:35:00",
"track_id": "upload_20250331_093000_xyz789",
"chunks_count": 10,
"error": None,
"metadata": None,
"file_path": "preprocessed_doc.pdf",
}
],
"PROCESSED": [
{
"id": "doc_456",
"content_summary": "Processed document",
"content_length": 8000,
"status": "PROCESSED",
"status": "processed",
"created_at": "2025-03-31T09:00:00",
"updated_at": "2025-03-31T09:05:00",
"track_id": "insert_20250331_090000_def456",
@ -626,6 +641,7 @@ class PaginatedDocsResponse(BaseModel):
"status_counts": {
"PENDING": 10,
"PROCESSING": 5,
"PREPROCESSED": 5,
"PROCESSED": 130,
"FAILED": 5,
},
@ -648,6 +664,7 @@ class StatusCountsResponse(BaseModel):
"status_counts": {
"PENDING": 10,
"PROCESSING": 5,
"PREPROCESSED": 5,
"PROCESSED": 130,
"FAILED": 5,
}
@ -2210,7 +2227,7 @@ def create_document_routes(
To prevent excessive resource consumption, a maximum of 1,000 records is returned.
This endpoint retrieves the current status of all documents, grouped by their
processing status (PENDING, PROCESSING, PROCESSED, FAILED). The results are
processing status (PENDING, PROCESSING, PREPROCESSED, PROCESSED, FAILED). The results are
limited to 1000 total documents with fair distribution across all statuses.
Returns:
@ -2226,6 +2243,7 @@ def create_document_routes(
statuses = (
DocStatus.PENDING,
DocStatus.PROCESSING,
DocStatus.PREPROCESSED,
DocStatus.PROCESSED,
DocStatus.FAILED,
)

View file

@ -19,6 +19,7 @@ from typing import (
from .utils import EmbeddingFunc
from .types import KnowledgeGraph
from .constants import (
GRAPH_FIELD_SEP,
DEFAULT_TOP_K,
DEFAULT_CHUNK_TOP_K,
DEFAULT_MAX_ENTITY_TOKENS,
@ -354,14 +355,6 @@ class BaseKVStorage(StorageNameSpace, ABC):
None
"""
@abstractmethod
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):
@ -527,6 +520,56 @@ class BaseGraphStorage(StorageNameSpace, ABC):
result[node_id] = edges if edges is not None else []
return result
@abstractmethod
async def get_nodes_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
"""Get all nodes that are associated with the given chunk_ids.
Args:
chunk_ids (list[str]): A list of chunk IDs to find associated nodes for.
Returns:
list[dict]: A list of nodes, where each node is a dictionary of its properties.
An empty list if no matching nodes are found.
"""
@abstractmethod
async def get_edges_by_chunk_ids(self, chunk_ids: list[str]) -> list[dict]:
"""Get all edges that are associated with the given chunk_ids.
Args:
chunk_ids (list[str]): A list of chunk IDs to find associated edges for.
Returns:
list[dict]: A list of edges, where each edge is a dictionary of its properties.
An empty list if no matching edges are found.
"""
# Default implementation iterates through all nodes and their edges, which is inefficient.
# This method should be overridden by subclasses for better performance.
all_edges = []
all_labels = await self.get_all_labels()
processed_edges = set()
for label in all_labels:
edges = await self.get_node_edges(label)
if edges:
for src_id, tgt_id in edges:
# Avoid processing the same edge twice in an undirected graph
edge_tuple = tuple(sorted((src_id, tgt_id)))
if edge_tuple in processed_edges:
continue
processed_edges.add(edge_tuple)
edge = await self.get_edge(src_id, tgt_id)
if edge and "source_id" in edge:
source_ids = set(edge["source_id"].split(GRAPH_FIELD_SEP))
if not source_ids.isdisjoint(chunk_ids):
# Add source and target to the edge dict for easier processing later
edge_with_nodes = edge.copy()
edge_with_nodes["source"] = src_id
edge_with_nodes["target"] = tgt_id
all_edges.append(edge_with_nodes)
return all_edges
@abstractmethod
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
"""Insert a new node or update an existing node in the graph.
@ -669,7 +712,7 @@ class DocStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
PREPROCESSED = "preprocessed"
PREPROCESSED = "multimodal_processed"
PROCESSED = "processed"
FAILED = "failed"
@ -700,25 +743,6 @@ class DocProcessingStatus:
"""Error message if failed"""
metadata: dict[str, Any] = field(default_factory=dict)
"""Additional metadata"""
multimodal_processed: bool | None = field(default=None, repr=False)
"""Internal field: indicates if multimodal processing is complete. Not shown in repr() but accessible for debugging."""
def __post_init__(self):
"""
Handle status conversion based on multimodal_processed field.
Business rules:
- If multimodal_processed is False and status is PROCESSED,
then change status to PREPROCESSED
- The multimodal_processed field is kept (with repr=False) for internal use and debugging
"""
# Apply status conversion logic
if self.multimodal_processed is not None:
if (
self.multimodal_processed is False
and self.status == DocStatus.PROCESSED
):
self.status = DocStatus.PREPROCESSED
@dataclass

View file

@ -2617,7 +2617,12 @@ class LightRAG:
)
# Check document status and log warning for non-completed documents
doc_status = doc_status_data.get("status")
raw_status = doc_status_data.get("status")
try:
doc_status = DocStatus(raw_status)
except ValueError:
doc_status = raw_status
if doc_status != DocStatus.PROCESSED:
if doc_status == DocStatus.PENDING:
warning_msg = (
@ -2627,12 +2632,23 @@ class LightRAG:
warning_msg = (
f"Deleting {doc_id} {file_path}(previous status: PROCESSING)"
)
elif doc_status == DocStatus.PREPROCESSED:
warning_msg = (
f"Deleting {doc_id} {file_path}(previous status: PREPROCESSED)"
)
elif doc_status == DocStatus.FAILED:
warning_msg = (
f"Deleting {doc_id} {file_path}(previous status: FAILED)"
)
else:
warning_msg = f"Deleting {doc_id} {file_path}(previous status: {doc_status.value})"
status_text = (
doc_status.value
if isinstance(doc_status, DocStatus)
else str(doc_status)
)
warning_msg = (
f"Deleting {doc_id} {file_path}(previous status: {status_text})"
)
logger.info(warning_msg)
# Update pipeline status for monitoring
async with pipeline_status_lock:

View file

@ -167,7 +167,7 @@ export type DeleteDocResponse = {
doc_id: string
}
export type DocStatus = 'pending' | 'processing' | 'processed' | 'failed'
export type DocStatus = 'pending' | 'processing' | 'multimodal_processed' | 'processed' | 'failed'
export type DocStatusResponse = {
id: string

View file

@ -39,6 +39,21 @@ import PipelineStatusDialog from '@/components/documents/PipelineStatusDialog'
type StatusFilter = DocStatus | 'all';
// Utility functions defined outside component for better performance and to avoid dependency issues
const getCountValue = (counts: Record<string, number>, ...keys: string[]): number => {
for (const key of keys) {
const value = counts[key]
if (typeof value === 'number') {
return value
}
}
return 0
}
const hasActiveDocumentsStatus = (counts: Record<string, number>): boolean =>
getCountValue(counts, 'PROCESSING', 'processing') > 0 ||
getCountValue(counts, 'PENDING', 'pending') > 0 ||
getCountValue(counts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') > 0
const getDisplayFileName = (doc: DocStatusResponse, maxLength: number = 20): string => {
// Check if file_path exists and is a non-empty string
@ -261,6 +276,7 @@ export default function DocumentManager() {
const [pageByStatus, setPageByStatus] = useState<Record<StatusFilter, number>>({
all: routeState.page,
processed: 1,
multimodal_processed: 1,
processing: 1,
pending: 1,
failed: 1,
@ -341,6 +357,7 @@ export default function DocumentManager() {
setPageByStatus({
all: 1,
processed: 1,
'multimodal_processed': 1,
processing: 1,
pending: 1,
failed: 1,
@ -485,9 +502,19 @@ export default function DocumentManager() {
return counts;
}, [docs]);
const processedCount = getCountValue(statusCounts, 'PROCESSED', 'processed') || documentCounts.processed || 0;
const preprocessedCount =
getCountValue(statusCounts, 'PREPROCESSED', 'preprocessed', 'multimodal_processed') ||
documentCounts.multimodal_processed ||
0;
const processingCount = getCountValue(statusCounts, 'PROCESSING', 'processing') || documentCounts.processing || 0;
const pendingCount = getCountValue(statusCounts, 'PENDING', 'pending') || documentCounts.pending || 0;
const failedCount = getCountValue(statusCounts, 'FAILED', 'failed') || documentCounts.failed || 0;
// Store previous status counts
const prevStatusCounts = useRef({
processed: 0,
multimodal_processed: 0,
processing: 0,
pending: 0,
failed: 0
@ -578,6 +605,7 @@ export default function DocumentManager() {
const legacyDocs: DocsStatusesResponse = {
statuses: {
processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processed'),
multimodal_processed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'multimodal_processed'),
processing: response.documents.filter((doc: DocStatusResponse) => doc.status === 'processing'),
pending: response.documents.filter((doc: DocStatusResponse) => doc.status === 'pending'),
failed: response.documents.filter((doc: DocStatusResponse) => doc.status === 'failed')
@ -907,7 +935,7 @@ export default function DocumentManager() {
setTimeout(() => {
if (isMountedRef.current && currentTab === 'documents' && health) {
// Restore intelligent polling interval based on document status
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const normalInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(normalInterval);
}
@ -943,7 +971,7 @@ export default function DocumentManager() {
setTimeout(() => {
if (isMountedRef.current && currentTab === 'documents' && health) {
// Restore intelligent polling interval based on document status
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const normalInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(normalInterval);
}
@ -967,6 +995,7 @@ export default function DocumentManager() {
setPageByStatus({
all: 1,
processed: 1,
multimodal_processed: 1,
processing: 1,
pending: 1,
failed: 1,
@ -1013,6 +1042,7 @@ export default function DocumentManager() {
const legacyDocs: DocsStatusesResponse = {
statuses: {
processed: response.documents.filter(doc => doc.status === 'processed'),
multimodal_processed: response.documents.filter(doc => doc.status === 'multimodal_processed'),
processing: response.documents.filter(doc => doc.status === 'processing'),
pending: response.documents.filter(doc => doc.status === 'pending'),
failed: response.documents.filter(doc => doc.status === 'failed')
@ -1050,14 +1080,21 @@ export default function DocumentManager() {
handleIntelligentRefresh();
// Reset polling timer after intelligent refresh
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const pollingInterval = hasActiveDocuments ? 5000 : 30000;
startPollingInterval(pollingInterval);
}
}
// Update the previous state
prevPipelineBusyRef.current = pipelineBusy;
}, [pipelineBusy, currentTab, health, handleIntelligentRefresh, statusCounts.processing, statusCounts.pending, startPollingInterval]);
}, [
pipelineBusy,
currentTab,
health,
handleIntelligentRefresh,
statusCounts,
startPollingInterval
]);
// Set up intelligent polling with dynamic interval based on document status
useEffect(() => {
@ -1067,7 +1104,7 @@ export default function DocumentManager() {
}
// Determine polling interval based on document status
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
const hasActiveDocuments = hasActiveDocumentsStatus(statusCounts);
const pollingInterval = hasActiveDocuments ? 5000 : 30000; // 5s if active, 30s if idle
startPollingInterval(pollingInterval);
@ -1084,6 +1121,7 @@ export default function DocumentManager() {
// Get new status counts
const newStatusCounts = {
processed: docs?.statuses?.processed?.length || 0,
multimodal_processed: docs?.statuses?.multimodal_processed?.length || 0,
processing: docs?.statuses?.processing?.length || 0,
pending: docs?.statuses?.pending?.length || 0,
failed: docs?.statuses?.failed?.length || 0
@ -1428,11 +1466,23 @@ export default function DocumentManager() {
onClick={() => handleStatusFilterChange('processed')}
disabled={isRefreshing}
className={cn(
(statusCounts.PROCESSED || statusCounts.processed || documentCounts.processed) > 0 ? 'text-green-600' : 'text-gray-500',
processedCount > 0 ? 'text-green-600' : 'text-gray-500',
statusFilter === 'processed' && 'bg-green-100 dark:bg-green-900/30 font-medium border border-green-400 dark:border-green-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.completed')} ({statusCounts.PROCESSED || statusCounts.processed || 0})
{t('documentPanel.documentManager.status.completed')} ({processedCount})
</Button>
<Button
size="sm"
variant={statusFilter === 'multimodal_processed' ? 'secondary' : 'outline'}
onClick={() => handleStatusFilterChange('multimodal_processed')}
disabled={isRefreshing}
className={cn(
preprocessedCount > 0 ? 'text-purple-600' : 'text-gray-500',
statusFilter === 'multimodal_processed' && 'bg-purple-100 dark:bg-purple-900/30 font-medium border border-purple-400 dark:border-purple-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.preprocessed')} ({preprocessedCount})
</Button>
<Button
size="sm"
@ -1440,11 +1490,11 @@ export default function DocumentManager() {
onClick={() => handleStatusFilterChange('processing')}
disabled={isRefreshing}
className={cn(
(statusCounts.PROCESSING || statusCounts.processing || documentCounts.processing) > 0 ? 'text-blue-600' : 'text-gray-500',
processingCount > 0 ? 'text-blue-600' : 'text-gray-500',
statusFilter === 'processing' && 'bg-blue-100 dark:bg-blue-900/30 font-medium border border-blue-400 dark:border-blue-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.processing')} ({statusCounts.PROCESSING || statusCounts.processing || 0})
{t('documentPanel.documentManager.status.processing')} ({processingCount})
</Button>
<Button
size="sm"
@ -1452,11 +1502,11 @@ export default function DocumentManager() {
onClick={() => handleStatusFilterChange('pending')}
disabled={isRefreshing}
className={cn(
(statusCounts.PENDING || statusCounts.pending || documentCounts.pending) > 0 ? 'text-yellow-600' : 'text-gray-500',
pendingCount > 0 ? 'text-yellow-600' : 'text-gray-500',
statusFilter === 'pending' && 'bg-yellow-100 dark:bg-yellow-900/30 font-medium border border-yellow-400 dark:border-yellow-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.pending')} ({statusCounts.PENDING || statusCounts.pending || 0})
{t('documentPanel.documentManager.status.pending')} ({pendingCount})
</Button>
<Button
size="sm"
@ -1464,11 +1514,11 @@ export default function DocumentManager() {
onClick={() => handleStatusFilterChange('failed')}
disabled={isRefreshing}
className={cn(
(statusCounts.FAILED || statusCounts.failed || documentCounts.failed) > 0 ? 'text-red-600' : 'text-gray-500',
failedCount > 0 ? 'text-red-600' : 'text-gray-500',
statusFilter === 'failed' && 'bg-red-100 dark:bg-red-900/30 font-medium border border-red-400 dark:border-red-600 shadow-sm'
)}
>
{t('documentPanel.documentManager.status.failed')} ({statusCounts.FAILED || statusCounts.failed || 0})
{t('documentPanel.documentManager.status.failed')} ({failedCount})
</Button>
</div>
<Button
@ -1657,6 +1707,9 @@ export default function DocumentManager() {
{doc.status === 'processed' && (
<span className="text-green-600">{t('documentPanel.documentManager.status.completed')}</span>
)}
{doc.status === 'multimodal_processed' && (
<span className="text-purple-600">{t('documentPanel.documentManager.status.preprocessed')}</span>
)}
{doc.status === 'processing' && (
<span className="text-blue-600">{t('documentPanel.documentManager.status.processing')}</span>
)}

View file

@ -139,6 +139,7 @@
"status": {
"all": "الكل",
"completed": "مكتمل",
"preprocessed": "مُعالج مسبقًا",
"processing": "قيد المعالجة",
"pending": "معلق",
"failed": "فشل"

View file

@ -139,6 +139,7 @@
"status": {
"all": "All",
"completed": "Completed",
"preprocessed": "Preprocessed",
"processing": "Processing",
"pending": "Pending",
"failed": "Failed"

View file

@ -139,6 +139,7 @@
"status": {
"all": "Tous",
"completed": "Terminé",
"preprocessed": "Prétraité",
"processing": "En traitement",
"pending": "En attente",
"failed": "Échoué"

View file

@ -139,6 +139,7 @@
"status": {
"all": "全部",
"completed": "已完成",
"preprocessed": "预处理",
"processing": "处理中",
"pending": "等待中",
"failed": "失败"

View file

@ -139,6 +139,7 @@
"status": {
"all": "全部",
"completed": "已完成",
"preprocessed": "預處理",
"processing": "處理中",
"pending": "等待中",
"failed": "失敗"