docs: Add document and knowledgebase service analysis documentation

- Add document_service_analysis.md: comprehensive analysis of document
  lifecycle management, including insert, remove, parse, and progress tracking
- Add knowledgebase_service_analysis.md: dataset management and access
  control analysis, covering the permission model and parser configuration
# Document Service Analysis - Document Lifecycle Management
## Overview
`document_service.py` (39KB) manages the entire **document lifecycle**, from upload to deletion, including parsing, chunk management, and progress tracking.
## File Location
```
/api/db/services/document_service.py
```
## Class Definition
```python
class DocumentService(CommonService):
    model = Document  # Line 46
```
Inherits from `CommonService`, which provides the basic methods `query()`, `get_by_id()`, `save()`, `update_by_id()`, and `delete_by_id()`.
---
## Document Lifecycle Flow
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ DOCUMENT LIFECYCLE │
└─────────────────────────────────────────────────────────────────────────────┘
[1] UPLOAD PHASE
├──► FileService.upload_document()
│ ├── Store file in MinIO
│ ├── Create File record
│ └── Create Document record
└──► DocumentService.insert(doc)
├── Save Document to MySQL
└── KnowledgebaseService.atomic_increase_doc_num_by_id()
[2] QUEUE PHASE
└──► DocumentService.run(tenant_id, doc)
├─(pipeline_id)──► TaskService.queue_dataflow()
│ └── Canvas workflow execution
└─(standard)─────► TaskService.queue_tasks()
└── Push to Redis queue
[3] PROCESSING PHASE (Background)
├──► TaskExecutor picks task from queue
├──► Parse document (deepdoc parsers)
├──► Generate chunks
├──► LLMBundle.encode() → Embeddings
├──► Store in Elasticsearch/Infinity
└──► DocumentService.increment_chunk_num()
├── Document.chunk_num += count
├── Document.token_num += count
└── Knowledgebase.chunk_num += count
[4] STATUS SYNC
└──► DocumentService._sync_progress()
├── Aggregate task progress
├── Update Document.progress
└── Set run status (DONE/FAIL/RUNNING)
[5] QUERY/RETRIEVAL
├──► DocumentService.get_by_kb_id()
└──► docStoreConn.search() → Return chunks for RAG
[6] DELETION
└──► DocumentService.remove_document()
├── clear_chunk_num() → Reset KB stats
├── TaskService.filter_delete() → Remove tasks
├── docStoreConn.delete() → Remove from index
├── STORAGE_IMPL.rm() → Delete files
└── delete_by_id() → Remove DB record
```
---
## Core Methods
### 1. Insert Document
**Lines**: 292-297
```python
@classmethod
@DB.connection_context()
def insert(cls, doc):
    """
    Insert document and increment KB doc count atomically.

    Args:
        doc: Document dict with keys: id, kb_id, name, parser_id, etc.
    Returns:
        Document instance
    Raises:
        RuntimeError: If database operation fails
    """
    if not cls.save(**doc):
        raise RuntimeError("Database error (Document)!")
    # Atomic increment of KB document count
    if not KnowledgebaseService.atomic_increase_doc_num_by_id(doc["kb_id"]):
        raise RuntimeError("Database error (Knowledgebase)!")
    return Document(**doc)
```
**Flow**:
1. Save document record to MySQL
2. Atomically increment `Knowledgebase.doc_num`
3. Return Document instance
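A minimal usage sketch of this flow (the dict fields and the `kb`/`user_id` variables are illustrative placeholders; in practice `FileService.upload_document()` builds this dict during upload):
```python
# Hedged sketch: what a caller of DocumentService.insert() roughly looks like.
# Field values are placeholders, not the exact set the upload path populates.
from api.utils import get_uuid  # assumed import path for the get_uuid() helper referenced in this doc

doc = {
    "id": get_uuid(),
    "kb_id": kb["id"],                 # target knowledgebase
    "name": "report.pdf",
    "parser_id": kb["parser_id"],      # inherit the KB's default parser
    "parser_config": kb["parser_config"],
    "created_by": user_id,
}
DocumentService.insert(doc)            # saves the row and bumps Knowledgebase.doc_num by 1
```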
---
### 2. Remove Document
**Lines**: 301-340
```python
@classmethod
@DB.connection_context()
def remove_document(cls, doc, tenant_id):
    """
    Remove document with full cascade cleanup.

    Cleanup order:
    1. Reset KB statistics (chunk_num, token_num, doc_num)
    2. Delete associated tasks
    3. Retrieve all chunk IDs (paginated)
    4. Delete chunk files from storage (MinIO)
    5. Delete thumbnail if exists
    6. Delete from document store (Elasticsearch)
    7. Clean up knowledge graph references
    8. Delete document record from MySQL
    """
```
**Cascade Cleanup Diagram**:
```
remove_document(doc, tenant_id)
├──► clear_chunk_num(doc.id)
│ └── KB: -chunk_num, -token_num, -doc_num
├──► TaskService.filter_delete([Task.doc_id == doc.id])
├──► Retrieve chunk IDs (paginated, 1000/page)
│ for page in range(∞):
│ chunks = docStoreConn.search(...)
│ chunk_ids.extend(get_chunk_ids(chunks))
│ if empty: break
├──► Delete chunk files from storage
│ for cid in chunk_ids:
│ STORAGE_IMPL.rm(doc.kb_id, cid)
├──► Delete thumbnail (if not base64)
│ STORAGE_IMPL.rm(doc.kb_id, doc.thumbnail)
├──► docStoreConn.delete({"doc_id": doc.id}, ...)
├──► Clean knowledge graph (if exists)
│ └── Remove doc.id from graph source_id references
└──► cls.delete_by_id(doc.id)
```
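A hedged sketch of the paginated chunk cleanup step above. `collect_chunk_ids` is a hypothetical helper standing in for the `docStoreConn.search(...)` + `get_chunk_ids(...)` pair in the diagram, since their full signatures are not reproduced here:
```python
# Sketch only: page through the document's chunks (1000 per page), gather
# their IDs, then delete the per-chunk objects from MinIO before the index
# and database records are removed.
PAGE_SIZE = 1000

def collect_chunk_ids(doc_id: str) -> list[str]:
    chunk_ids: list[str] = []
    page = 0
    while True:
        # Hypothetical wrapper around docStoreConn.search() + get_chunk_ids()
        batch = search_chunk_ids(doc_id, offset=page * PAGE_SIZE, limit=PAGE_SIZE)
        if not batch:
            break
        chunk_ids.extend(batch)
        page += 1
    return chunk_ids

for cid in collect_chunk_ids(doc.id):
    STORAGE_IMPL.rm(doc.kb_id, cid)   # remove the stored chunk object
```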
---
### 3. Run Document Processing
**Lines**: 822-841
```python
@classmethod
def run(cls, tenant_id: str, doc: dict, kb_table_num_map: dict):
    """
    Route document to appropriate processing pipeline.

    Two paths:
    1. Pipeline mode (canvas workflow): queue_dataflow()
    2. Standard mode: queue_tasks()
    """
    from api.db.services.task_service import queue_dataflow, queue_tasks

    doc["tenant_id"] = tenant_id
    doc_parser = doc.get("parser_id", ParserType.NAIVE)

    # Special handling for TABLE parser
    if doc_parser == ParserType.TABLE:
        kb_id = doc.get("kb_id")
        if kb_id not in kb_table_num_map:
            count = DocumentService.count_by_kb_id(kb_id=kb_id, ...)
            kb_table_num_map[kb_id] = count
        if kb_table_num_map[kb_id] <= 0:
            KnowledgebaseService.delete_field_map(kb_id)

    # Route to processing
    if doc.get("pipeline_id", ""):
        queue_dataflow(tenant_id, flow_id=doc["pipeline_id"],
                       task_id=get_uuid(), doc_id=doc["id"])
    else:
        bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
        queue_tasks(doc, bucket, name, 0)
```
**Routing Logic**:
```
doc.run()
├─── Has pipeline_id? ────► queue_dataflow()
│ │ │
│ │ └── Execute canvas workflow
│ │
│ No
│ │
│ ▼
│ Get file storage address
│ │
│ ▼
└──────► queue_tasks()
└── Push to Redis queue for TaskExecutor
```
---
### 4. Chunk Number Management
**Lines**: 390-455
```python
# INCREMENT (after parsing completes)
@classmethod
@DB.connection_context()
def increment_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
    """
    Updates:
    - Document.chunk_num += chunk_num
    - Document.token_num += token_num
    - Document.process_duration += duration
    - Knowledgebase.chunk_num += chunk_num
    - Knowledgebase.token_num += token_num
    """

# DECREMENT (on reprocessing)
@classmethod
@DB.connection_context()
def decrement_chunk_num(cls, doc_id, kb_id, token_num, chunk_num, duration):
    """Reverse of increment_chunk_num"""

# CLEAR (on deletion)
@classmethod
@DB.connection_context()
def clear_chunk_num(cls, doc_id):
    """
    Updates:
    - KB.chunk_num -= doc.chunk_num
    - KB.token_num -= doc.token_num
    - KB.doc_num -= 1
    - Document: reset chunk_num=0, token_num=0
    """

# CLEAR ON RERUN (keeps doc_num)
@classmethod
@DB.connection_context()
def clear_chunk_num_when_rerun(cls, doc_id):
    """Same as clear_chunk_num but KB.doc_num unchanged"""
```
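A hedged sketch of how the increment path can be expressed with the same atomic SQL-expression style used elsewhere in this analysis (`doc_num = doc_num + 1`); it mirrors the documented behavior rather than quoting the actual implementation:
```python
# Sketch: atomic counter updates via peewee expressions, so concurrent task
# executors don't overwrite each other's totals.
Document.update(
    chunk_num=Document.chunk_num + chunk_num,
    token_num=Document.token_num + token_num,
    process_duration=Document.process_duration + duration,
).where(Document.id == doc_id).execute()

Knowledgebase.update(
    chunk_num=Knowledgebase.chunk_num + chunk_num,
    token_num=Knowledgebase.token_num + token_num,
).where(Knowledgebase.id == kb_id).execute()
```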
---
### 5. Progress Synchronization
**Lines**: 682-738
```python
@classmethod
def _sync_progress(cls, docs):
    """
    Aggregate task progress → document progress.

    State Machine:
    - ALL tasks done + NO failures → progress=1, status=DONE
    - ALL tasks done + ANY failure → progress=-1, status=FAIL
    - Any task running → progress=avg(task_progress), status=RUNNING
    """
```
**Progress State Machine**:
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROGRESS STATE MACHINE │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────┐
│ Aggregate Tasks │
│ progress values │
└──────────┬──────────┘
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
ALL DONE (prg=1) ANY FAILED IN PROGRESS
No failures (any task=-1) (0 ≤ prg < 1)
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ progress=1 │ │ progress=-1 │ │ progress= │
│ run=DONE │ │ run=FAIL │ │ avg(tasks)│
│ │ │ │ │ run=RUNNING │
└─────────────┘ └─────────────┘ └─────────────┘
Progress Calculation:
prg = sum(task.progress for task in tasks if task.progress >= 0) / len(tasks)
```
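A hedged sketch of the aggregation rules above. `tasks` is the list of task rows belonging to one document; the returned status strings stand in for the service's run-status enum values:
```python
# Sketch: collapse per-task progress into a single (progress, status) pair.
def aggregate_progress(tasks):
    if any(t.progress == -1 for t in tasks):
        return -1.0, "FAIL"                      # any failed task fails the document
    if tasks and all(t.progress == 1 for t in tasks):
        return 1.0, "DONE"                       # everything finished cleanly
    prg = sum(t.progress for t in tasks if t.progress >= 0) / max(len(tasks), 1)
    return prg, "RUNNING"                        # average of non-failed task progress
```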
---
### 6. Get Documents by KB
**Lines**: 125-163
```python
@classmethod
@DB.connection_context()
def get_by_kb_id(cls, kb_id, page_number, items_per_page, orderby, desc,
                 keywords, run_status, types, suffix):
    """
    Advanced query with multiple filters and joins.

    Joins:
    - File2Document → File (for location, size)
    - UserCanvas (LEFT) → for pipeline info
    - User (LEFT) → for creator info
    Filters:
    - kb_id: Required
    - keywords: Search in doc name
    - run_status: [RUNNING, DONE, FAIL, CANCEL]
    - types: Document types
    - suffix: File extensions
    Returns:
        (list[dict], total_count)
    """
```
**Query Structure**:
```sql
SELECT
document.id, thumbnail, kb_id, parser_id, pipeline_id,
parser_config, source_type, type, created_by, name,
location, size, token_num, chunk_num, progress,
progress_msg, process_begin_at, process_duration,
meta_fields, suffix, run, status,
create_time, create_date, update_time, update_date,
user_canvas.title AS pipeline_name,
user.nickname
FROM document
JOIN file2document ON document.id = file2document.document_id
JOIN file ON file2document.file_id = file.id
LEFT JOIN user_canvas ON document.pipeline_id = user_canvas.id
LEFT JOIN user ON document.created_by = user.id
WHERE
document.kb_id = ?
AND document.status = '1'
AND (document.name LIKE '%keyword%' OR ...)
AND document.run IN (?, ?, ...)
AND document.type IN (?, ?, ...)
AND file.suffix IN (?, ?, ...)
ORDER BY ? DESC/ASC
LIMIT ? OFFSET ?
```
---
### 7. Full Parse Workflow (`doc_upload_and_parse`)
**Lines**: 889-1030 (module-level function)
```python
def doc_upload_and_parse(conversation_id, file_objs, user_id):
    """
    Complete document upload and parse workflow for chat context.

    Used by: Conversation-based document uploads
    Steps:
    1. Resolve conversation → dialog → KB
    2. Initialize embedding model
    3. Upload files
    4. Parallel parsing (12 workers)
    5. Mind map generation (async)
    6. Embedding (batch 16)
    7. Bulk insert to docStore (batch 64)
    8. Update statistics
    """
```
**Detailed Flow**:
```
doc_upload_and_parse(conversation_id, file_objs, user_id)
├──► ConversationService.get_by_id(conversation_id)
│ └── Get conversation → dialog_id
├──► DialogService.get_by_id(dialog_id)
│ └── Get dialog → kb_ids[0]
├──► KnowledgebaseService.get_by_id(kb_id)
│ └── Get KB → tenant_id, embd_id
├──► LLMBundle(tenant_id, EMBEDDING, embd_id)
│ └── Initialize embedding model
├──► FileService.upload_document(kb, file_objs, user_id)
│ └── Returns: [(doc_dict, file_bytes), ...]
├──► ThreadPoolExecutor(max_workers=12)
│ │
│ └── for (doc, blob) in files:
│ executor.submit(parser.chunk, doc["name"], blob, **kwargs)
├──► For each parsed document:
│ │
│ ├── MindMapExtractor(llm) → Generate mind map
│ │ └── trio.run(mindmap, chunk_contents)
│ │
│ ├── Embedding (batch=16)
│ │ └── vectors = embedding(doc_id, contents)
│ │
│ ├── Add vectors to chunks
│ │ └── chunk["q_{dim}_vec"] = vector
│ │
│ ├── Bulk insert (batch=64)
│ │ └── docStoreConn.insert(chunks[b:b+64], idxnm, kb_id)
│ │
│ └── Update stats
│ └── increment_chunk_num(doc_id, kb_id, tokens, chunks, 0)
└──► Return [doc_id, ...]
```
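A hedged sketch of the embedding and bulk-insert batching in the flow above. `embd_mdl`, `chunks`, `idxnm`, and `kb_id` come from the surrounding steps; the `content_with_weight` field name and the `(vectors, token_count)` return shape of `encode()` are assumptions:
```python
# Sketch: embed chunk contents 16 at a time, attach the vectors, then
# bulk-insert 64 chunks per docStoreConn.insert() call.
EMB_BATCH, INSERT_BATCH = 16, 64

contents = [c["content_with_weight"] for c in chunks]     # assumed content field
vectors = []
for i in range(0, len(contents), EMB_BATCH):
    vts, _tokens = embd_mdl.encode(contents[i:i + EMB_BATCH])
    vectors.extend(vts)

dim = len(vectors[0])
for chunk, vec in zip(chunks, vectors):
    chunk[f"q_{dim}_vec"] = vec                            # vector field name per the diagram

for b in range(0, len(chunks), INSERT_BATCH):
    docStoreConn.insert(chunks[b:b + INSERT_BATCH], idxnm, kb_id)
```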
---
## Document Status Fields
```python
# From Document model (db_models.py)
run: CharField(max_length=1)
# "0" = UNSTART (default)
# "1" = RUNNING
# "2" = CANCEL
status: CharField(max_length=1)
# "0" = WASTED (soft deleted)
# "1" = VALID (default)
progress: FloatField
# 0.0 = Not started
# 0.0-1.0 = In progress
# 1.0 = Done
# -1.0 = Failed
progress_msg: TextField
# Human-readable status message
# e.g., "Parsing...", "Embedding...", "Done"
process_begin_at: DateTimeField
# When parsing started
process_duration: FloatField
# Cumulative processing time (seconds)
```
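For readers of these fields, a small hedged helper that maps a raw `progress` value to the states listed above (purely illustrative; not part of the service):
```python
# Sketch: interpret Document.progress according to the value ranges above.
def describe_progress(progress: float) -> str:
    if progress == -1.0:
        return "failed"
    if progress >= 1.0:
        return "done"
    if progress == 0.0:
        return "not started"
    return f"in progress ({progress:.0%})"
```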
---
## Service Interactions
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ SERVICE INTERACTION DIAGRAM │
└─────────────────────────────────────────────────────────────────────────────┘
DocumentService
┌───────────────────────┼───────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Knowledgebase │ │ Task │ │ File2Document │
│ Service │ │ Service │ │ Service │
│ │ │ │ │ │
│ • atomic_ │ │ • queue_tasks │ │ • get_storage │
│ increase_ │ │ • queue_ │ │ _address │
│ doc_num │ │ dataflow │ │ • get_by_ │
│ • delete_ │ │ • filter_ │ │ document_id │
│ field_map │ │ delete │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
┌───────────────┐
│ FileService │
│ │
│ • upload_ │
│ document │
└───────────────┘
External Systems:
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ docStoreConn │ │ STORAGE_IMPL │ │ REDIS_CONN │
│ (Elasticsearch│ │ (MinIO) │ │ (Queue) │
│ /Infinity) │ │ │ │ │
│ │ │ │ │ │
│ • search │ │ • obj_exist │ │ • queue_ │
│ • insert │ │ • rm │ │ product │
│ • delete │ │ • put │ │ • queue_info │
│ • createIdx │ │ │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
```
---
## Key Method Reference Table
| Category | Method | Lines | Purpose |
|----------|--------|-------|---------|
| **Query** | `get_list` | 81-110 | Paginated list with filters |
| **Query** | `get_by_kb_id` | 125-163 | Advanced query with joins |
| **Query** | `get_filter_by_kb_id` | 167-212 | Aggregated filter counts |
| **Query** | `get_chunking_config` | 542-563 | Config for parsing |
| **Insert** | `insert` | 292-297 | Add doc + increment KB |
| **Delete** | `remove_document` | 301-340 | Cascade cleanup |
| **Parse** | `run` | 822-841 | Route to processing |
| **Parse** | `doc_upload_and_parse` | 889-1030 | Full workflow |
| **Status** | `begin2parse` | 627-637 | Set running status |
| **Status** | `_sync_progress` | 682-738 | Aggregate task→doc |
| **Status** | `update_progress` | 665-668 | Batch sync unfinished |
| **Chunks** | `increment_chunk_num` | 390-403 | Add chunks |
| **Chunks** | `decrement_chunk_num` | 407-422 | Remove chunks |
| **Chunks** | `clear_chunk_num` | 426-438 | Reset on delete |
| **Config** | `update_parser_config` | 594-615 | Deep merge config |
| **Access** | `accessible` | 495-505 | User permission check |
| **Access** | `accessible4deletion` | 509-525 | Delete permission |
| **Stats** | `knowledgebase_basic_info` | 767-819 | KB statistics |
---
## Error Handling
| Location | Error Type | Handling |
|----------|-----------|----------|
| `insert()` | RuntimeError | Raised - transaction fails |
| `remove_document()` | Any exception | Caught + pass (silent) |
| `_sync_progress()` | Exception | Logged, continues others |
| `check_doc_health()` | RuntimeError | Raised - upload rejected |
| `update_parser_config()` | LookupError | Raised - update fails |
---
## Performance Patterns
### Batch Operations
| Operation | Batch Size | Purpose |
|-----------|-----------|---------|
| Chunk retrieval | 1000 | Memory efficient deletion |
| Bulk insert | 64 | Batch vector storage |
| Embedding | 16 | LLM batch inference |
| Parallel parsing | 12 workers | Concurrent processing |
| Doc ID retrieval | 100 | Paginated queries |
### Parallel Processing
```python
# ThreadPoolExecutor for parsing
exe = ThreadPoolExecutor(max_workers=12)
for (doc, blob) in files:
    threads.append(exe.submit(parser.chunk, doc["name"], blob, **kwargs))
# Async mind map extraction
trio.run(mindmap_extractor, chunk_contents)
```

# Knowledgebase Service Analysis - Dataset Management & Access Control
## Overview
`knowledgebase_service.py` (566 lines) manages the **Dataset (Knowledgebase)**, the organizational unit for documents in RAGFlow, covering CRUD operations, access control, parser configuration, and document association tracking.
## File Location
```
/api/db/services/knowledgebase_service.py
```
## Class Definition
```python
class KnowledgebaseService(CommonService):
    model = Knowledgebase  # Line 49
```
Inherits from `CommonService`, which provides the basic methods `query()`, `get_by_id()`, `save()`, `update_by_id()`, and `delete_by_id()`.
---
## Knowledgebase Model Structure
```python
# From db_models.py (Lines 734-753)
class Knowledgebase(DataBaseModel):
    id = CharField(max_length=32, primary_key=True)
    avatar = TextField(null=True)                       # KB avatar (base64)
    tenant_id = CharField(max_length=32, index=True)    # Owner tenant
    name = CharField(max_length=128, index=True)        # KB name
    language = CharField(max_length=32)                 # "English"|"Chinese"
    description = TextField(null=True)                  # KB description
    embd_id = CharField(max_length=128)                 # Embedding model ID
    permission = CharField(max_length=16)               # "me"|"team"
    created_by = CharField(max_length=32)               # Creator user ID

    # Statistics
    doc_num = IntegerField(default=0)                   # Document count
    token_num = IntegerField(default=0)                 # Total tokens
    chunk_num = IntegerField(default=0)                 # Total chunks

    # Search config
    similarity_threshold = FloatField(default=0.2)
    vector_similarity_weight = FloatField(default=0.3)

    # Parser config
    parser_id = CharField(default="naive")              # Default parser
    pipeline_id = CharField(null=True)                  # Pipeline workflow ID
    parser_config = JSONField(default={"pages": [[1, 1000000]]})
    pagerank = IntegerField(default=0)
```
---
## Permission Model
### Dual-Level Access Control
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ PERMISSION MODEL │
└─────────────────────────────────────────────────────────────────────────────┘
Level 1: Knowledgebase.permission
├─── "me" ───► Only owner (created_by) can access
└─── "team" ──► All users in owner's tenant can access
Level 2: UserTenant relationship
└─── User must belong to KB's tenant to access
Combined Check (get_by_tenant_ids):
┌──────────────────────────────────────────────────────────────────┐
│ ((tenant_id IN joined_tenants) AND (permission == "team")) │
│ OR │
│ (tenant_id == user_id) │
└──────────────────────────────────────────────────────────────────┘
```
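A hedged peewee-style sketch of the combined check above; it mirrors the SQL shown later in the query-structure section, and the model/field names follow this analysis rather than quoting the implementation:
```python
# Sketch: team KBs from joined tenants OR private KBs owned by the user.
permission_filter = (
    (Knowledgebase.tenant_id.in_(joined_tenant_ids))
    & (Knowledgebase.permission == TenantPermission.TEAM.value)
) | (Knowledgebase.tenant_id == user_id)

kbs = Knowledgebase.select().where(
    permission_filter & (Knowledgebase.status == StatusEnum.VALID.value)
)
```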
### Permission Methods
| Method | Lines | Purpose |
|--------|-------|---------|
| `accessible` | 471-486 | Check if user can VIEW KB |
| `accessible4deletion` | 53-83 | Check if user can DELETE KB |
| `get_by_tenant_ids` | 134-197 | Get KBs with permission filter |
| `get_kb_by_id` | 488-500 | Get KB by ID + user permission |
---
## Core Methods
### 1. Create Knowledgebase
**Lines**: 374-429
```python
@classmethod
@DB.connection_context()
def create_with_name(
    cls,
    *,
    name: str,
    tenant_id: str,
    parser_id: str | None = None,
    **kwargs
):
    """
    Create a dataset with validation and defaults.

    Validation Steps:
    1. Name must be string
    2. Name cannot be empty
    3. Name cannot exceed DATASET_NAME_LIMIT bytes (UTF-8)
    4. Deduplicate name within tenant (append _1, _2, etc.)
    5. Verify tenant exists
    Returns:
        (True, payload_dict) on success
        (False, error_result) on failure
    """
```
**Creation Flow**:
```
create_with_name(name, tenant_id, ...)
├──► Validate name type
│ └── Must be string
├──► Validate name content
│ ├── Strip whitespace
│ ├── Check not empty
│ └── Check UTF-8 byte length
├──► duplicate_name(query, name, tenant_id, status)
│ └── Returns unique name: "name", "name_1", "name_2"...
├──► TenantService.get_by_id(tenant_id)
│ └── Verify tenant exists
└──► Build payload dict
├── id: get_uuid()
├── name: deduplicated_name
├── tenant_id: tenant_id
├── created_by: tenant_id
├── parser_id: parser_id or "naive"
└── parser_config: get_parser_config(parser_id, config)
```
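A hedged sketch of the validation and deduplication steps above; `duplicate_name()` follows the call shape in the diagram, and the error strings are illustrative:
```python
# Sketch: name validation + per-tenant deduplication before the payload is built.
def validate_and_dedup_name(name, tenant_id):
    if not isinstance(name, str):
        return False, "Dataset name must be a string."
    name = name.strip()
    if not name:
        return False, "Dataset name cannot be empty."
    if len(name.encode("utf-8")) > DATASET_NAME_LIMIT:       # byte length, not character count
        return False, f"Dataset name exceeds {DATASET_NAME_LIMIT} bytes."
    # Yields "name", "name_1", "name_2", ... until unique within the tenant.
    name = duplicate_name(
        KnowledgebaseService.query,
        name=name, tenant_id=tenant_id, status=StatusEnum.VALID.value,
    )
    return True, name
```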
---
### 2. Get Knowledgebases by Tenant
**Lines**: 134-197
```python
@classmethod
@DB.connection_context()
def get_by_tenant_ids(cls, joined_tenant_ids, user_id,
                      page_number, items_per_page,
                      orderby, desc, keywords,
                      parser_id=None):
    """
    Get knowledge bases accessible to user with pagination.

    Permission Logic:
    - Include team KBs from joined tenants
    - Include private KBs owned by user
    Filters:
    - keywords: Case-insensitive name search
    - parser_id: Filter by parser type
    Joins:
    - User: Get owner nickname and avatar
    Returns:
        (list[dict], total_count)
    """
```
**Query Structure**:
```sql
SELECT
kb.id, kb.avatar, kb.name, kb.language, kb.description,
kb.tenant_id, kb.permission, kb.doc_num, kb.token_num,
kb.chunk_num, kb.parser_id, kb.embd_id,
user.nickname, user.avatar AS tenant_avatar,
kb.update_time
FROM knowledgebase kb
JOIN user ON kb.tenant_id = user.id
WHERE
((kb.tenant_id IN (?, ?, ...) AND kb.permission = 'team')
OR kb.tenant_id = ?)
AND kb.status = '1'
AND LOWER(kb.name) LIKE '%keyword%' -- if keywords
AND kb.parser_id = ? -- if parser_id
ORDER BY kb.{orderby} DESC/ASC
LIMIT ? OFFSET ?
```
---
### 3. Get Knowledgebase Detail
**Lines**: 250-292
```python
@classmethod
@DB.connection_context()
def get_detail(cls, kb_id):
    """
    Get comprehensive KB information including pipeline details.

    Joins:
    - UserCanvas (LEFT): Get pipeline name and avatar
    Fields included:
    - Basic: id, avatar, name, language, description
    - Config: parser_id, parser_config, embd_id
    - Stats: doc_num, token_num, chunk_num
    - Pipeline: pipeline_id, pipeline_name, pipeline_avatar
    - GraphRAG: graphrag_task_id, graphrag_task_finish_at
    - RAPTOR: raptor_task_id, raptor_task_finish_at
    - MindMap: mindmap_task_id, mindmap_task_finish_at
    - Timestamps: create_time, update_time
    Returns:
        dict or None if not found
    """
```
---
### 4. Check Parsing Status
**Lines**: 85-117
```python
@classmethod
@DB.connection_context()
def is_parsed_done(cls, kb_id):
    """
    Verify all documents in KB are ready for chat.

    Validation Rules:
    1. KB must exist
    2. No documents in RUNNING/CANCEL/FAIL state
    3. No documents UNSTART with zero chunks
    Returns:
        (True, None) - All parsed
        (False, error_message) - Not ready
    Used by:
        Chat creation validation
    """
```
**Status Check Flow**:
```
is_parsed_done(kb_id)
├──► cls.query(id=kb_id)
│ └── Get KB info
├──► DocumentService.get_by_kb_id(kb_id, ...)
│ └── Get all documents (up to 1000)
└──► For each document:
├─── run == RUNNING ───► Return (False, "still being parsed")
├─── run == CANCEL ───► Return (False, "still being parsed")
├─── run == FAIL ───► Return (False, "still being parsed")
└─── run == UNSTART
└── chunk_num == 0 ──► Return (False, "has not been parsed")
└──► Return (True, None)
```
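A hedged sketch of the per-document check above; the `TaskStatus` names correspond to the run-state values used throughout this analysis, and the error strings are illustrative:
```python
# Sketch: a KB is chat-ready only if no document is mid-parse, cancelled,
# failed, or never parsed at all.
def all_documents_ready(docs):
    for doc in docs:
        if doc["run"] in (TaskStatus.RUNNING.value, TaskStatus.CANCEL.value, TaskStatus.FAIL.value):
            return False, f"Document '{doc['name']}' is still being parsed."
        if doc["run"] == TaskStatus.UNSTART.value and doc["chunk_num"] == 0:
            return False, f"Document '{doc['name']}' has not been parsed."
    return True, None
```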
---
### 5. Parser Configuration Management
**Lines**: 294-345
```python
@classmethod
@DB.connection_context()
def update_parser_config(cls, id, config):
    """
    Deep merge parser configuration.

    Algorithm (dfs_update):
    - For dict values: recursively merge
    - For list values: union (set merge)
    - For scalar values: replace
    Example:
        old = {"pages": [[1, 100]], "ocr": True}
        new = {"pages": [[101, 200]], "language": "en"}
        result = {"pages": [[1, 100], [101, 200]], "ocr": True, "language": "en"}
    """

@classmethod
@DB.connection_context()
def delete_field_map(cls, id):
    """Remove field_map key from parser_config."""

@classmethod
@DB.connection_context()
def get_field_map(cls, ids):
    """
    Aggregate field mappings from multiple KBs.

    Used by: TABLE parser for column mapping
    """
```
**Deep Merge Algorithm**:
```python
def dfs_update(old, new):
    for k, v in new.items():
        if k not in old:
            old[k] = v                      # Add new key
        elif isinstance(v, dict):
            dfs_update(old[k], v)           # Recursive merge
        elif isinstance(v, list):
            old[k] = list(set(old[k] + v))  # Union lists
        else:
            old[k] = v                      # Replace value
```
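A usage sketch of `dfs_update` on a parser-config update (the `tags` and `raptor.max_token` keys are hypothetical example values; note that the set-based list union requires hashable list elements):
```python
old = {"chunk_token_num": 128, "raptor": {"enabled": False}, "tags": ["finance"]}
new = {"chunk_token_num": 256, "raptor": {"max_token": 256}, "tags": ["legal"]}

dfs_update(old, new)
# old is now (list order from the set union is not guaranteed):
# {"chunk_token_num": 256,
#  "raptor": {"enabled": False, "max_token": 256},
#  "tags": ["finance", "legal"]}
```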
---
### 6. Document Statistics Management
**Lines**: 516-565
```python
@classmethod
@DB.connection_context()
def atomic_increase_doc_num_by_id(cls, kb_id):
    """
    Atomically increment doc_num by 1.

    Called when: DocumentService.insert()
    SQL: UPDATE knowledgebase SET doc_num = doc_num + 1 WHERE id = ?
    """

@classmethod
@DB.connection_context()
def decrease_document_num_in_delete(cls, kb_id, doc_num_info: dict):
    """
    Decrease statistics when documents are deleted.

    doc_num_info = {
        'doc_num': number of docs deleted,
        'chunk_num': total chunks deleted,
        'token_num': total tokens deleted
    }
    SQL:
        UPDATE knowledgebase SET
            doc_num = doc_num - ?,
            chunk_num = chunk_num - ?,
            token_num = token_num - ?,
            update_time = ?
        WHERE id = ?
    """
```
**Statistics Flow**:
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ STATISTICS TRACKING │
└─────────────────────────────────────────────────────────────────────────────┘
[Document Insert]
└──► KnowledgebaseService.atomic_increase_doc_num_by_id(kb_id)
└── kb.doc_num += 1
[Chunk Processing]
└──► DocumentService.increment_chunk_num(doc_id, kb_id, tokens, chunks, ...)
├── doc.chunk_num += chunks
├── doc.token_num += tokens
├── kb.chunk_num += chunks
└── kb.token_num += tokens
[Document Delete]
└──► KnowledgebaseService.decrease_document_num_in_delete(kb_id, info)
├── kb.doc_num -= info['doc_num']
├── kb.chunk_num -= info['chunk_num']
└── kb.token_num -= info['token_num']
```
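A hedged sketch of the deletion-side update, mirroring the SQL shown earlier in this section with atomic peewee expressions; the timestamp helper is hypothetical:
```python
# Sketch: subtract the deleted documents' totals from the KB counters in one
# atomic UPDATE.
Knowledgebase.update(
    doc_num=Knowledgebase.doc_num - doc_num_info["doc_num"],
    chunk_num=Knowledgebase.chunk_num - doc_num_info["chunk_num"],
    token_num=Knowledgebase.token_num - doc_num_info["token_num"],
    update_time=current_timestamp(),   # hypothetical timestamp helper
).where(Knowledgebase.id == kb_id).execute()
```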
---
### 7. Access Control Methods
**Lines**: 471-514
```python
@classmethod
@DB.connection_context()
def accessible(cls, kb_id, user_id):
    """
    Check if user can access (view) KB.

    Logic: User must belong to KB's tenant via UserTenant table.
    SQL:
        SELECT kb.id
        FROM knowledgebase kb
        JOIN user_tenant ON user_tenant.tenant_id = kb.tenant_id
        WHERE kb.id = ? AND user_tenant.user_id = ?
    """

@classmethod
@DB.connection_context()
def accessible4deletion(cls, kb_id, user_id):
    """
    Check if user can delete KB.

    Logic: User must be the CREATOR of the KB.
    SQL:
        SELECT kb.id
        FROM knowledgebase kb
        WHERE kb.id = ? AND kb.created_by = ?
    """
```
**Access Control Diagram**:
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ ACCESS CONTROL CHECKS │
└─────────────────────────────────────────────────────────────────────────────┘
VIEW Access (accessible):
┌─────────────────────────────────────────────────────────────────┐
│ User │
│ │ │
│ ┌──────────────┴──────────────┐ │
│ ▼ ▼ │
│ UserTenant Knowledgebase │
│ (user_id) (tenant_id) │
│ │ │ │
│ └──────────┬──────────────────┘ │
│ ▼ │
│ tenant_id MATCH? │
│ │ │
│ ┌────────┴────────┐ │
│ Yes No │
│ │ │ │
│ ALLOWED DENIED │
└─────────────────────────────────────────────────────────────────┘
DELETE Access (accessible4deletion):
┌─────────────────────────────────────────────────────────────────┐
│ User │
│ (user_id) │
│ │ │
│ ▼ │
│ Knowledgebase │
│ (created_by) │
│ │ │
│ user_id == created_by? │
│ │ │
│ ┌────────────┴────────────┐ │
│ Yes No │
│ │ │ │
│ ALLOWED DENIED │
└─────────────────────────────────────────────────────────────────┘
```
---
## Service Interactions
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ SERVICE INTERACTION DIAGRAM │
└─────────────────────────────────────────────────────────────────────────────┘
KnowledgebaseService
┌───────────────────────┼───────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Document │ │ Tenant │ │ User │
│ Service │ │ Service │ │ Service │
│ │ │ │ │ │
│ • get_by_kb_id│ │ • get_by_id │ │ • get profile │
│ • insert │ │ (validate │ │ info for │
│ (→ atomic │ │ tenant) │ │ joins │
│ _increase) │ │ │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
API Layer Callers:
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ kb_app.py │ │ dialog_app.py │ │ RESTful API │
│ │ │ │ │ │
│ • create │ │ • is_parsed │ │ • create_ │
│ • update │ │ _done check │ │ with_name │
│ • delete │ │ before chat │ │ │
│ • list │ │ │ │ │
└───────────────┘ └───────────────┘ └───────────────┘
```
---
## Key Method Reference Table
| Category | Method | Lines | Purpose |
|----------|--------|-------|---------|
| **Query** | `get_by_tenant_ids` | 134-197 | Paginated list with permissions |
| **Query** | `get_all_kb_by_tenant_ids` | 199-233 | Get all KBs (batch pagination) |
| **Query** | `get_kb_ids` | 235-248 | Get KB IDs for tenant |
| **Query** | `get_detail` | 250-292 | Comprehensive KB info |
| **Query** | `get_by_name` | 347-363 | Get by name + tenant |
| **Query** | `get_list` | 432-469 | Filtered paginated list |
| **Query** | `get_all_ids` | 365-371 | Get all KB IDs |
| **Create** | `create_with_name` | 374-429 | Validated creation |
| **Access** | `accessible` | 471-486 | View permission check |
| **Access** | `accessible4deletion` | 53-83 | Delete permission check |
| **Access** | `get_kb_by_id` | 488-500 | Get with user permission |
| **Access** | `get_kb_by_name` | 502-514 | Get by name with permission |
| **Config** | `update_parser_config` | 294-321 | Deep merge config |
| **Config** | `delete_field_map` | 323-331 | Remove field map |
| **Config** | `get_field_map` | 333-345 | Get field mappings |
| **Status** | `is_parsed_done` | 85-117 | Check parsing complete |
| **Stats** | `atomic_increase_doc_num_by_id` | 516-524 | Increment doc count |
| **Stats** | `decrease_document_num_in_delete` | 552-565 | Decrease on delete |
| **Stats** | `update_document_number_in_init` | 526-550 | Init doc count |
| **Docs** | `list_documents_by_ids` | 119-132 | Get doc IDs for KBs |
---
## API Endpoints Mapping
| HTTP Method | Endpoint | Service Method |
|-------------|----------|----------------|
| `POST` | `/v1/kb/create` | `create_with_name()` |
| `GET` | `/v1/kb/list` | `get_by_tenant_ids()` |
| `GET` | `/v1/kb/detail` | `get_detail()` |
| `PUT` | `/v1/kb/{kb_id}` | `update_by_id()` (inherited) |
| `DELETE` | `/v1/kb/{kb_id}` | `delete_by_id()` + cleanup |
| `PUT` | `/v1/kb/{kb_id}/config` | `update_parser_config()` |
---
## Parser Configuration Schema
```python
parser_config = {
    # Page range for PDF parsing
    "pages": [[1, 1000000]],     # Default: all pages

    # OCR settings
    "ocr": True,
    "ocr_model": "tesseract",    # or "paddleocr"

    # Layout settings
    "layout_recognize": True,

    # Chunking settings
    "chunk_token_num": 128,
    "delimiter": "\n!?。;!?",

    # For TABLE parser
    "field_map": {
        "column_name": "mapped_field_name"
    },

    # For specific parsers
    "raptor": {"enabled": False},
    "graphrag": {"enabled": False}
}
```
---
## Error Handling
| Location | Error Type | Handling |
|----------|-----------|----------|
| `create_with_name()` | Invalid name | Return `(False, error_result)` |
| `create_with_name()` | Tenant not found | Return `(False, error_result)` |
| `update_parser_config()` | KB not found | Raise `LookupError` |
| `delete_field_map()` | KB not found | Raise `LookupError` |
| `decrease_document_num_in_delete()` | KB not found | Raise `RuntimeError` |
| `update_document_number_in_init()` | ValueError "no data to save" | Pass (ignore) |
---
## Database Patterns
### Atomic Updates
```python
# Atomic increment using SQL expression (Line 522-523)
data["doc_num"] = cls.model.doc_num + 1 # Peewee generates: doc_num + 1
cls.model.update(data).where(cls.model.id == kb_id).execute()
```
### Batch Pagination Pattern
```python
# Avoid deep pagination performance issues (Lines 224-232)
offset, limit = 0, 50
res = []
while True:
    kb_batch = kbs.offset(offset).limit(limit)
    _temp = list(kb_batch.dicts())
    if not _temp:
        break
    res.extend(_temp)
    offset += limit
```
### Selective Field Save
```python
# Save only dirty fields without updating timestamps (Lines 537-545)
dirty_fields = kb.dirty_fields
if cls.model._meta.combined.get("update_time") in dirty_fields:
    dirty_fields.remove(cls.model._meta.combined["update_time"])
kb.save(only=dirty_fields)
```
---
## Key Constants & Imports
```python
# Permission types (from api/db/__init__.py)
class TenantPermission(Enum):
    ME = "me"      # Private to creator
    TEAM = "team"  # Shared with tenant

# Status (from common/constants.py)
class StatusEnum(Enum):
    WASTED = "0"   # Soft deleted
    VALID = "1"    # Active
# Dataset name limit (from api/constants.py)
DATASET_NAME_LIMIT = 128 # bytes (UTF-8)
# Default parser
ParserType.NAIVE = "naive"
```
---
## Performance Considerations
1. **Batch Pagination**: `get_all_kb_by_tenant_ids()` uses offset-limit pagination to avoid memory issues
2. **Selective Joins**: Queries only join necessary tables (User, UserTenant, UserCanvas)
3. **Index Usage**: All filter/sort fields are indexed (`tenant_id`, `name`, `permission`, `status`, `parser_id`)
4. **Atomic Operations**: Statistics updates use SQL expressions for atomicity without explicit transactions
5. **Lazy Loading**: Document details fetched separately from KB list queries