From 66381d8c9ce54964aff23cec06f73318e3afb4be Mon Sep 17 00:00:00 2001 From: levischd Date: Thu, 20 Nov 2025 14:17:00 +0100 Subject: [PATCH] feat: improve metadata handling in connector service - Add metadata mapping for documents during upload - Update documents with metadata after upload in connector service - Enhance metadata support in confluence and discord connectors - Update sync data source to handle metadata properly --- api/db/services/connector_service.py | 15 ++++++++++++++- common/data_source/confluence_connector.py | 1 + common/data_source/discord_connector.py | 1 + common/data_source/models.py | 1 + rag/svr/sync_data_source.py | 11 +++++++---- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index db8c713e2..f02c28aed 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -214,9 +214,22 @@ class SyncLogsService(CommonService): err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src) errs.extend(err) + # Create a mapping from filename to metadata for later use + metadata_map = {} + for d in docs: + if d.get("metadata"): + filename = d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else "") + metadata_map[filename] = d["metadata"] + kb_table_num_map = {} - for doc, _ in doc_blob_pairs: + for (doc, _), file_obj in zip(doc_blob_pairs, files): doc_ids.append(doc["id"]) + + # Set metadata if available for this document + if file_obj.filename in metadata_map: + from api.db.services.document_service import DocumentService + DocumentService.update_by_id(doc["id"], {"meta_fields": metadata_map[file_obj.filename]}) + if not auto_parse or auto_parse == "0": continue DocumentService.run(tenant_id, doc, kb_table_num_map) diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index 821f79862..a7935ff6d 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -1562,6 +1562,7 @@ class ConfluenceConnector( size_bytes=len(page_content.encode("utf-8")), # Calculate size in bytes doc_updated_at=datetime_from_string(page["version"]["when"]), primary_owners=primary_owners if primary_owners else None, + metadata=metadata if metadata else None, ) except Exception as e: logging.error(f"Error converting page {page.get('id', 'unknown')}: {e}") diff --git a/common/data_source/discord_connector.py b/common/data_source/discord_connector.py index 93a0477b0..46b23443c 100644 --- a/common/data_source/discord_connector.py +++ b/common/data_source/discord_connector.py @@ -65,6 +65,7 @@ def _convert_message_to_document( blob=message.content.encode("utf-8"), extension=".txt", size_bytes=len(message.content.encode("utf-8")), + metadata=metadata if metadata else None, ) diff --git a/common/data_source/models.py b/common/data_source/models.py index 032f26cc8..e956194b8 100644 --- a/common/data_source/models.py +++ b/common/data_source/models.py @@ -94,6 +94,7 @@ class Document(BaseModel): blob: bytes doc_updated_at: datetime size_bytes: int + metadata: Optional[dict[str, Any]] = None class BasicExpertInfo(BaseModel): diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 6925eb5f7..9839fbc81 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -79,8 +79,9 @@ class SyncBase: min_update = min([doc.doc_updated_at for doc in document_batch]) max_update = max([doc.doc_updated_at for doc in document_batch]) next_update = max([next_update, max_update]) - docs = [ - { + docs = [] + for doc in document_batch: + doc_dict = { "id": doc.id, "connector_id": task["connector_id"], "source": self.SOURCE_NAME, @@ -90,8 +91,10 @@ class SyncBase: "doc_updated_at": doc.doc_updated_at, "blob": doc.blob, } - for doc in document_batch - ] + # Add metadata if present + if doc.metadata: + doc_dict["metadata"] = doc.metadata + docs.append(doc_dict) e, kb = KnowledgebaseService.get_by_id(task["kb_id"]) err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])