feat: improve metadata handling in connector service

- Add metadata mapping for documents during upload
- Update documents with metadata after upload in connector service
- Enhance metadata support in confluence and discord connectors
- Update sync data source to handle metadata properly
This commit is contained in:
levischd 2025-11-20 14:17:00 +01:00
parent 065917bf1c
commit 66381d8c9c
5 changed files with 24 additions and 5 deletions

View file

@@ -214,9 +214,22 @@ class SyncLogsService(CommonService):
err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src)
errs.extend(err)
# Create a mapping from filename to metadata for later use
metadata_map = {}
for d in docs:
if d.get("metadata"):
filename = d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else "")
metadata_map[filename] = d["metadata"]
kb_table_num_map = {}
for doc, _ in doc_blob_pairs:
for (doc, _), file_obj in zip(doc_blob_pairs, files):
doc_ids.append(doc["id"])
# Set metadata if available for this document
if file_obj.filename in metadata_map:
from api.db.services.document_service import DocumentService
DocumentService.update_by_id(doc["id"], {"meta_fields": metadata_map[file_obj.filename]})
if not auto_parse or auto_parse == "0":
continue
DocumentService.run(tenant_id, doc, kb_table_num_map)

View file

@@ -1562,6 +1562,7 @@ class ConfluenceConnector(
size_bytes=len(page_content.encode("utf-8")), # Calculate size in bytes
doc_updated_at=datetime_from_string(page["version"]["when"]),
primary_owners=primary_owners if primary_owners else None,
metadata=metadata if metadata else None,
)
except Exception as e:
logging.error(f"Error converting page {page.get('id', 'unknown')}: {e}")

View file

@@ -65,6 +65,7 @@ def _convert_message_to_document(
blob=message.content.encode("utf-8"),
extension=".txt",
size_bytes=len(message.content.encode("utf-8")),
metadata=metadata if metadata else None,
)

View file

@@ -94,6 +94,7 @@ class Document(BaseModel):
blob: bytes
doc_updated_at: datetime
size_bytes: int
metadata: Optional[dict[str, Any]] = None
class BasicExpertInfo(BaseModel):

View file

@@ -79,8 +79,9 @@ class SyncBase:
min_update = min([doc.doc_updated_at for doc in document_batch])
max_update = max([doc.doc_updated_at for doc in document_batch])
next_update = max([next_update, max_update])
docs = [
{
docs = []
for doc in document_batch:
doc_dict = {
"id": doc.id,
"connector_id": task["connector_id"],
"source": self.SOURCE_NAME,
@@ -90,8 +91,10 @@ class SyncBase:
"doc_updated_at": doc.doc_updated_at,
"blob": doc.blob,
}
for doc in document_batch
]
# Add metadata if present
if doc.metadata:
doc_dict["metadata"] = doc.metadata
docs.append(doc_dict)
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])