From 1b9d56bec0aa4c9dd5b449d7719ae76a7a8fbeba Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 24 Sep 2025 21:13:26 -0400 Subject: [PATCH] WIP --- src/config/settings.py | 3 + src/connectors/service.py | 6 +- src/models/processors.py | 488 ++++++++++++++++++++++--------- src/services/document_service.py | 251 +--------------- src/services/task_service.py | 20 +- src/utils/document_processing.py | 10 +- src/utils/hash_utils.py | 76 +++++ 7 files changed, 469 insertions(+), 385 deletions(-) create mode 100644 src/utils/hash_utils.py diff --git a/src/config/settings.py b/src/config/settings.py index 17000f4b..b54c87a8 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -394,6 +394,9 @@ class AppClients: ssl_assert_fingerprint=None, headers=headers, http_compress=True, + timeout=30, # 30 second timeout + max_retries=3, + retry_on_timeout=True, ) diff --git a/src/connectors/service.py b/src/connectors/service.py index 01a41519..171da7a1 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -69,8 +69,10 @@ class ConnectorService: logger.debug("Processing connector document", document_id=document.id) - # Process using the existing pipeline but with connector document metadata - result = await doc_service.process_file_common( + # Process using consolidated processing pipeline + from models.processors import TaskProcessor + processor = TaskProcessor(document_service=doc_service) + result = await processor.process_document_standard( file_path=tmp_file.name, file_hash=document.id, # Use connector document ID as hash owner_user_id=owner_user_id, diff --git a/src/models/processors.py b/src/models/processors.py index a817f8d4..87562424 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -9,6 +9,138 @@ logger = get_logger(__name__) class TaskProcessor(ABC): """Abstract base class for task processors""" + def __init__(self, document_service=None): + self.document_service = document_service + + async def check_document_exists( + self, + file_hash: str, + opensearch_client, + ) -> bool: + """ + Check if a document with the given hash already exists in OpenSearch. + Consolidated hash checking for all processors. + """ + from config.settings import INDEX_NAME + import asyncio + + max_retries = 3 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash) + return exists + except (asyncio.TimeoutError, Exception) as e: + if attempt == max_retries - 1: + logger.error( + "OpenSearch exists check failed after retries", + file_hash=file_hash, + error=str(e), + attempt=attempt + 1 + ) + # On final failure, assume document doesn't exist (safer to reprocess than skip) + logger.warning( + "Assuming document doesn't exist due to connection issues", + file_hash=file_hash + ) + return False + else: + logger.warning( + "OpenSearch exists check failed, retrying", + file_hash=file_hash, + error=str(e), + attempt=attempt + 1, + retry_in=retry_delay + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + + async def process_document_standard( + self, + file_path: str, + file_hash: str, + owner_user_id: str = None, + original_filename: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + file_size: int = None, + connector_type: str = "local", + ): + """ + Standard processing pipeline for non-Langflow processors: + docling conversion + embeddings + OpenSearch indexing. 
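+        Skips all conversion work when a document with this file_hash already
+        exists in the index; otherwise the extracted chunks are embedded in
+        batches and each chunk is indexed under the id "<file_hash>_<i>".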
+ """ + import datetime + from config.settings import INDEX_NAME, EMBED_MODEL, clients + from services.document_service import chunk_texts_for_embeddings + from utils.document_processing import extract_relevant + + # Get user's OpenSearch client with JWT for OIDC auth + opensearch_client = self.document_service.session_manager.get_user_opensearch_client( + owner_user_id, jwt_token + ) + + # Check if already exists + if await self.check_document_exists(file_hash, opensearch_client): + return {"status": "unchanged", "id": file_hash} + + # Convert and extract + result = clients.converter.convert(file_path) + full_doc = result.document.export_to_dict() + slim_doc = extract_relevant(full_doc) + + texts = [c["text"] for c in slim_doc["chunks"]] + + # Split into batches to avoid token limits (8191 limit, use 8000 with buffer) + text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) + embeddings = [] + + for batch in text_batches: + resp = await clients.patched_async_client.embeddings.create( + model=EMBED_MODEL, input=batch + ) + embeddings.extend([d.embedding for d in resp.data]) + + # Index each chunk as a separate document + for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): + chunk_doc = { + "document_id": file_hash, + "filename": original_filename + if original_filename + else slim_doc["filename"], + "mimetype": slim_doc["mimetype"], + "page": chunk["page"], + "text": chunk["text"], + "chunk_embedding": vect, + "file_size": file_size, + "connector_type": connector_type, + "indexed_time": datetime.datetime.now().isoformat(), + } + + # Only set owner fields if owner_user_id is provided (for no-auth mode support) + if owner_user_id is not None: + chunk_doc["owner"] = owner_user_id + if owner_name is not None: + chunk_doc["owner_name"] = owner_name + if owner_email is not None: + chunk_doc["owner_email"] = owner_email + chunk_id = f"{file_hash}_{i}" + try: + await opensearch_client.index( + index=INDEX_NAME, id=chunk_id, body=chunk_doc + ) + except Exception as e: + logger.error( + "OpenSearch indexing failed for chunk", + chunk_id=chunk_id, + error=str(e), + ) + logger.error("Chunk document details", chunk_doc=chunk_doc) + raise + return {"status": "indexed", "id": file_hash} + @abstractmethod async def process_item( self, upload_task: UploadTask, item: Any, file_task: FileTask @@ -35,7 +167,7 @@ class DocumentFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): - self.document_service = document_service + super().__init__(document_service) self.owner_user_id = owner_user_id self.jwt_token = jwt_token self.owner_name = owner_name @@ -44,16 +176,52 @@ class DocumentFileProcessor(TaskProcessor): async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask ) -> None: - """Process a regular file path using DocumentService""" - # This calls the existing logic with user context - await self.document_service.process_single_file_task( - upload_task, - item, - owner_user_id=self.owner_user_id, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + """Process a regular file path using consolidated methods""" + from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import time + import os + + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() + + try: + # Compute hash + file_hash = hash_id(item) + + # Get file size + try: + file_size = os.path.getsize(item) + except Exception: + file_size = 0 + + # Use consolidated standard processing 
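+            # Returns {"status": "unchanged", "id": file_hash} when the hash is
+            # already indexed, or {"status": "indexed", "id": file_hash} after
+            # a successful convert/embed/index run.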
+ result = await self.process_document_standard( + file_path=item, + file_hash=file_hash, + owner_user_id=self.owner_user_id, + original_filename=os.path.basename(item), + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=file_size, + connector_type="local", + ) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise + finally: + upload_task.processed_files += 1 + upload_task.updated_at = time.time() class ConnectorFileProcessor(TaskProcessor): @@ -69,6 +237,7 @@ class ConnectorFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): + super().__init__() self.connector_service = connector_service self.connection_id = connection_id self.files_to_process = files_to_process @@ -89,40 +258,77 @@ class ConnectorFileProcessor(TaskProcessor): async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask ) -> None: - """Process a connector file using ConnectorService""" + """Process a connector file using consolidated methods""" from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import tempfile + import time + import os - file_id = item # item is the connector file ID - self.file_info_map.get(file_id) + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() - # Get the connector and connection info - connector = await self.connector_service.get_connector(self.connection_id) - connection = await self.connector_service.connection_manager.get_connection( - self.connection_id - ) - if not connector or not connection: - raise ValueError(f"Connection '{self.connection_id}' not found") + try: + file_id = item # item is the connector file ID - # Get file content from connector (the connector will fetch metadata if needed) - document = await connector.get_file_content(file_id) + # Get the connector and connection info + connector = await self.connector_service.get_connector(self.connection_id) + connection = await self.connector_service.connection_manager.get_connection( + self.connection_id + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") - # Use the user_id passed during initialization - if not self.user_id: - raise ValueError("user_id not provided to ConnectorFileProcessor") + # Get file content from connector + document = await connector.get_file_content(file_id) - # Process using existing pipeline - result = await self.connector_service.process_connector_document( - document, - self.user_id, - connection.connector_type, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + if not self.user_id: + raise ValueError("user_id not provided to ConnectorFileProcessor") - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + # Create temporary file from document content + suffix = self.connector_service._get_file_extension(document.mimetype) + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: + tmp_file.write(document.content) + tmp_file.flush() + + try: + # Compute hash + file_hash = hash_id(tmp_file.name) + + # Use consolidated standard processing + result = await self.process_document_standard( + file_path=tmp_file.name, + 
file_hash=file_hash, + owner_user_id=self.user_id, + original_filename=document.filename, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=len(document.content), + connector_type=connection.connector_type, + ) + + # Add connector-specific metadata + result.update({ + "source_url": document.source_url, + "document_id": document.id, + }) + + finally: + if os.path.exists(tmp_file.name): + os.unlink(tmp_file.name) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise class LangflowConnectorFileProcessor(TaskProcessor): @@ -138,6 +344,7 @@ class LangflowConnectorFileProcessor(TaskProcessor): owner_name: str = None, owner_email: str = None, ): + super().__init__() self.langflow_connector_service = langflow_connector_service self.connection_id = connection_id self.files_to_process = files_to_process @@ -160,42 +367,81 @@ class LangflowConnectorFileProcessor(TaskProcessor): ) -> None: """Process a connector file using LangflowConnectorService""" from models.tasks import TaskStatus + from utils.hash_utils import hash_id + import tempfile + import time + import os - file_id = item # item is the connector file ID - self.file_info_map.get(file_id) + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() - # Get the connector and connection info - connector = await self.langflow_connector_service.get_connector( - self.connection_id - ) - connection = ( - await self.langflow_connector_service.connection_manager.get_connection( + try: + file_id = item # item is the connector file ID + + # Get the connector and connection info + connector = await self.langflow_connector_service.get_connector( self.connection_id ) - ) - if not connector or not connection: - raise ValueError(f"Connection '{self.connection_id}' not found") + connection = ( + await self.langflow_connector_service.connection_manager.get_connection( + self.connection_id + ) + ) + if not connector or not connection: + raise ValueError(f"Connection '{self.connection_id}' not found") - # Get file content from connector (the connector will fetch metadata if needed) - document = await connector.get_file_content(file_id) + # Get file content from connector + document = await connector.get_file_content(file_id) - # Use the user_id passed during initialization - if not self.user_id: - raise ValueError("user_id not provided to LangflowConnectorFileProcessor") + if not self.user_id: + raise ValueError("user_id not provided to LangflowConnectorFileProcessor") - # Process using Langflow pipeline - result = await self.langflow_connector_service.process_connector_document( - document, - self.user_id, - connection.connector_type, - jwt_token=self.jwt_token, - owner_name=self.owner_name, - owner_email=self.owner_email, - ) + # Create temporary file and compute hash to check for duplicates + suffix = self.langflow_connector_service._get_file_extension(document.mimetype) + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: + tmp_file.write(document.content) + tmp_file.flush() - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 + try: + # Compute hash and check if already exists + file_hash = hash_id(tmp_file.name) + + # Check if document already exists + 
opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client( + self.user_id, self.jwt_token + ) + if await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.COMPLETED + file_task.result = {"status": "unchanged", "id": file_hash} + file_task.updated_at = time.time() + upload_task.successful_files += 1 + return + + # Process using Langflow pipeline + result = await self.langflow_connector_service.process_connector_document( + document, + self.user_id, + connection.connector_type, + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + ) + + finally: + if os.path.exists(tmp_file.name): + os.unlink(tmp_file.name) + + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + upload_task.successful_files += 1 + + except Exception as e: + file_task.status = TaskStatus.FAILED + file_task.error = str(e) + file_task.updated_at = time.time() + upload_task.failed_files += 1 + raise class S3FileProcessor(TaskProcessor): @@ -213,7 +459,7 @@ class S3FileProcessor(TaskProcessor): ): import boto3 - self.document_service = document_service + super().__init__(document_service) self.bucket = bucket self.s3_client = s3_client or boto3.client("s3") self.owner_user_id = owner_user_id @@ -244,72 +490,30 @@ class S3FileProcessor(TaskProcessor): self.s3_client.download_fileobj(self.bucket, item, tmp) tmp.flush() - loop = asyncio.get_event_loop() - slim_doc = await loop.run_in_executor( - self.document_service.process_pool, process_document_sync, tmp.name + # Compute hash + from utils.hash_utils import hash_id + file_hash = hash_id(tmp.name) + + # Get object size + try: + obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item) + file_size = obj_info.get("ContentLength", 0) + except Exception: + file_size = 0 + + # Use consolidated standard processing + result = await self.process_document_standard( + file_path=tmp.name, + file_hash=file_hash, + owner_user_id=self.owner_user_id, + original_filename=item, # Use S3 key as filename + jwt_token=self.jwt_token, + owner_name=self.owner_name, + owner_email=self.owner_email, + file_size=file_size, + connector_type="s3", ) - opensearch_client = ( - self.document_service.session_manager.get_user_opensearch_client( - self.owner_user_id, self.jwt_token - ) - ) - exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"]) - if exists: - result = {"status": "unchanged", "id": slim_doc["id"]} - else: - texts = [c["text"] for c in slim_doc["chunks"]] - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) - - # Get object size - try: - obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item) - file_size = obj_info.get("ContentLength", 0) - except Exception: - file_size = 0 - - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": slim_doc["id"], - "filename": slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": "s3", # S3 uploads - "indexed_time": datetime.datetime.now().isoformat(), - } - - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if self.owner_user_id is not 
None: - chunk_doc["owner"] = self.owner_user_id - if self.owner_name is not None: - chunk_doc["owner_name"] = self.owner_name - if self.owner_email is not None: - chunk_doc["owner_email"] = self.owner_email - chunk_id = f"{slim_doc['id']}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for S3 chunk", - chunk_id=chunk_id, - error=str(e), - chunk_doc=chunk_doc, - ) - raise - - result = {"status": "indexed", "id": slim_doc["id"]} - result["path"] = f"s3://{self.bucket}/{item}" file_task.status = TaskStatus.COMPLETED file_task.result = result @@ -320,8 +524,8 @@ class S3FileProcessor(TaskProcessor): file_task.error = str(e) upload_task.failed_files += 1 finally: - tmp.close() - os.remove(tmp.name) + if os.path.exists(tmp.name): + os.unlink(tmp.name) file_task.updated_at = time.time() @@ -341,6 +545,7 @@ class LangflowFileProcessor(TaskProcessor): settings: dict = None, delete_after_ingest: bool = True, ): + super().__init__() self.langflow_file_service = langflow_file_service self.session_manager = session_manager self.owner_user_id = owner_user_id @@ -366,7 +571,22 @@ class LangflowFileProcessor(TaskProcessor): file_task.updated_at = time.time() try: - # Read file content + # Compute hash and check if already exists + from utils.hash_utils import hash_id + file_hash = hash_id(item) + + # Check if document already exists + opensearch_client = self.session_manager.get_user_opensearch_client( + self.owner_user_id, self.jwt_token + ) + if await self.check_document_exists(file_hash, opensearch_client): + file_task.status = TaskStatus.COMPLETED + file_task.result = {"status": "unchanged", "id": file_hash} + file_task.updated_at = time.time() + upload_task.successful_files += 1 + return + + # Read file content for processing with open(item, 'rb') as f: content = f.read() diff --git a/src/services/document_service.py b/src/services/document_service.py index 949515e3..fb52a89c 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -112,98 +112,6 @@ class DocumentService: return False return False - async def process_file_common( - self, - file_path: str, - file_hash: str = None, - owner_user_id: str = None, - original_filename: str = None, - jwt_token: str = None, - owner_name: str = None, - owner_email: str = None, - file_size: int = None, - connector_type: str = "local", - ): - """ - Common processing logic for both upload and upload_path. - 1. Optionally compute SHA256 hash if not provided. - 2. Convert with docling and extract relevant content. - 3. Add embeddings. - 4. Index into OpenSearch. 
- """ - if file_hash is None: - sha256 = hashlib.sha256() - async with aiofiles.open(file_path, "rb") as f: - while True: - chunk = await f.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - file_hash = sha256.hexdigest() - - # Get user's OpenSearch client with JWT for OIDC auth - opensearch_client = self.session_manager.get_user_opensearch_client( - owner_user_id, jwt_token - ) - - exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash) - if exists: - return {"status": "unchanged", "id": file_hash} - - # convert and extract - result = clients.converter.convert(file_path) - full_doc = result.document.export_to_dict() - slim_doc = extract_relevant(full_doc) - - texts = [c["text"] for c in slim_doc["chunks"]] - - # Split into batches to avoid token limits (8191 limit, use 8000 with buffer) - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) - - # Index each chunk as a separate document - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": file_hash, - "filename": original_filename - if original_filename - else slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": connector_type, - "indexed_time": datetime.datetime.now().isoformat(), - } - - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if owner_user_id is not None: - chunk_doc["owner"] = owner_user_id - if owner_name is not None: - chunk_doc["owner_name"] = owner_name - if owner_email is not None: - chunk_doc["owner_email"] = owner_email - chunk_id = f"{file_hash}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for chunk", - chunk_id=chunk_id, - error=str(e), - ) - logger.error("Chunk document details", chunk_doc=chunk_doc) - raise - return {"status": "indexed", "id": file_hash} async def process_upload_file( self, @@ -214,7 +122,8 @@ class DocumentService: owner_email: str = None, ): """Process an uploaded file from form data""" - sha256 = hashlib.sha256() + from utils.hash_utils import hash_id + import os tmp = tempfile.NamedTemporaryFile(delete=False) file_size = 0 try: @@ -222,12 +131,11 @@ class DocumentService: chunk = await upload_file.read(1 << 20) if not chunk: break - sha256.update(chunk) tmp.write(chunk) file_size += len(chunk) tmp.flush() - file_hash = sha256.hexdigest() + file_hash = hash_id(tmp.name) # Get user's OpenSearch client with JWT for OIDC auth opensearch_client = self.session_manager.get_user_opensearch_client( owner_user_id, jwt_token @@ -241,17 +149,22 @@ class DocumentService: ) raise if exists: + os.unlink(tmp.name) # Delete temp file since we don't need it return {"status": "unchanged", "id": file_hash} - result = await self.process_file_common( - tmp.name, - file_hash, + # Use consolidated standard processing + from models.processors import TaskProcessor + processor = TaskProcessor(document_service=self) + result = await processor.process_document_standard( + file_path=tmp.name, + file_hash=file_hash, owner_user_id=owner_user_id, original_filename=upload_file.filename, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, 
file_size=file_size, + connector_type="local", ) return result @@ -294,145 +207,3 @@ class DocumentService: "pages": len(slim_doc["chunks"]), "content_length": len(full_content), } - - async def process_single_file_task( - self, - upload_task, - file_path: str, - owner_user_id: str = None, - jwt_token: str = None, - owner_name: str = None, - owner_email: str = None, - connector_type: str = "local", - ): - """Process a single file and update task tracking - used by task service""" - from models.tasks import TaskStatus - import time - import asyncio - - file_task = upload_task.file_tasks[file_path] - file_task.status = TaskStatus.RUNNING - file_task.updated_at = time.time() - - try: - # Handle regular file processing - loop = asyncio.get_event_loop() - - # Run CPU-intensive docling processing in separate process - slim_doc = await loop.run_in_executor( - self.process_pool, process_document_sync, file_path - ) - - # Check if already indexed - opensearch_client = self.session_manager.get_user_opensearch_client( - owner_user_id, jwt_token - ) - exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"]) - if exists: - result = {"status": "unchanged", "id": slim_doc["id"]} - else: - # Generate embeddings and index (I/O bound, keep in main process) - texts = [c["text"] for c in slim_doc["chunks"]] - - # Split into batches to avoid token limits (8191 limit, use 8000 with buffer) - text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000) - embeddings = [] - - for batch in text_batches: - resp = await clients.patched_async_client.embeddings.create( - model=EMBED_MODEL, input=batch - ) - embeddings.extend([d.embedding for d in resp.data]) - - # Get file size - file_size = 0 - try: - file_size = os.path.getsize(file_path) - except OSError: - pass # Keep file_size as 0 if can't get size - - # Index each chunk - for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)): - chunk_doc = { - "document_id": slim_doc["id"], - "filename": slim_doc["filename"], - "mimetype": slim_doc["mimetype"], - "page": chunk["page"], - "text": chunk["text"], - "chunk_embedding": vect, - "file_size": file_size, - "connector_type": connector_type, - "indexed_time": datetime.datetime.now().isoformat(), - } - - # Only set owner fields if owner_user_id is provided (for no-auth mode support) - if owner_user_id is not None: - chunk_doc["owner"] = owner_user_id - if owner_name is not None: - chunk_doc["owner_name"] = owner_name - if owner_email is not None: - chunk_doc["owner_email"] = owner_email - chunk_id = f"{slim_doc['id']}_{i}" - try: - await opensearch_client.index( - index=INDEX_NAME, id=chunk_id, body=chunk_doc - ) - except Exception as e: - logger.error( - "OpenSearch indexing failed for batch chunk", - chunk_id=chunk_id, - error=str(e), - ) - logger.error("Chunk document details", chunk_doc=chunk_doc) - raise - - result = {"status": "indexed", "id": slim_doc["id"]} - - result["path"] = file_path - file_task.status = TaskStatus.COMPLETED - file_task.result = result - upload_task.successful_files += 1 - - except Exception as e: - import traceback - from concurrent.futures import BrokenExecutor - - if isinstance(e, BrokenExecutor): - logger.error( - "Process pool broken while processing file", file_path=file_path - ) - logger.info("Worker process likely crashed") - logger.info( - "You should see detailed crash logs above from the worker process" - ) - - # Mark pool as broken for potential recreation - self._process_pool_broken = True - - # Attempt to recreate the pool for future 
operations - if self._recreate_process_pool(): - logger.info("Process pool successfully recreated") - else: - logger.warning( - "Failed to recreate process pool - future operations may fail" - ) - - file_task.error = f"Worker process crashed: {str(e)}" - else: - logger.error( - "Failed to process file", file_path=file_path, error=str(e) - ) - file_task.error = str(e) - - logger.error("Full traceback available") - traceback.print_exc() - file_task.status = TaskStatus.FAILED - upload_task.failed_files += 1 - finally: - file_task.updated_at = time.time() - upload_task.processed_files += 1 - upload_task.updated_at = time.time() - - if upload_task.processed_files >= upload_task.total_files: - upload_task.status = TaskStatus.COMPLETED - diff --git a/src/services/task_service.py b/src/services/task_service.py index c9328b90..126a02b3 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -126,12 +126,30 @@ class TaskService: async def process_with_semaphore(file_path: str): async with semaphore: - await self.document_service.process_single_file_task(upload_task, file_path) + from models.processors import DocumentFileProcessor + file_task = upload_task.file_tasks[file_path] + + # Create processor with user context (all None for background processing) + processor = DocumentFileProcessor( + document_service=self.document_service, + owner_user_id=None, + jwt_token=None, + owner_name=None, + owner_email=None, + ) + + # Process the file + await processor.process_item(upload_task, file_path, file_task) tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()] await asyncio.gather(*tasks, return_exceptions=True) + # Check if task is complete + if upload_task.processed_files >= upload_task.total_files: + upload_task.status = TaskStatus.COMPLETED + upload_task.updated_at = time.time() + except Exception as e: logger.error("Background upload processor failed", task_id=task_id, error=str(e)) import traceback diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py index a8792e46..fcb458fb 100644 --- a/src/utils/document_processing.py +++ b/src/utils/document_processing.py @@ -229,15 +229,9 @@ def process_document_sync(file_path: str): # Compute file hash try: + from utils.hash_utils import hash_id logger.info("Computing file hash", worker_pid=os.getpid()) - sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - while True: - chunk = f.read(1 << 20) - if not chunk: - break - sha256.update(chunk) - file_hash = sha256.hexdigest() + file_hash = hash_id(file_path) logger.info( "File hash computed", worker_pid=os.getpid(), diff --git a/src/utils/hash_utils.py b/src/utils/hash_utils.py new file mode 100644 index 00000000..c25c8856 --- /dev/null +++ b/src/utils/hash_utils.py @@ -0,0 +1,76 @@ +import io +import os +import base64 +import hashlib +from typing import BinaryIO, Optional, Union + + +def _b64url(data: bytes) -> str: + """URL-safe base64 without padding""" + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8") + + +def stream_hash( + source: Union[str, os.PathLike, BinaryIO], + *, + algo: str = "sha256", + include_filename: Optional[str] = None, + chunk_size: int = 1024 * 1024, # 1 MiB +) -> bytes: + """ + Memory-safe, incremental hash of a file path or binary stream. + - source: path or file-like object with .read() + - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.) 
+ - include_filename: if provided, the UTF-8 bytes of this string are prepended + - chunk_size: read size per iteration + Returns: raw digest bytes + """ + try: + h = hashlib.new(algo) + except ValueError as e: + raise ValueError(f"Unsupported hash algorithm: {algo}") from e + + def _update_from_file(f: BinaryIO): + if include_filename: + h.update(include_filename.encode("utf-8")) + for chunk in iter(lambda: f.read(chunk_size), b""): + h.update(chunk) + + if isinstance(source, (str, os.PathLike)): + with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f: + _update_from_file(f) + else: + f = source + # Preserve position if seekable + pos = None + try: + if f.seekable(): + pos = f.tell() + f.seek(0) + except Exception: + pos = None + try: + _update_from_file(f) + finally: + if pos is not None: + try: + f.seek(pos) + except Exception: + pass + + return h.digest() + + +def hash_id( + source: Union[str, os.PathLike, BinaryIO], + *, + algo: str = "sha256", + include_filename: Optional[str] = None, + length: int = 24, # characters of base64url (set 0 or None for full) +) -> str: + """ + Deterministic, URL-safe base64 digest (no prefix). + """ + b = stream_hash(source, algo=algo, include_filename=include_filename) + s = _b64url(b) + return s[:length] if length else s \ No newline at end of file
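
A minimal, standalone sketch of the id scheme introduced by src/utils/hash_utils.py and relied on by process_document_standard() for deduplication: the file bytes are streamed through SHA-256, the digest is encoded as URL-safe base64 without padding, and the result is truncated to 24 characters before being used as the OpenSearch document id. The helper name sketch_hash_id and the throwaway sample file are illustrative assumptions for this sketch only; the actual code paths call hash_utils.hash_id().

import base64
import hashlib
import os
import tempfile


def sketch_hash_id(path: str, length: int = 24) -> str:
    """Stream a file through SHA-256 and return a truncated URL-safe base64 id."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):  # 1 MiB per read
            h.update(chunk)
    digest = base64.urlsafe_b64encode(h.digest()).rstrip(b"=").decode("utf-8")
    return digest[:length] if length else digest


if __name__ == "__main__":
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(b"hello world\n")
        # Identical bytes always map to the same 24-character id, which is what
        # lets check_document_exists() skip docling conversion and embedding
        # for files that were already indexed.
        doc_id = sketch_hash_id(path)
        print(doc_id, len(doc_id))  # deterministic 24-character id
    finally:
        os.unlink(path)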