From 2f617600515d3603ac9b3a382c2e1db7ae4d53c6 Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 27 Nov 2025 09:54:39 +0000
Subject: [PATCH] docs: Add document and knowledgebase service analysis
 documentation

- Add document_service_analysis.md: comprehensive analysis of document
  lifecycle management including insert, remove, parse, progress tracking
- Add knowledgebase_service_analysis.md: dataset management and access
  control analysis with permission model, parser configuration
---
 .../document_service_analysis.md       | 607 ++++++++++++++++
 .../knowledgebase_service_analysis.md  | 681 ++++++++++++++++++
 2 files changed, 1288 insertions(+)
 create mode 100644 personal_analyze/02-SERVICE-LAYER/document_service_analysis.md
 create mode 100644 personal_analyze/02-SERVICE-LAYER/knowledgebase_service_analysis.md

diff --git a/personal_analyze/02-SERVICE-LAYER/document_service_analysis.md b/personal_analyze/02-SERVICE-LAYER/document_service_analysis.md
new file mode 100644
index 000000000..53a4cb395
--- /dev/null
+++ b/personal_analyze/02-SERVICE-LAYER/document_service_analysis.md
@@ -0,0 +1,607 @@
# Document Service Analysis - Document Lifecycle Management

## Overview

`document_service.py` (39KB) manages the entire **document lifecycle** from upload to deletion, including parsing, chunk management, and progress tracking.

## File Location
```
/api/db/services/document_service.py
```

## Class Definition

```python
class DocumentService(CommonService):
    model = Document  # Line 46
```

Inherits the basic methods from `CommonService`: `query()`, `get_by_id()`, `save()`, `update_by_id()`, `delete_by_id()`

---

## Document Lifecycle Flow

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                             DOCUMENT LIFECYCLE                               │
└─────────────────────────────────────────────────────────────────────────────┘

[1] UPLOAD PHASE
    │
    ├──► FileService.upload_document()
    │    ├── Store file in MinIO
    │    ├── Create File record
    │    └── Create Document record
    │
    └──► DocumentService.insert(doc)
         ├── Save Document to MySQL
         └── KnowledgebaseService.atomic_increase_doc_num_by_id()
    │
    ▼
[2] QUEUE PHASE
    │
    └──► DocumentService.run(tenant_id, doc)
         │
         ├─(pipeline_id)──► TaskService.queue_dataflow()
         │                  └── Canvas workflow execution
         │
         └─(standard)─────► TaskService.queue_tasks()
                            └── Push to Redis queue
    │
    ▼
[3] PROCESSING PHASE (Background)
    │
    ├──► TaskExecutor picks task from queue
    ├──► Parse document (deepdoc parsers)
    ├──► Generate chunks
    ├──► LLMBundle.encode() → Embeddings
    ├──► Store in Elasticsearch/Infinity
    │
    └──► DocumentService.increment_chunk_num()
         ├── Document.chunk_num += count
         ├── Document.token_num += count
         └── Knowledgebase.chunk_num += count
    │
    ▼
[4] STATUS SYNC
    │
    └──► DocumentService._sync_progress()
         ├── Aggregate task progress
         ├── Update Document.progress
         └── Set run status (DONE/FAIL/RUNNING)
    │
    ▼
[5] QUERY/RETRIEVAL
    │
    ├──► DocumentService.get_by_kb_id()
    └──► docStoreConn.search() → Return chunks for RAG
    │
    ▼
[6] DELETION
    │
    └──► DocumentService.remove_document()
         ├── clear_chunk_num() → Reset KB stats
         ├── TaskService.filter_delete() → Remove tasks
         ├── docStoreConn.delete() → Remove from index
         ├── STORAGE_IMPL.rm() → Delete files
         └── delete_by_id() → Remove DB record
```

---

## Core Methods

### 1. Insert Document

**Lines**: 292-297

```python
@classmethod
@DB.connection_context()
def insert(cls, doc):
    """
    Insert document and increment KB doc count atomically. 
+ + Args: + doc: Document dict with keys: id, kb_id, name, parser_id, etc. + + Returns: + Document instance + + Raises: + RuntimeError: If database operation fails + """ + if not cls.save(**doc): + raise RuntimeError("Database error (Document)!") + + # Atomic increment KB document count + if not KnowledgebaseService.atomic_increase_doc_num_by_id(doc["kb_id"]): + raise RuntimeError("Database error (Knowledgebase)!") + + return Document(**doc) +``` + +**Flow**: +1. Save document record to MySQL +2. Atomically increment `Knowledgebase.doc_num` +3. Return Document instance + +--- + +### 2. Remove Document + +**Lines**: 301-340 + +```python +@classmethod +@DB.connection_context() +def remove_document(cls, doc, tenant_id): + """ + Remove document with full cascade cleanup. + + Cleanup order: + 1. Reset KB statistics (chunk_num, token_num, doc_num) + 2. Delete associated tasks + 3. Retrieve all chunk IDs (paginated) + 4. Delete chunk files from storage (MinIO) + 5. Delete thumbnail if exists + 6. Delete from document store (Elasticsearch) + 7. Clean up knowledge graph references + 8. Delete document record from MySQL + """ +``` + +**Cascade Cleanup Diagram**: + +``` +remove_document(doc, tenant_id) + │ + ├──► clear_chunk_num(doc.id) + │ └── KB: -chunk_num, -token_num, -doc_num + │ + ├──► TaskService.filter_delete([Task.doc_id == doc.id]) + │ + ├──► Retrieve chunk IDs (paginated, 1000/page) + │ for page in range(∞): + │ chunks = docStoreConn.search(...) + │ chunk_ids.extend(get_chunk_ids(chunks)) + │ if empty: break + │ + ├──► Delete chunk files from storage + │ for cid in chunk_ids: + │ STORAGE_IMPL.rm(doc.kb_id, cid) + │ + ├──► Delete thumbnail (if not base64) + │ STORAGE_IMPL.rm(doc.kb_id, doc.thumbnail) + │ + ├──► docStoreConn.delete({"doc_id": doc.id}, ...) + │ + ├──► Clean knowledge graph (if exists) + │ └── Remove doc.id from graph source_id references + │ + └──► cls.delete_by_id(doc.id) +``` + +--- + +### 3. Run Document Processing + +**Lines**: 822-841 + +```python +@classmethod +def run(cls, tenant_id: str, doc: dict, kb_table_num_map: dict): + """ + Route document to appropriate processing pipeline. + + Two paths: + 1. Pipeline mode (canvas workflow): queue_dataflow() + 2. Standard mode: queue_tasks() + """ + from api.db.services.task_service import queue_dataflow, queue_tasks + + doc["tenant_id"] = tenant_id + doc_parser = doc.get("parser_id", ParserType.NAIVE) + + # Special handling for TABLE parser + if doc_parser == ParserType.TABLE: + kb_id = doc.get("kb_id") + if kb_id not in kb_table_num_map: + count = DocumentService.count_by_kb_id(kb_id=kb_id, ...) + kb_table_num_map[kb_id] = count + if kb_table_num_map[kb_id] <= 0: + KnowledgebaseService.delete_field_map(kb_id) + + # Route to processing + if doc.get("pipeline_id", ""): + queue_dataflow(tenant_id, flow_id=doc["pipeline_id"], + task_id=get_uuid(), doc_id=doc["id"]) + else: + bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) + queue_tasks(doc, bucket, name, 0) +``` + +**Routing Logic**: + +``` +doc.run() + │ + ├─── Has pipeline_id? ────► queue_dataflow() + │ │ │ + │ │ └── Execute canvas workflow + │ │ + │ No + │ │ + │ ▼ + │ Get file storage address + │ │ + │ ▼ + └──────► queue_tasks() + │ + └── Push to Redis queue for TaskExecutor +``` + +--- + +### 4. 
Chunk Number Management

**Lines**: 390-455

```python
# INCREMENT (after parsing completes)
@classmethod
@DB.connection_context()
def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
    """
    Updates:
    - Document.chunk_num += chunk_num
    - Document.token_num += token_num
    - Document.process_duration += duration
    - Knowledgebase.chunk_num += chunk_num
    - Knowledgebase.token_num += token_num
    """

# DECREMENT (on reprocessing)
@classmethod
@DB.connection_context()
def decrement_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
    """Reverse of increment_chunk_num"""

# CLEAR (on deletion)
@classmethod
@DB.connection_context()
def clear_chunk_num(cls, doc_id):
    """
    Updates:
    - KB.chunk_num -= doc.chunk_num
    - KB.token_num -= doc.token_num
    - KB.doc_num -= 1
    - Document: reset chunk_num=0, token_num=0
    """

# CLEAR ON RERUN (keeps doc_num)
@classmethod
@DB.connection_context()
def clear_chunk_num_when_rerun(cls, doc_id):
    """Same as clear_chunk_num but KB.doc_num unchanged"""
```

---

### 5. Progress Synchronization

**Lines**: 682-738

```python
@classmethod
def _sync_progress(cls, docs):
    """
    Aggregate task progress → document progress.

    State Machine:
    - ALL tasks done + NO failures → progress=1, run=DONE
    - ALL tasks done + ANY failure → progress=-1, run=FAIL
    - Any task running → progress=avg(task_progress), run=RUNNING
    """
```

**Progress State Machine**:

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                          PROGRESS STATE MACHINE                              │
└─────────────────────────────────────────────────────────────────────────────┘

                      ┌─────────────────────┐
                      │   Aggregate Tasks   │
                      │   progress values   │
                      └──────────┬──────────┘
                                 │
           ┌─────────────────────┼─────────────────────┐
           │                     │                     │
           ▼                     ▼                     ▼
    ALL DONE (prg=1)        ANY FAILED            IN PROGRESS
    No failures            (any task=-1)         (0 ≤ prg < 1)
           │                     │                     │
           ▼                     ▼                     ▼
    ┌─────────────┐       ┌─────────────┐       ┌─────────────┐
    │ progress=1  │       │ progress=-1 │       │ progress=   │
    │ run=DONE    │       │ run=FAIL    │       │   avg(tasks)│
    │             │       │             │       │ run=RUNNING │
    └─────────────┘       └─────────────┘       └─────────────┘

Progress Calculation:
    prg = sum(t.progress for t in tasks if t.progress >= 0) / len(tasks)
```

---

### 6. Get Documents by KB

**Lines**: 125-163

```python
@classmethod
@DB.connection_context()
def get_by_kb_id(cls, kb_id, page_number, items_per_page, orderby, desc,
                 keywords, run_status, types, suffix):
    """
    Advanced query with multiple filters and joins. 
+ + Joins: + - File2Document → File (for location, size) + - UserCanvas (LEFT) → for pipeline info + - User (LEFT) → for creator info + + Filters: + - kb_id: Required + - keywords: Search in doc name + - run_status: [RUNNING, DONE, FAIL, CANCEL] + - types: Document types + - suffix: File extensions + + Returns: + (list[dict], total_count) + """ +``` + +**Query Structure**: + +```sql +SELECT + document.id, thumbnail, kb_id, parser_id, pipeline_id, + parser_config, source_type, type, created_by, name, + location, size, token_num, chunk_num, progress, + progress_msg, process_begin_at, process_duration, + meta_fields, suffix, run, status, + create_time, create_date, update_time, update_date, + user_canvas.title AS pipeline_name, + user.nickname +FROM document +JOIN file2document ON document.id = file2document.document_id +JOIN file ON file2document.file_id = file.id +LEFT JOIN user_canvas ON document.pipeline_id = user_canvas.id +LEFT JOIN user ON document.created_by = user.id +WHERE + document.kb_id = ? + AND document.status = '1' + AND (document.name LIKE '%keyword%' OR ...) + AND document.run IN (?, ?, ...) + AND document.type IN (?, ?, ...) + AND file.suffix IN (?, ?, ...) +ORDER BY ? DESC/ASC +LIMIT ? OFFSET ? +``` + +--- + +### 7. Full Parse Workflow (`doc_upload_and_parse`) + +**Lines**: 889-1030 (module-level function) + +```python +def doc_upload_and_parse(conversation_id, file_objs, user_id): + """ + Complete document upload and parse workflow for chat context. + + Used by: Conversation-based document uploads + + Steps: + 1. Resolve conversation → dialog → KB + 2. Initialize embedding model + 3. Upload files + 4. Parallel parsing (12 workers) + 5. Mind map generation (async) + 6. Embedding (batch 16) + 7. Bulk insert to docStore (batch 64) + 8. Update statistics + """ +``` + +**Detailed Flow**: + +``` +doc_upload_and_parse(conversation_id, file_objs, user_id) + │ + ├──► ConversationService.get_by_id(conversation_id) + │ └── Get conversation → dialog_id + │ + ├──► DialogService.get_by_id(dialog_id) + │ └── Get dialog → kb_ids[0] + │ + ├──► KnowledgebaseService.get_by_id(kb_id) + │ └── Get KB → tenant_id, embd_id + │ + ├──► LLMBundle(tenant_id, EMBEDDING, embd_id) + │ └── Initialize embedding model + │ + ├──► FileService.upload_document(kb, file_objs, user_id) + │ └── Returns: [(doc_dict, file_bytes), ...] + │ + ├──► ThreadPoolExecutor(max_workers=12) + │ │ + │ └── for (doc, blob) in files: + │ executor.submit(parser.chunk, doc["name"], blob, **kwargs) + │ + ├──► For each parsed document: + │ │ + │ ├── MindMapExtractor(llm) → Generate mind map + │ │ └── trio.run(mindmap, chunk_contents) + │ │ + │ ├── Embedding (batch=16) + │ │ └── vectors = embedding(doc_id, contents) + │ │ + │ ├── Add vectors to chunks + │ │ └── chunk["q_{dim}_vec"] = vector + │ │ + │ ├── Bulk insert (batch=64) + │ │ └── docStoreConn.insert(chunks[b:b+64], idxnm, kb_id) + │ │ + │ └── Update stats + │ └── increment_chunk_num(doc_id, kb_id, tokens, chunks, 0) + │ + └──► Return [doc_id, ...] 
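```

The embedding and bulk-insert steps above follow a fixed batching pattern (16 texts per embedding call, 64 chunks per store insert). Below is a minimal, illustrative sketch of that loop, not the actual RAGFlow code: `encode` and `insert` stand in for `LLMBundle.encode()` and `docStoreConn.insert()`, the `"content"` key is a placeholder for the real chunk field, and only the `q_{dim}_vec` naming is taken from the flow above.

```python
from typing import Callable, Iterable

EMBED_BATCH = 16   # texts per embedding call
INSERT_BATCH = 64  # chunks per bulk insert

def batched(items: list, size: int) -> Iterable[list]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def embed_and_store(chunks: list[dict],
                    encode: Callable[[list[str]], list[list[float]]],
                    insert: Callable[[list[dict]], None]) -> None:
    # 1) Embed chunk contents in small batches (model-friendly sizes).
    vectors: list[list[float]] = []
    for batch in batched([c["content"] for c in chunks], EMBED_BATCH):
        vectors.extend(encode(batch))
    # 2) Attach each vector under the dimension-keyed field name.
    dim = len(vectors[0])
    for chunk, vec in zip(chunks, vectors):
        chunk[f"q_{dim}_vec"] = vec
    # 3) Bulk-insert into the document store in larger batches.
    for batch in batched(chunks, INSERT_BATCH):
        insert(batch)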
+``` + +--- + +## Document Status Fields + +```python +# From Document model (db_models.py) + +run: CharField(max_length=1) + # "0" = UNSTART (default) + # "1" = RUNNING + # "2" = CANCEL + +status: CharField(max_length=1) + # "0" = WASTED (soft deleted) + # "1" = VALID (default) + +progress: FloatField + # 0.0 = Not started + # 0.0-1.0 = In progress + # 1.0 = Done + # -1.0 = Failed + +progress_msg: TextField + # Human-readable status message + # e.g., "Parsing...", "Embedding...", "Done" + +process_begin_at: DateTimeField + # When parsing started + +process_duration: FloatField + # Cumulative processing time (seconds) +``` + +--- + +## Service Interactions + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SERVICE INTERACTION DIAGRAM │ +└─────────────────────────────────────────────────────────────────────────────┘ + + DocumentService + │ + ┌───────────────────────┼───────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌───────────────┐ ┌───────────────┐ ┌───────────────┐ +│ Knowledgebase │ │ Task │ │ File2Document │ +│ Service │ │ Service │ │ Service │ +│ │ │ │ │ │ +│ • atomic_ │ │ • queue_tasks │ │ • get_storage │ +│ increase_ │ │ • queue_ │ │ _address │ +│ doc_num │ │ dataflow │ │ • get_by_ │ +│ • delete_ │ │ • filter_ │ │ document_id │ +│ field_map │ │ delete │ │ │ +└───────────────┘ └───────────────┘ └───────────────┘ + │ + ▼ + ┌───────────────┐ + │ FileService │ + │ │ + │ • upload_ │ + │ document │ + └───────────────┘ + +External Systems: +┌───────────────┐ ┌───────────────┐ ┌───────────────┐ +│ docStoreConn │ │ STORAGE_IMPL │ │ REDIS_CONN │ +│ (Elasticsearch│ │ (MinIO) │ │ (Queue) │ +│ /Infinity) │ │ │ │ │ +│ │ │ │ │ │ +│ • search │ │ • obj_exist │ │ • queue_ │ +│ • insert │ │ • rm │ │ product │ +│ • delete │ │ • put │ │ • queue_info │ +│ • createIdx │ │ │ │ │ +└───────────────┘ └───────────────┘ └───────────────┘ +``` + +--- + +## Key Method Reference Table + +| Category | Method | Lines | Purpose | +|----------|--------|-------|---------| +| **Query** | `get_list` | 81-110 | Paginated list with filters | +| **Query** | `get_by_kb_id` | 125-163 | Advanced query with joins | +| **Query** | `get_filter_by_kb_id` | 167-212 | Aggregated filter counts | +| **Query** | `get_chunking_config` | 542-563 | Config for parsing | +| **Insert** | `insert` | 292-297 | Add doc + increment KB | +| **Delete** | `remove_document` | 301-340 | Cascade cleanup | +| **Parse** | `run` | 822-841 | Route to processing | +| **Parse** | `doc_upload_and_parse` | 889-1030 | Full workflow | +| **Status** | `begin2parse` | 627-637 | Set running status | +| **Status** | `_sync_progress` | 682-738 | Aggregate task→doc | +| **Status** | `update_progress` | 665-668 | Batch sync unfinished | +| **Chunks** | `increment_chunk_num` | 390-403 | Add chunks | +| **Chunks** | `decrement_chunk_num` | 407-422 | Remove chunks | +| **Chunks** | `clear_chunk_num` | 426-438 | Reset on delete | +| **Config** | `update_parser_config` | 594-615 | Deep merge config | +| **Access** | `accessible` | 495-505 | User permission check | +| **Access** | `accessible4deletion` | 509-525 | Delete permission | +| **Stats** | `knowledgebase_basic_info` | 767-819 | KB statistics | + +--- + +## Error Handling + +| Location | Error Type | Handling | +|----------|-----------|----------| +| `insert()` | RuntimeError | Raised - transaction fails | +| `remove_document()` | Any exception | Caught + pass (silent) | +| `_sync_progress()` | Exception | Logged, continues others | +| `check_doc_health()` | RuntimeError | Raised - 
diff --git a/personal_analyze/02-SERVICE-LAYER/knowledgebase_service_analysis.md b/personal_analyze/02-SERVICE-LAYER/knowledgebase_service_analysis.md
new file mode 100644
index 000000000..70912c44b
--- /dev/null
+++ b/personal_analyze/02-SERVICE-LAYER/knowledgebase_service_analysis.md
@@ -0,0 +1,681 @@
# Knowledgebase Service Analysis - Dataset Management & Access Control

## Overview

`knowledgebase_service.py` (566 lines) manages the **Dataset (Knowledgebase)** - the unit that organizes documents in RAGFlow - covering CRUD operations, access control, parser configuration, and document association tracking.

## File Location
```
/api/db/services/knowledgebase_service.py
```

## Class Definition

```python
class KnowledgebaseService(CommonService):
    model = Knowledgebase  # Line 49
```

Inherits the basic methods from `CommonService`: `query()`, `get_by_id()`, `save()`, `update_by_id()`, `delete_by_id()`

---

## Knowledgebase Model Structure

```python
# From db_models.py (Lines 734-753)

class Knowledgebase(DataBaseModel):
    id = CharField(max_length=32, primary_key=True)
    avatar = TextField(null=True)                     # KB avatar (base64)
    tenant_id = CharField(max_length=32, index=True)  # Owner tenant
    name = CharField(max_length=128, index=True)      # KB name
    language = CharField(max_length=32)               # "English"|"Chinese"
    description = TextField(null=True)                # KB description
    embd_id = CharField(max_length=128)               # Embedding model ID
    permission = CharField(max_length=16)             # "me"|"team"
    created_by = CharField(max_length=32)             # Creator user ID

    # Statistics
    doc_num = IntegerField(default=0)                 # Document count
    token_num = IntegerField(default=0)               # Total tokens
    chunk_num = IntegerField(default=0)               # Total chunks

    # Search config
    similarity_threshold = FloatField(default=0.2)
    vector_similarity_weight = FloatField(default=0.3)

    # Parser config
    parser_id = CharField(default="naive")            # Default parser
    pipeline_id = CharField(null=True)                # Pipeline workflow ID
    parser_config = JSONField(default={"pages": [[1, 1000000]]})
    pagerank = IntegerField(default=0)
```

---

## Permission Model

### Dual-Level Access Control

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                              PERMISSION MODEL                                │
└─────────────────────────────────────────────────────────────────────────────┘

Level 1: Knowledgebase.permission
    │
    ├─── "me" ───► Only owner (created_by) can access
    │
    └─── "team" ──► All users in owner's tenant can access

Level 2: UserTenant relationship
    │
    └─── User must belong to KB's tenant to access

Combined Check (get_by_tenant_ids):
┌──────────────────────────────────────────────────────────────────┐
│ ((tenant_id IN joined_tenants) AND (permission == "team"))       │
│                              OR                                  │
│ (tenant_id == user_id)                                           │
└──────────────────────────────────────────────────────────────────┘
```
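In Peewee terms, the combined check above is a single boolean expression. A minimal sketch of how `get_by_tenant_ids()` could compose it - illustrative only, against a simplified model rather than the actual RAGFlow schema:

```python
from peewee import CharField, Model, SqliteDatabase

db = SqliteDatabase(":memory:")

class Knowledgebase(Model):
    tenant_id = CharField(index=True)
    permission = CharField()  # "me" | "team"

    class Meta:
        database = db

def permission_filter(joined_tenant_ids: list[str], user_id: str):
    """WHERE clause: team KBs from any joined tenant, OR KBs the
    user owns outright (regardless of their permission setting)."""
    team_kbs = Knowledgebase.tenant_id.in_(joined_tenant_ids) & (
        Knowledgebase.permission == "team"
    )
    own_kbs = Knowledgebase.tenant_id == user_id
    return team_kbs | own_kbs

# Usage sketch:
# Knowledgebase.select().where(permission_filter(tenant_ids, user_id))
```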

### Permission Methods

| Method | Lines | Purpose |
|--------|-------|---------|
| `accessible` | 471-486 | Check if user can VIEW KB |
| `accessible4deletion` | 53-83 | Check if user can DELETE KB |
| `get_by_tenant_ids` | 134-197 | Get KBs with permission filter |
| `get_kb_by_id` | 488-500 | Get KB by ID + user permission |

---

## Core Methods

### 1. Create Knowledgebase

**Lines**: 374-429

```python
@classmethod
@DB.connection_context()
def create_with_name(
    cls,
    *,
    name: str,
    tenant_id: str,
    parser_id: str | None = None,
    **kwargs
):
    """
    Create a dataset with validation and defaults.

    Validation Steps:
    1. Name must be string
    2. Name cannot be empty
    3. Name cannot exceed DATASET_NAME_LIMIT bytes (UTF-8)
    4. Deduplicate name within tenant (append _1, _2, etc.)
    5. Verify tenant exists

    Returns:
        (True, payload_dict) on success
        (False, error_result) on failure
    """
```

**Creation Flow**:

```
create_with_name(name, tenant_id, ...)
    │
    ├──► Validate name type
    │    └── Must be string
    │
    ├──► Validate name content
    │    ├── Strip whitespace
    │    ├── Check not empty
    │    └── Check UTF-8 byte length
    │
    ├──► duplicate_name(query, name, tenant_id, status)
    │    └── Returns unique name: "name", "name_1", "name_2"...
    │
    ├──► TenantService.get_by_id(tenant_id)
    │    └── Verify tenant exists
    │
    └──► Build payload dict
         ├── id: get_uuid()
         ├── name: deduplicated_name
         ├── tenant_id: tenant_id
         ├── created_by: tenant_id
         ├── parser_id: parser_id or "naive"
         └── parser_config: get_parser_config(parser_id, config)
```

---

### 2. Get Knowledgebases by Tenant

**Lines**: 134-197

```python
@classmethod
@DB.connection_context()
def get_by_tenant_ids(cls, joined_tenant_ids, user_id,
                      page_number, items_per_page,
                      orderby, desc, keywords,
                      parser_id=None):
    """
    Get knowledge bases accessible to user with pagination.

    Permission Logic:
    - Include team KBs from joined tenants
    - Include private KBs owned by user

    Filters:
    - keywords: Case-insensitive name search
    - parser_id: Filter by parser type

    Joins:
    - User: Get owner nickname and avatar

    Returns:
        (list[dict], total_count)
    """
```

**Query Structure**:

```sql
SELECT
    kb.id, kb.avatar, kb.name, kb.language, kb.description,
    kb.tenant_id, kb.permission, kb.doc_num, kb.token_num,
    kb.chunk_num, kb.parser_id, kb.embd_id,
    user.nickname, user.avatar AS tenant_avatar,
    kb.update_time
FROM knowledgebase kb
JOIN user ON kb.tenant_id = user.id
WHERE
    ((kb.tenant_id IN (?, ?, ...) AND kb.permission = 'team')
     OR kb.tenant_id = ?)
    AND kb.status = '1'
    AND LOWER(kb.name) LIKE '%keyword%'   -- if keywords
    AND kb.parser_id = ?                  -- if parser_id
ORDER BY kb.{orderby} DESC/ASC
LIMIT ? OFFSET ?
```

---

### 3. Get Knowledgebase Detail

**Lines**: 250-292

```python
@classmethod
@DB.connection_context()
def get_detail(cls, kb_id):
    """
    Get comprehensive KB information including pipeline details. 
+ + Joins: + - UserCanvas (LEFT): Get pipeline name and avatar + + Fields included: + - Basic: id, avatar, name, language, description + - Config: parser_id, parser_config, embd_id + - Stats: doc_num, token_num, chunk_num + - Pipeline: pipeline_id, pipeline_name, pipeline_avatar + - GraphRAG: graphrag_task_id, graphrag_task_finish_at + - RAPTOR: raptor_task_id, raptor_task_finish_at + - MindMap: mindmap_task_id, mindmap_task_finish_at + - Timestamps: create_time, update_time + + Returns: + dict or None if not found + """ +``` + +--- + +### 4. Check Parsing Status + +**Lines**: 85-117 + +```python +@classmethod +@DB.connection_context() +def is_parsed_done(cls, kb_id): + """ + Verify all documents in KB are ready for chat. + + Validation Rules: + 1. KB must exist + 2. No documents in RUNNING/CANCEL/FAIL state + 3. No documents UNSTART with zero chunks + + Returns: + (True, None) - All parsed + (False, error_message) - Not ready + + Used by: + Chat creation validation + """ +``` + +**Status Check Flow**: + +``` +is_parsed_done(kb_id) + │ + ├──► cls.query(id=kb_id) + │ └── Get KB info + │ + ├──► DocumentService.get_by_kb_id(kb_id, ...) + │ └── Get all documents (up to 1000) + │ + └──► For each document: + │ + ├─── run == RUNNING ───► Return (False, "still being parsed") + ├─── run == CANCEL ───► Return (False, "still being parsed") + ├─── run == FAIL ───► Return (False, "still being parsed") + └─── run == UNSTART + └── chunk_num == 0 ──► Return (False, "has not been parsed") + + └──► Return (True, None) +``` + +--- + +### 5. Parser Configuration Management + +**Lines**: 294-345 + +```python +@classmethod +@DB.connection_context() +def update_parser_config(cls, id, config): + """ + Deep merge parser configuration. + + Algorithm (dfs_update): + - For dict values: recursively merge + - For list values: union (set merge) + - For scalar values: replace + + Example: + old = {"pages": [[1, 100]], "ocr": True} + new = {"pages": [[101, 200]], "language": "en"} + result = {"pages": [[1, 100], [101, 200]], "ocr": True, "language": "en"} + """ + +@classmethod +@DB.connection_context() +def delete_field_map(cls, id): + """Remove field_map key from parser_config.""" + +@classmethod +@DB.connection_context() +def get_field_map(cls, ids): + """ + Aggregate field mappings from multiple KBs. + + Used by: TABLE parser for column mapping + """ +``` + +**Deep Merge Algorithm**: + +```python +def dfs_update(old, new): + for k, v in new.items(): + if k not in old: + old[k] = v # Add new key + elif isinstance(v, dict): + dfs_update(old[k], v) # Recursive merge + elif isinstance(v, list): + old[k] = list(set(old[k] + v)) # Union lists + else: + old[k] = v # Replace value +``` + +--- + +### 6. Document Statistics Management + +**Lines**: 516-565 + +```python +@classmethod +@DB.connection_context() +def atomic_increase_doc_num_by_id(cls, kb_id): + """ + Atomically increment doc_num by 1. + Called when: DocumentService.insert() + + SQL: UPDATE knowledgebase SET doc_num = doc_num + 1 WHERE id = ? + """ + +@classmethod +@DB.connection_context() +def decrease_document_num_in_delete(cls, kb_id, doc_num_info: dict): + """ + Decrease statistics when documents are deleted. + + doc_num_info = { + 'doc_num': number of docs deleted, + 'chunk_num': total chunks deleted, + 'token_num': total tokens deleted + } + + SQL: + UPDATE knowledgebase SET + doc_num = doc_num - ?, + chunk_num = chunk_num - ?, + token_num = token_num - ?, + update_time = ? + WHERE id = ? 
+ """ +``` + +**Statistics Flow**: + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ STATISTICS TRACKING │ +└─────────────────────────────────────────────────────────────────────────────┘ + +[Document Insert] + │ + └──► KnowledgebaseService.atomic_increase_doc_num_by_id(kb_id) + └── kb.doc_num += 1 + +[Chunk Processing] + │ + └──► DocumentService.increment_chunk_num(doc_id, kb_id, tokens, chunks, ...) + ├── doc.chunk_num += chunks + ├── doc.token_num += tokens + ├── kb.chunk_num += chunks + └── kb.token_num += tokens + +[Document Delete] + │ + └──► KnowledgebaseService.decrease_document_num_in_delete(kb_id, info) + ├── kb.doc_num -= info['doc_num'] + ├── kb.chunk_num -= info['chunk_num'] + └── kb.token_num -= info['token_num'] +``` + +--- + +### 7. Access Control Methods + +**Lines**: 471-514 + +```python +@classmethod +@DB.connection_context() +def accessible(cls, kb_id, user_id): + """ + Check if user can access (view) KB. + + Logic: User must belong to KB's tenant via UserTenant table. + + SQL: + SELECT kb.id + FROM knowledgebase kb + JOIN user_tenant ON user_tenant.tenant_id = kb.tenant_id + WHERE kb.id = ? AND user_tenant.user_id = ? + """ + +@classmethod +@DB.connection_context() +def accessible4deletion(cls, kb_id, user_id): + """ + Check if user can delete KB. + + Logic: User must be the CREATOR of the KB. + + SQL: + SELECT kb.id + FROM knowledgebase kb + WHERE kb.id = ? AND kb.created_by = ? + """ +``` + +**Access Control Diagram**: + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ ACCESS CONTROL CHECKS │ +└─────────────────────────────────────────────────────────────────────────────┘ + +VIEW Access (accessible): +┌─────────────────────────────────────────────────────────────────┐ +│ User │ +│ │ │ +│ ┌──────────────┴──────────────┐ │ +│ ▼ ▼ │ +│ UserTenant Knowledgebase │ +│ (user_id) (tenant_id) │ +│ │ │ │ +│ └──────────┬──────────────────┘ │ +│ ▼ │ +│ tenant_id MATCH? │ +│ │ │ +│ ┌────────┴────────┐ │ +│ Yes No │ +│ │ │ │ +│ ALLOWED DENIED │ +└─────────────────────────────────────────────────────────────────┘ + +DELETE Access (accessible4deletion): +┌─────────────────────────────────────────────────────────────────┐ +│ User │ +│ (user_id) │ +│ │ │ +│ ▼ │ +│ Knowledgebase │ +│ (created_by) │ +│ │ │ +│ user_id == created_by? 
│ +│ │ │ +│ ┌────────────┴────────────┐ │ +│ Yes No │ +│ │ │ │ +│ ALLOWED DENIED │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Service Interactions + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SERVICE INTERACTION DIAGRAM │ +└─────────────────────────────────────────────────────────────────────────────┘ + + KnowledgebaseService + │ + ┌───────────────────────┼───────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌───────────────┐ ┌───────────────┐ ┌───────────────┐ +│ Document │ │ Tenant │ │ User │ +│ Service │ │ Service │ │ Service │ +│ │ │ │ │ │ +│ • get_by_kb_id│ │ • get_by_id │ │ • get profile │ +│ • insert │ │ (validate │ │ info for │ +│ (→ atomic │ │ tenant) │ │ joins │ +│ _increase) │ │ │ │ │ +└───────────────┘ └───────────────┘ └───────────────┘ + +API Layer Callers: +┌───────────────┐ ┌───────────────┐ ┌───────────────┐ +│ kb_app.py │ │ dialog_app.py │ │ RESTful API │ +│ │ │ │ │ │ +│ • create │ │ • is_parsed │ │ • create_ │ +│ • update │ │ _done check │ │ with_name │ +│ • delete │ │ before chat │ │ │ +│ • list │ │ │ │ │ +└───────────────┘ └───────────────┘ └───────────────┘ +``` + +--- + +## Key Method Reference Table + +| Category | Method | Lines | Purpose | +|----------|--------|-------|---------| +| **Query** | `get_by_tenant_ids` | 134-197 | Paginated list with permissions | +| **Query** | `get_all_kb_by_tenant_ids` | 199-233 | Get all KBs (batch pagination) | +| **Query** | `get_kb_ids` | 235-248 | Get KB IDs for tenant | +| **Query** | `get_detail` | 250-292 | Comprehensive KB info | +| **Query** | `get_by_name` | 347-363 | Get by name + tenant | +| **Query** | `get_list` | 432-469 | Filtered paginated list | +| **Query** | `get_all_ids` | 365-371 | Get all KB IDs | +| **Create** | `create_with_name` | 374-429 | Validated creation | +| **Access** | `accessible` | 471-486 | View permission check | +| **Access** | `accessible4deletion` | 53-83 | Delete permission check | +| **Access** | `get_kb_by_id` | 488-500 | Get with user permission | +| **Access** | `get_kb_by_name` | 502-514 | Get by name with permission | +| **Config** | `update_parser_config` | 294-321 | Deep merge config | +| **Config** | `delete_field_map` | 323-331 | Remove field map | +| **Config** | `get_field_map` | 333-345 | Get field mappings | +| **Status** | `is_parsed_done` | 85-117 | Check parsing complete | +| **Stats** | `atomic_increase_doc_num_by_id` | 516-524 | Increment doc count | +| **Stats** | `decrease_document_num_in_delete` | 552-565 | Decrease on delete | +| **Stats** | `update_document_number_in_init` | 526-550 | Init doc count | +| **Docs** | `list_documents_by_ids` | 119-132 | Get doc IDs for KBs | + +--- + +## API Endpoints Mapping + +| HTTP Method | Endpoint | Service Method | +|-------------|----------|----------------| +| `POST` | `/v1/kb/create` | `create_with_name()` | +| `GET` | `/v1/kb/list` | `get_by_tenant_ids()` | +| `GET` | `/v1/kb/detail` | `get_detail()` | +| `PUT` | `/v1/kb/{kb_id}` | `update_by_id()` (inherited) | +| `DELETE` | `/v1/kb/{kb_id}` | `delete_by_id()` + cleanup | +| `PUT` | `/v1/kb/{kb_id}/config` | `update_parser_config()` | + +--- + +## Parser Configuration Schema + +```python +parser_config = { + # Page range for PDF parsing + "pages": [[1, 1000000]], # Default: all pages + + # OCR settings + "ocr": True, + "ocr_model": "tesseract", # or "paddleocr" + + # Layout settings + "layout_recognize": True, + + # Chunking settings + "chunk_token_num": 128, + "delimiter": "\n!?。;!?", + + # For 
TABLE parser + "field_map": { + "column_name": "mapped_field_name" + }, + + # For specific parsers + "raptor": {"enabled": False}, + "graphrag": {"enabled": False} +} +``` + +--- + +## Error Handling + +| Location | Error Type | Handling | +|----------|-----------|----------| +| `create_with_name()` | Invalid name | Return `(False, error_result)` | +| `create_with_name()` | Tenant not found | Return `(False, error_result)` | +| `update_parser_config()` | KB not found | Raise `LookupError` | +| `delete_field_map()` | KB not found | Raise `LookupError` | +| `decrease_document_num_in_delete()` | KB not found | Raise `RuntimeError` | +| `update_document_number_in_init()` | ValueError "no data to save" | Pass (ignore) | + +--- + +## Database Patterns + +### Atomic Updates + +```python +# Atomic increment using SQL expression (Line 522-523) +data["doc_num"] = cls.model.doc_num + 1 # Peewee generates: doc_num + 1 +cls.model.update(data).where(cls.model.id == kb_id).execute() +``` + +### Batch Pagination Pattern + +```python +# Avoid deep pagination performance issues (Lines 224-232) +offset, limit = 0, 50 +res = [] +while True: + kb_batch = kbs.offset(offset).limit(limit) + _temp = list(kb_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += limit +``` + +### Selective Field Save + +```python +# Save only dirty fields without updating timestamps (Lines 537-545) +dirty_fields = kb.dirty_fields +if cls.model._meta.combined.get("update_time") in dirty_fields: + dirty_fields.remove(cls.model._meta.combined["update_time"]) +kb.save(only=dirty_fields) +``` + +--- + +## Key Constants & Imports + +```python +# Permission types (from api/db/__init__.py) +class TenantPermission(Enum): + ME = "me" # Private to creator + TEAM = "team" # Shared with tenant + +# Status (from common/constants.py) +class StatusEnum(Enum): + WASTED = "0" # Soft deleted + VALID = "1" # Active + +# Dataset name limit (from api/constants.py) +DATASET_NAME_LIMIT = 128 # bytes (UTF-8) + +# Default parser +ParserType.NAIVE = "naive" +``` + +--- + +## Performance Considerations + +1. **Batch Pagination**: `get_all_kb_by_tenant_ids()` uses offset-limit pagination to avoid memory issues + +2. **Selective Joins**: Queries only join necessary tables (User, UserTenant, UserCanvas) + +3. **Index Usage**: All filter/sort fields are indexed (`tenant_id`, `name`, `permission`, `status`, `parser_id`) + +4. **Atomic Operations**: Statistics updates use SQL expressions for atomicity without explicit transactions + +5. **Lazy Loading**: Document details fetched separately from KB list queries
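
As a closing illustration of point 4, here is a self-contained Peewee sketch of the atomic-update idiom from the Database Patterns section above. The model is deliberately simplified (in-memory SQLite, two fields), not the actual RAGFlow schema:

```python
from peewee import CharField, IntegerField, Model, SqliteDatabase

db = SqliteDatabase(":memory:")

class Knowledgebase(Model):
    kb_id = CharField(primary_key=True)
    doc_num = IntegerField(default=0)

    class Meta:
        database = db

db.create_tables([Knowledgebase])
Knowledgebase.create(kb_id="kb1", doc_num=3)

# Atomic increment: the arithmetic happens inside the UPDATE statement
# (UPDATE knowledgebase SET doc_num = doc_num + 1 WHERE kb_id = 'kb1'),
# so concurrent writers cannot lose updates the way a
# read-modify-write sequence in Python could.
(Knowledgebase
 .update({Knowledgebase.doc_num: Knowledgebase.doc_num + 1})
 .where(Knowledgebase.kb_id == "kb1")
 .execute())

assert Knowledgebase.get_by_id("kb1").doc_num == 4
```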