diff --git a/src/api/upload.py b/src/api/upload.py
index fa3d470a..d55081fb 100644
--- a/src/api/upload.py
+++ b/src/api/upload.py
@@ -12,7 +12,7 @@ async def upload(request: Request, document_service, session_manager):
         user = request.state.user
         jwt_token = request.cookies.get("auth_token")
 
-        result = await document_service.process_upload_file(upload_file, owner_user_id=user.user_id, jwt_token=jwt_token)
+        result = await document_service.process_upload_file(upload_file, owner_user_id=user.user_id, jwt_token=jwt_token, owner_name=user.name, owner_email=user.email)
         return JSONResponse(result, status_code=201)  # Created
     except Exception as e:
         error_msg = str(e)
@@ -37,7 +37,7 @@ async def upload_path(request: Request, task_service, session_manager):
         user = request.state.user
         jwt_token = request.cookies.get("auth_token")
 
-        task_id = await task_service.create_upload_task(user.user_id, file_paths, jwt_token=jwt_token)
+        task_id = await task_service.create_upload_task(user.user_id, file_paths, jwt_token=jwt_token, owner_name=user.name, owner_email=user.email)
 
         return JSONResponse({
             "task_id": task_id,
@@ -132,6 +132,8 @@ async def upload_bucket(request: Request, task_service, session_manager):
             s3_client=s3_client,
             owner_user_id=user.user_id,
             jwt_token=jwt_token,
+            owner_name=user.name,
+            owner_email=user.email,
         )
 
         task_id = await task_service.create_custom_task(user.user_id, keys, processor)
diff --git a/src/connectors/service.py b/src/connectors/service.py
index a8262217..69dc5179 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -30,7 +30,7 @@ class ConnectorService:
         """Get a connector by connection ID"""
         return await self.connection_manager.get_connector(connection_id)
 
-    async def process_connector_document(self, document: ConnectorDocument, owner_user_id: str, connector_type: str, jwt_token: str = None) -> Dict[str, Any]:
+    async def process_connector_document(self, document: ConnectorDocument, owner_user_id: str, connector_type: str, jwt_token: str = None, owner_name: str = None, owner_email: str = None) -> Dict[str, Any]:
         """Process a document from a connector using existing processing pipeline"""
 
         # Create temporary file from document content
@@ -44,17 +44,25 @@ class ConnectorService:
             from services.document_service import DocumentService
             doc_service = DocumentService(session_manager=self.session_manager)
 
+            print(f"[DEBUG] Processing connector document with ID: {document.id}")
+
             # Process using the existing pipeline but with connector document metadata
             result = await doc_service.process_file_common(
                 file_path=tmp_file.name,
                 file_hash=document.id,  # Use connector document ID as hash
                 owner_user_id=owner_user_id,
                 original_filename=document.filename,  # Pass the original Google Doc title
-                jwt_token=jwt_token
+                jwt_token=jwt_token,
+                owner_name=owner_name,
+                owner_email=owner_email,
+                file_size=len(document.content) if document.content else 0,
+                connector_type=connector_type
             )
 
-            # If successfully indexed, update the indexed documents with connector metadata
-            if result["status"] == "indexed":
+            print(f"[DEBUG] Document processing result: {result}")
+
+            # If successfully indexed or already exists, update the indexed documents with connector metadata
+            if result["status"] in ["indexed", "unchanged"]:
                 # Update all chunks with connector-specific metadata
                 await self._update_connector_metadata(document, owner_user_id, connector_type, jwt_token)
 
@@ -70,6 +78,8 @@ class ConnectorService:
     async def _update_connector_metadata(self, document: ConnectorDocument, owner_user_id: str,
                                         connector_type: str, jwt_token: str = None):
         """Update indexed chunks with connector-specific metadata"""
+        print(f"[DEBUG] Looking for chunks with document_id: {document.id}")
+
         # Find all chunks for this document
         query = {
             "query": {
@@ -80,29 +90,47 @@ class ConnectorService:
 
         # Get user's OpenSearch client
         opensearch_client = self.session_manager.get_user_opensearch_client(owner_user_id, jwt_token)
 
-        response = await opensearch_client.search(index=self.index_name, body=query)
+        try:
+            response = await opensearch_client.search(index=self.index_name, body=query)
+        except Exception as e:
+            print(f"[ERROR] OpenSearch search failed for connector metadata update: {e}")
+            print(f"[ERROR] Search query: {query}")
+            raise
+
+        print(f"[DEBUG] Search query: {query}")
+        print(f"[DEBUG] Found {len(response['hits']['hits'])} chunks matching document_id: {document.id}")
 
         # Update each chunk with connector metadata
+        print(f"[DEBUG] Updating {len(response['hits']['hits'])} chunks with connector_type: {connector_type}")
         for hit in response["hits"]["hits"]:
             chunk_id = hit["_id"]
+            current_connector_type = hit["_source"].get("connector_type", "unknown")
+            print(f"[DEBUG] Chunk {chunk_id}: current connector_type = {current_connector_type}, updating to {connector_type}")
+
             update_body = {
                 "doc": {
                     "source_url": document.source_url,
-                    "connector_type": connector_type,
+                    "connector_type": connector_type,  # Override the "local" set by process_file_common
                     # Additional ACL info beyond owner (already set by process_file_common)
                     "allowed_users": document.acl.allowed_users,
                     "allowed_groups": document.acl.allowed_groups,
                     "user_permissions": document.acl.user_permissions,
                     "group_permissions": document.acl.group_permissions,
                     # Timestamps
-                    "created_time": document.created_time.isoformat(),
-                    "modified_time": document.modified_time.isoformat(),
+                    "created_time": document.created_time.isoformat() if document.created_time else None,
+                    "modified_time": document.modified_time.isoformat() if document.modified_time else None,
                     # Additional metadata
                     "metadata": document.metadata
                 }
             }
-            await opensearch_client.update(index=self.index_name, id=chunk_id, body=update_body)
+            try:
+                await opensearch_client.update(index=self.index_name, id=chunk_id, body=update_body)
+                print(f"[DEBUG] Updated chunk {chunk_id} with connector metadata")
+            except Exception as e:
+                print(f"[ERROR] OpenSearch update failed for chunk {chunk_id}: {e}")
+                print(f"[ERROR] Update body: {update_body}")
+                raise
 
     def _get_file_extension(self, mimetype: str) -> str:
         """Get file extension based on MIME type"""
@@ -168,9 +196,14 @@ class ConnectorService:
         if not files_to_process:
             raise ValueError("No files found to sync")
 
+        # Get user information
+        user = self.session_manager.get_user(user_id) if self.session_manager else None
+        owner_name = user.name if user else None
+        owner_email = user.email if user else None
+
         # Create custom processor for connector files
         from models.processors import ConnectorFileProcessor
-        processor = ConnectorFileProcessor(self, connection_id, files_to_process, user_id)
+        processor = ConnectorFileProcessor(self, connection_id, files_to_process, user_id, owner_name=owner_name, owner_email=owner_email)
 
         # Use file IDs as items (no more fake file paths!)
         file_ids = [file_info['id'] for file_info in files_to_process]
@@ -195,10 +228,15 @@ class ConnectorService:
         if not file_ids:
             raise ValueError("No file IDs provided")
 
+        # Get user information
+        user = self.session_manager.get_user(user_id) if self.session_manager else None
+        owner_name = user.name if user else None
+        owner_email = user.email if user else None
+
         # Create custom processor for specific connector files
         from models.processors import ConnectorFileProcessor
         # We'll pass file_ids as the files_info, the processor will handle ID-only files
-        processor = ConnectorFileProcessor(self, connection_id, file_ids, user_id)
+        processor = ConnectorFileProcessor(self, connection_id, file_ids, user_id, owner_name=owner_name, owner_email=owner_email)
 
         # Create custom task using TaskService
         task_id = await self.task_service.create_custom_task(user_id, file_ids, processor)
diff --git a/src/models/processors.py b/src/models/processors.py
index 69a5429d..d07914be 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -22,10 +22,12 @@ class TaskProcessor(ABC):
 class DocumentFileProcessor(TaskProcessor):
     """Default processor for regular file uploads"""
 
-    def __init__(self, document_service, owner_user_id: str = None, jwt_token: str = None):
+    def __init__(self, document_service, owner_user_id: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None):
         self.document_service = document_service
         self.owner_user_id = owner_user_id
         self.jwt_token = jwt_token
+        self.owner_name = owner_name
+        self.owner_email = owner_email
 
     async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None:
         """Process a regular file path using DocumentService"""
@@ -33,18 +35,22 @@ class DocumentFileProcessor(TaskProcessor):
         await self.document_service.process_single_file_task(
             upload_task, item,
             owner_user_id=self.owner_user_id,
-            jwt_token=self.jwt_token
+            jwt_token=self.jwt_token,
+            owner_name=self.owner_name,
+            owner_email=self.owner_email
         )
 
 
 class ConnectorFileProcessor(TaskProcessor):
     """Processor for connector file uploads"""
 
-    def __init__(self, connector_service, connection_id: str, files_to_process: list, user_id: str = None):
+    def __init__(self, connector_service, connection_id: str, files_to_process: list, user_id: str = None, owner_name: str = None, owner_email: str = None):
         self.connector_service = connector_service
         self.connection_id = connection_id
         self.files_to_process = files_to_process
         self.user_id = user_id
+        self.owner_name = owner_name
+        self.owner_email = owner_email
         # Create lookup map for file info - handle both file objects and file IDs
         self.file_info_map = {}
         for f in files_to_process:
@@ -77,7 +83,7 @@ class ConnectorFileProcessor(TaskProcessor):
             raise ValueError("user_id not provided to ConnectorFileProcessor")
 
         # Process using existing pipeline
-        result = await self.connector_service.process_connector_document(document, self.user_id, connection.connector_type)
+        result = await self.connector_service.process_connector_document(document, self.user_id, connection.connector_type, owner_name=self.owner_name, owner_email=self.owner_email)
 
         file_task.status = TaskStatus.COMPLETED
         file_task.result = result
@@ -94,6 +100,8 @@ class S3FileProcessor(TaskProcessor):
         s3_client=None,
         owner_user_id: str = None,
         jwt_token: str = None,
+        owner_name: str = None,
+        owner_email: str = None,
     ):
         import boto3
 
@@ -102,6 +110,8 @@
         self.s3_client = s3_client or boto3.client("s3")
         self.owner_user_id = owner_user_id
         self.jwt_token = jwt_token
+        self.owner_name = owner_name
+        self.owner_email = owner_email
 
     async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None:
         """Download an S3 object and process it using DocumentService"""
@@ -145,6 +155,13 @@
             )
             embeddings.extend([d.embedding for d in resp.data])
 
+        # Get object size
+        try:
+            obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
+            file_size = obj_info.get('ContentLength', 0)
+        except Exception:
+            file_size = 0
+
         for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
             chunk_doc = {
                 "document_id": slim_doc["id"],
@@ -154,10 +171,19 @@
                 "text": chunk["text"],
                 "chunk_embedding": vect,
                 "owner": self.owner_user_id,
+                "owner_name": self.owner_name,
+                "owner_email": self.owner_email,
+                "file_size": file_size,
+                "connector_type": "s3",  # S3 uploads
                 "indexed_time": datetime.datetime.now().isoformat(),
             }
             chunk_id = f"{slim_doc['id']}_{i}"
-            await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+            try:
+                await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+            except Exception as e:
+                print(f"[ERROR] OpenSearch indexing failed for S3 chunk {chunk_id}: {e}")
+                print(f"[ERROR] Chunk document: {chunk_doc}")
+                raise
 
         result = {"status": "indexed", "id": slim_doc["id"]}
 
diff --git a/src/services/document_service.py b/src/services/document_service.py
index 35977ab1..7e3d5ded 100644
--- a/src/services/document_service.py
+++ b/src/services/document_service.py
@@ -80,8 +80,9 @@ class DocumentService:
     def __init__(self, process_pool=None, session_manager=None):
         self.process_pool = process_pool
         self.session_manager = session_manager
+        self._mapping_ensured = False
 
-    async def process_file_common(self, file_path: str, file_hash: str = None, owner_user_id: str = None, original_filename: str = None, jwt_token: str = None):
+    async def process_file_common(self, file_path: str, file_hash: str = None, owner_user_id: str = None, original_filename: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, file_size: int = None, connector_type: str = "local"):
         """
         Common processing logic for both upload and upload_path.
         1. Optionally compute SHA256 hash if not provided.
@@ -131,16 +132,26 @@ class DocumentService:
                 "text": chunk["text"],
                 "chunk_embedding": vect,
                 "owner": owner_user_id,
+                "owner_name": owner_name,
+                "owner_email": owner_email,
+                "file_size": file_size,
+                "connector_type": connector_type,
                 "indexed_time": datetime.datetime.now().isoformat()
             }
             chunk_id = f"{file_hash}_{i}"
-            await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+            try:
+                await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+            except Exception as e:
+                print(f"[ERROR] OpenSearch indexing failed for chunk {chunk_id}: {e}")
+                print(f"[ERROR] Chunk document: {chunk_doc}")
+                raise
 
         return {"status": "indexed", "id": file_hash}
 
-    async def process_upload_file(self, upload_file, owner_user_id: str = None, jwt_token: str = None):
+    async def process_upload_file(self, upload_file, owner_user_id: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None):
         """Process an uploaded file from form data"""
         sha256 = hashlib.sha256()
         tmp = tempfile.NamedTemporaryFile(delete=False)
+        file_size = 0
         try:
             while True:
                 chunk = await upload_file.read(1 << 20)
@@ -148,16 +159,22 @@
                     break
                 sha256.update(chunk)
                 tmp.write(chunk)
+                file_size += len(chunk)
             tmp.flush()
             file_hash = sha256.hexdigest()
 
             # Get user's OpenSearch client with JWT for OIDC auth
             opensearch_client = self.session_manager.get_user_opensearch_client(owner_user_id, jwt_token)
-            exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
+
+            try:
+                exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
+            except Exception as e:
+                print(f"[ERROR] OpenSearch exists check failed for document {file_hash}: {e}")
+                raise
             if exists:
                 return {"status": "unchanged", "id": file_hash}
 
-            result = await self.process_file_common(tmp.name, file_hash, owner_user_id=owner_user_id, jwt_token=jwt_token)
+            result = await self.process_file_common(tmp.name, file_hash, owner_user_id=owner_user_id, original_filename=upload_file.filename, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, file_size=file_size)
             return result
         finally:
@@ -200,7 +217,7 @@
             "content_length": len(full_content)
         }
 
-    async def process_single_file_task(self, upload_task, file_path: str, owner_user_id: str = None, jwt_token: str = None):
+    async def process_single_file_task(self, upload_task, file_path: str, owner_user_id: str = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, connector_type: str = "local"):
         """Process a single file and update task tracking - used by task service"""
         from models.tasks import TaskStatus
         import time
@@ -234,6 +251,13 @@
             resp = await clients.patched_async_client.embeddings.create(model=EMBED_MODEL, input=batch)
             embeddings.extend([d.embedding for d in resp.data])
 
+        # Get file size
+        file_size = 0
+        try:
+            file_size = os.path.getsize(file_path)
+        except OSError:
+            pass  # Keep file_size as 0 if can't get size
+
         # Index each chunk
         for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
             chunk_doc = {
@@ -244,10 +268,19 @@
                 "text": chunk["text"],
                 "chunk_embedding": vect,
                 "owner": owner_user_id,
+                "owner_name": owner_name,
+                "owner_email": owner_email,
+                "file_size": file_size,
+                "connector_type": connector_type,
                 "indexed_time": datetime.datetime.now().isoformat()
             }
             chunk_id = f"{slim_doc['id']}_{i}"
-            await opensearch_client.index(index=INDEX_NAME, id=chunk_id, body=chunk_doc)
+            try:
+                await opensearch_client.index(index=INDEX_NAME, id=chunk_id,
+                                              body=chunk_doc)
+            except Exception as e:
+                print(f"[ERROR] OpenSearch indexing failed for batch chunk {chunk_id}: {e}")
+                print(f"[ERROR] Chunk document: {chunk_doc}")
+                raise
 
         result = {"status": "indexed", "id": slim_doc["id"]}
 
diff --git a/src/services/search_service.py b/src/services/search_service.py
index 585963fb..1fa70946 100644
--- a/src/services/search_service.py
+++ b/src/services/search_service.py
@@ -40,7 +40,8 @@ class SearchService:
         field_mapping = {
             "data_sources": "filename",
             "document_types": "mimetype",
-            "owners": "owner"
+            "owners": "owner_name.keyword",
+            "connector_types": "connector_type"
         }
 
         for filter_key, values in filters.items():
@@ -111,12 +112,18 @@
                 },
                 "owners": {
                     "terms": {
-                        "field": "owner",
+                        "field": "owner_name.keyword",
+                        "size": 10
+                    }
+                },
+                "connector_types": {
+                    "terms": {
+                        "field": "connector_type",
                         "size": 10
                     }
                 }
             },
-            "_source": ["filename", "mimetype", "page", "text", "source_url", "owner", "allowed_users", "allowed_groups"],
+            "_source": ["filename", "mimetype", "page", "text", "source_url", "owner", "owner_name", "owner_email", "file_size", "connector_type", "allowed_users", "allowed_groups"],
             "size": limit
         }
 
@@ -130,7 +137,14 @@
 
         # Get user's OpenSearch client with JWT for OIDC auth
         opensearch_client = clients.create_user_opensearch_client(jwt_token)
-        results = await opensearch_client.search(index=INDEX_NAME, body=search_body)
+
+        try:
+            results = await opensearch_client.search(index=INDEX_NAME, body=search_body)
+        except Exception as e:
+            print(f"[ERROR] OpenSearch query failed: {e}")
+            print(f"[ERROR] Search body: {search_body}")
+            # Re-raise the exception so the API returns the error to frontend
+            raise
 
         # Transform results (keep for backward compatibility)
         chunks = []
@@ -142,7 +156,11 @@
                 "text": hit["_source"]["text"],
                 "score": hit["_score"],
                 "source_url": hit["_source"].get("source_url"),
-                "owner": hit["_source"].get("owner")
+                "owner": hit["_source"].get("owner"),
+                "owner_name": hit["_source"].get("owner_name"),
+                "owner_email": hit["_source"].get("owner_email"),
+                "file_size": hit["_source"].get("file_size"),
+                "connector_type": hit["_source"].get("connector_type")
             })
 
         # Return both transformed results and aggregations
diff --git a/src/services/task_service.py b/src/services/task_service.py
index a75735c8..8fa1ed2b 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -24,11 +24,11 @@ class TaskService:
         delay = min(base_delay * (2 ** retry_count) + random.uniform(0, 1), max_delay)
         await asyncio.sleep(delay)
 
-    async def create_upload_task(self, user_id: str, file_paths: list, jwt_token: str = None) -> str:
+    async def create_upload_task(self, user_id: str, file_paths: list, jwt_token: str = None, owner_name: str = None, owner_email: str = None) -> str:
         """Create a new upload task for bulk file processing"""
         # Use default DocumentFileProcessor with user context
         from models.processors import DocumentFileProcessor
-        processor = DocumentFileProcessor(self.document_service, owner_user_id=user_id, jwt_token=jwt_token)
+        processor = DocumentFileProcessor(self.document_service, owner_user_id=user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email)
         return await self.create_custom_task(user_id, file_paths, processor)
 
     async def create_custom_task(self, user_id: str, items: list, processor) -> str:
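
Note on the index mapping these changes assume: the new `owners` filter and aggregation in `search_service.py` target `owner_name.keyword`, and `DocumentService.__init__` now sets a `_mapping_ensured` flag, which implies the chunk index is expected to map the new fields (`owner_name` with a `keyword` sub-field, plus `owner_email`, `file_size`, and `connector_type`). A minimal sketch of such a mapping update with `opensearch-py` follows; the field types, index name, and the `ensure_chunk_mapping` helper are illustrative assumptions, not part of this diff.

```python
# Hypothetical sketch (not part of this change set): ensure the chunk index maps
# the new metadata fields so the "owners" (owner_name.keyword) and
# "connector_types" terms aggregations have keyword fields to aggregate on.
from opensearchpy import AsyncOpenSearch

INDEX_NAME = "documents"  # assumed; reuse the project's real index constant


async def ensure_chunk_mapping(opensearch_client: AsyncOpenSearch) -> None:
    """Add the owner/connector metadata fields to the chunk index mapping."""
    await opensearch_client.indices.put_mapping(
        index=INDEX_NAME,
        body={
            "properties": {
                "owner_name": {
                    "type": "text",
                    # keyword sub-field is what owner_name.keyword aggregates on
                    "fields": {"keyword": {"type": "keyword"}},
                },
                "owner_email": {"type": "keyword"},
                "file_size": {"type": "long"},
                "connector_type": {"type": "keyword"},
            }
        },
    )
```

Guarding a call like this behind `_mapping_ensured` before the first index operation would keep dynamic mapping from inferring different field types for these fields across environments.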