WIP
Parent: d6a8f4437c
Commit: 1b9d56bec0
7 changed files with 469 additions and 385 deletions
@@ -394,6 +394,9 @@ class AppClients:
    ssl_assert_fingerprint=None,
    headers=headers,
    http_compress=True,
    timeout=30,  # 30 second timeout
    max_retries=3,
    retry_on_timeout=True,
)
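Reviewer note (not part of the diff): the keyword arguments in this hunk match the opensearch-py client constructor. Assuming the surrounding call builds an AsyncOpenSearch instance (the constructor itself is outside the hunk), the new timeout/retry settings would apply roughly as in this sketch; host values are placeholders.

from opensearchpy import AsyncOpenSearch

client = AsyncOpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],  # placeholder host
    http_compress=True,           # gzip request bodies
    timeout=30,                   # per-request timeout in seconds
    max_retries=3,                # retry transient transport errors
    retry_on_timeout=True,        # count timeouts toward the retry budget
)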
@@ -69,8 +69,10 @@ class ConnectorService:

logger.debug("Processing connector document", document_id=document.id)

# Process using the existing pipeline but with connector document metadata
result = await doc_service.process_file_common(
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
    file_path=tmp_file.name,
    file_hash=document.id,  # Use connector document ID as hash
    owner_user_id=owner_user_id,
@@ -9,6 +9,138 @@ logger = get_logger(__name__)

class TaskProcessor(ABC):
    """Abstract base class for task processors"""

    def __init__(self, document_service=None):
        self.document_service = document_service

    async def check_document_exists(
        self,
        file_hash: str,
        opensearch_client,
    ) -> bool:
        """
        Check if a document with the given hash already exists in OpenSearch.
        Consolidated hash checking for all processors.
        """
        from config.settings import INDEX_NAME
        import asyncio

        max_retries = 3
        retry_delay = 1.0

        for attempt in range(max_retries):
            try:
                exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
                return exists
            except (asyncio.TimeoutError, Exception) as e:
                if attempt == max_retries - 1:
                    logger.error(
                        "OpenSearch exists check failed after retries",
                        file_hash=file_hash,
                        error=str(e),
                        attempt=attempt + 1
                    )
                    # On final failure, assume document doesn't exist (safer to reprocess than skip)
                    logger.warning(
                        "Assuming document doesn't exist due to connection issues",
                        file_hash=file_hash
                    )
                    return False
                else:
                    logger.warning(
                        "OpenSearch exists check failed, retrying",
                        file_hash=file_hash,
                        error=str(e),
                        attempt=attempt + 1,
                        retry_in=retry_delay
                    )
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff

    async def process_document_standard(
        self,
        file_path: str,
        file_hash: str,
        owner_user_id: str = None,
        original_filename: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
        file_size: int = None,
        connector_type: str = "local",
    ):
        """
        Standard processing pipeline for non-Langflow processors:
        docling conversion + embeddings + OpenSearch indexing.
        """
        import datetime
        from config.settings import INDEX_NAME, EMBED_MODEL, clients
        from services.document_service import chunk_texts_for_embeddings
        from utils.document_processing import extract_relevant

        # Get user's OpenSearch client with JWT for OIDC auth
        opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
            owner_user_id, jwt_token
        )

        # Check if already exists
        if await self.check_document_exists(file_hash, opensearch_client):
            return {"status": "unchanged", "id": file_hash}

        # Convert and extract
        result = clients.converter.convert(file_path)
        full_doc = result.document.export_to_dict()
        slim_doc = extract_relevant(full_doc)

        texts = [c["text"] for c in slim_doc["chunks"]]

        # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
        text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
        embeddings = []

        for batch in text_batches:
            resp = await clients.patched_async_client.embeddings.create(
                model=EMBED_MODEL, input=batch
            )
            embeddings.extend([d.embedding for d in resp.data])

        # Index each chunk as a separate document
        for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
            chunk_doc = {
                "document_id": file_hash,
                "filename": original_filename
                if original_filename
                else slim_doc["filename"],
                "mimetype": slim_doc["mimetype"],
                "page": chunk["page"],
                "text": chunk["text"],
                "chunk_embedding": vect,
                "file_size": file_size,
                "connector_type": connector_type,
                "indexed_time": datetime.datetime.now().isoformat(),
            }

            # Only set owner fields if owner_user_id is provided (for no-auth mode support)
            if owner_user_id is not None:
                chunk_doc["owner"] = owner_user_id
            if owner_name is not None:
                chunk_doc["owner_name"] = owner_name
            if owner_email is not None:
                chunk_doc["owner_email"] = owner_email
            chunk_id = f"{file_hash}_{i}"
            try:
                await opensearch_client.index(
                    index=INDEX_NAME, id=chunk_id, body=chunk_doc
                )
            except Exception as e:
                logger.error(
                    "OpenSearch indexing failed for chunk",
                    chunk_id=chunk_id,
                    error=str(e),
                )
                logger.error("Chunk document details", chunk_doc=chunk_doc)
                raise
        return {"status": "indexed", "id": file_hash}

    @abstractmethod
    async def process_item(
        self, upload_task: UploadTask, item: Any, file_task: FileTask
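Reviewer note (illustrative only, not part of this commit): the consolidated base class above is meant to be subclassed, with process_item delegating to process_document_standard. A minimal concrete processor might look like the sketch below; the class name LocalPathProcessor is hypothetical, and the imports assume the modules introduced elsewhere in this diff.

import os
import time

from models.processors import TaskProcessor
from models.tasks import TaskStatus
from utils.hash_utils import hash_id


class LocalPathProcessor(TaskProcessor):
    """Hypothetical processor: index a file addressed by a local path."""

    async def process_item(self, upload_task, item, file_task) -> None:
        file_task.status = TaskStatus.RUNNING
        file_task.updated_at = time.time()
        try:
            # Delegate conversion, embedding, and indexing to the shared pipeline
            result = await self.process_document_standard(
                file_path=item,
                file_hash=hash_id(item),  # content-derived document ID
                original_filename=os.path.basename(item),
                connector_type="local",
            )
            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            upload_task.successful_files += 1
        except Exception as exc:
            file_task.status = TaskStatus.FAILED
            file_task.error = str(exc)
            upload_task.failed_files += 1
            raise
        finally:
            upload_task.processed_files += 1
            upload_task.updated_at = time.time()

# Usage sketch: LocalPathProcessor(document_service=doc_service) — the document_service
# passed to __init__ supplies the session manager used for the OpenSearch client.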
@@ -35,7 +167,7 @@ class DocumentFileProcessor(TaskProcessor):
    owner_name: str = None,
    owner_email: str = None,
):
    self.document_service = document_service
    super().__init__(document_service)
    self.owner_user_id = owner_user_id
    self.jwt_token = jwt_token
    self.owner_name = owner_name
@@ -44,16 +176,52 @@ class DocumentFileProcessor(TaskProcessor):
async def process_item(
    self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
    """Process a regular file path using DocumentService"""
    # This calls the existing logic with user context
    await self.document_service.process_single_file_task(
        upload_task,
        item,
        owner_user_id=self.owner_user_id,
        jwt_token=self.jwt_token,
        owner_name=self.owner_name,
        owner_email=self.owner_email,
    )
    """Process a regular file path using consolidated methods"""
    from models.tasks import TaskStatus
    from utils.hash_utils import hash_id
    import time
    import os

    file_task.status = TaskStatus.RUNNING
    file_task.updated_at = time.time()

    try:
        # Compute hash
        file_hash = hash_id(item)

        # Get file size
        try:
            file_size = os.path.getsize(item)
        except Exception:
            file_size = 0

        # Use consolidated standard processing
        result = await self.process_document_standard(
            file_path=item,
            file_hash=file_hash,
            owner_user_id=self.owner_user_id,
            original_filename=os.path.basename(item),
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
            file_size=file_size,
            connector_type="local",
        )

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        file_task.updated_at = time.time()
        upload_task.successful_files += 1

    except Exception as e:
        file_task.status = TaskStatus.FAILED
        file_task.error = str(e)
        file_task.updated_at = time.time()
        upload_task.failed_files += 1
        raise
    finally:
        upload_task.processed_files += 1
        upload_task.updated_at = time.time()


class ConnectorFileProcessor(TaskProcessor):
@@ -69,6 +237,7 @@ class ConnectorFileProcessor(TaskProcessor):
    owner_name: str = None,
    owner_email: str = None,
):
    super().__init__()
    self.connector_service = connector_service
    self.connection_id = connection_id
    self.files_to_process = files_to_process
@@ -89,40 +258,77 @@ class ConnectorFileProcessor(TaskProcessor):
async def process_item(
    self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
    """Process a connector file using ConnectorService"""
    """Process a connector file using consolidated methods"""
    from models.tasks import TaskStatus
    from utils.hash_utils import hash_id
    import tempfile
    import time
    import os

    file_id = item  # item is the connector file ID
    self.file_info_map.get(file_id)
    file_task.status = TaskStatus.RUNNING
    file_task.updated_at = time.time()

    # Get the connector and connection info
    connector = await self.connector_service.get_connector(self.connection_id)
    connection = await self.connector_service.connection_manager.get_connection(
        self.connection_id
    )
    if not connector or not connection:
        raise ValueError(f"Connection '{self.connection_id}' not found")
    try:
        file_id = item  # item is the connector file ID

        # Get file content from connector (the connector will fetch metadata if needed)
        document = await connector.get_file_content(file_id)
        # Get the connector and connection info
        connector = await self.connector_service.get_connector(self.connection_id)
        connection = await self.connector_service.connection_manager.get_connection(
            self.connection_id
        )
        if not connector or not connection:
            raise ValueError(f"Connection '{self.connection_id}' not found")

        # Use the user_id passed during initialization
        if not self.user_id:
            raise ValueError("user_id not provided to ConnectorFileProcessor")
        # Get file content from connector
        document = await connector.get_file_content(file_id)

        # Process using existing pipeline
        result = await self.connector_service.process_connector_document(
            document,
            self.user_id,
            connection.connector_type,
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
        )
        if not self.user_id:
            raise ValueError("user_id not provided to ConnectorFileProcessor")

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        upload_task.successful_files += 1
        # Create temporary file from document content
        suffix = self.connector_service._get_file_extension(document.mimetype)
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(document.content)
            tmp_file.flush()

            try:
                # Compute hash
                file_hash = hash_id(tmp_file.name)

                # Use consolidated standard processing
                result = await self.process_document_standard(
                    file_path=tmp_file.name,
                    file_hash=file_hash,
                    owner_user_id=self.user_id,
                    original_filename=document.filename,
                    jwt_token=self.jwt_token,
                    owner_name=self.owner_name,
                    owner_email=self.owner_email,
                    file_size=len(document.content),
                    connector_type=connection.connector_type,
                )

                # Add connector-specific metadata
                result.update({
                    "source_url": document.source_url,
                    "document_id": document.id,
                })

            finally:
                if os.path.exists(tmp_file.name):
                    os.unlink(tmp_file.name)

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        file_task.updated_at = time.time()
        upload_task.successful_files += 1

    except Exception as e:
        file_task.status = TaskStatus.FAILED
        file_task.error = str(e)
        file_task.updated_at = time.time()
        upload_task.failed_files += 1
        raise


class LangflowConnectorFileProcessor(TaskProcessor):
@@ -138,6 +344,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
    owner_name: str = None,
    owner_email: str = None,
):
    super().__init__()
    self.langflow_connector_service = langflow_connector_service
    self.connection_id = connection_id
    self.files_to_process = files_to_process
@@ -160,42 +367,81 @@ class LangflowConnectorFileProcessor(TaskProcessor):
) -> None:
    """Process a connector file using LangflowConnectorService"""
    from models.tasks import TaskStatus
    from utils.hash_utils import hash_id
    import tempfile
    import time
    import os

    file_id = item  # item is the connector file ID
    self.file_info_map.get(file_id)
    file_task.status = TaskStatus.RUNNING
    file_task.updated_at = time.time()

    # Get the connector and connection info
    connector = await self.langflow_connector_service.get_connector(
        self.connection_id
    )
    connection = (
        await self.langflow_connector_service.connection_manager.get_connection(
    try:
        file_id = item  # item is the connector file ID

        # Get the connector and connection info
        connector = await self.langflow_connector_service.get_connector(
            self.connection_id
        )
    )
    if not connector or not connection:
        raise ValueError(f"Connection '{self.connection_id}' not found")
        connection = (
            await self.langflow_connector_service.connection_manager.get_connection(
                self.connection_id
            )
        )
        if not connector or not connection:
            raise ValueError(f"Connection '{self.connection_id}' not found")

        # Get file content from connector (the connector will fetch metadata if needed)
        document = await connector.get_file_content(file_id)
        # Get file content from connector
        document = await connector.get_file_content(file_id)

        # Use the user_id passed during initialization
        if not self.user_id:
            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
        if not self.user_id:
            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

        # Process using Langflow pipeline
        result = await self.langflow_connector_service.process_connector_document(
            document,
            self.user_id,
            connection.connector_type,
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
        )
        # Create temporary file and compute hash to check for duplicates
        suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(document.content)
            tmp_file.flush()

            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            upload_task.successful_files += 1
            try:
                # Compute hash and check if already exists
                file_hash = hash_id(tmp_file.name)

                # Check if document already exists
                opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
                    self.user_id, self.jwt_token
                )
                if await self.check_document_exists(file_hash, opensearch_client):
                    file_task.status = TaskStatus.COMPLETED
                    file_task.result = {"status": "unchanged", "id": file_hash}
                    file_task.updated_at = time.time()
                    upload_task.successful_files += 1
                    return

                # Process using Langflow pipeline
                result = await self.langflow_connector_service.process_connector_document(
                    document,
                    self.user_id,
                    connection.connector_type,
                    jwt_token=self.jwt_token,
                    owner_name=self.owner_name,
                    owner_email=self.owner_email,
                )

            finally:
                if os.path.exists(tmp_file.name):
                    os.unlink(tmp_file.name)

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        file_task.updated_at = time.time()
        upload_task.successful_files += 1

    except Exception as e:
        file_task.status = TaskStatus.FAILED
        file_task.error = str(e)
        file_task.updated_at = time.time()
        upload_task.failed_files += 1
        raise


class S3FileProcessor(TaskProcessor):
@@ -213,7 +459,7 @@ class S3FileProcessor(TaskProcessor):
):
    import boto3

    self.document_service = document_service
    super().__init__(document_service)
    self.bucket = bucket
    self.s3_client = s3_client or boto3.client("s3")
    self.owner_user_id = owner_user_id
@@ -244,72 +490,30 @@ class S3FileProcessor(TaskProcessor):
self.s3_client.download_fileobj(self.bucket, item, tmp)
tmp.flush()

loop = asyncio.get_event_loop()
slim_doc = await loop.run_in_executor(
    self.document_service.process_pool, process_document_sync, tmp.name
# Compute hash
from utils.hash_utils import hash_id
file_hash = hash_id(tmp.name)

# Get object size
try:
    obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
    file_size = obj_info.get("ContentLength", 0)
except Exception:
    file_size = 0

# Use consolidated standard processing
result = await self.process_document_standard(
    file_path=tmp.name,
    file_hash=file_hash,
    owner_user_id=self.owner_user_id,
    original_filename=item,  # Use S3 key as filename
    jwt_token=self.jwt_token,
    owner_name=self.owner_name,
    owner_email=self.owner_email,
    file_size=file_size,
    connector_type="s3",
)

opensearch_client = (
    self.document_service.session_manager.get_user_opensearch_client(
        self.owner_user_id, self.jwt_token
    )
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
    result = {"status": "unchanged", "id": slim_doc["id"]}
else:
    texts = [c["text"] for c in slim_doc["chunks"]]
    text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
    embeddings = []
    for batch in text_batches:
        resp = await clients.patched_async_client.embeddings.create(
            model=EMBED_MODEL, input=batch
        )
        embeddings.extend([d.embedding for d in resp.data])

    # Get object size
    try:
        obj_info = self.s3_client.head_object(Bucket=self.bucket, Key=item)
        file_size = obj_info.get("ContentLength", 0)
    except Exception:
        file_size = 0

    for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
        chunk_doc = {
            "document_id": slim_doc["id"],
            "filename": slim_doc["filename"],
            "mimetype": slim_doc["mimetype"],
            "page": chunk["page"],
            "text": chunk["text"],
            "chunk_embedding": vect,
            "file_size": file_size,
            "connector_type": "s3",  # S3 uploads
            "indexed_time": datetime.datetime.now().isoformat(),
        }

        # Only set owner fields if owner_user_id is provided (for no-auth mode support)
        if self.owner_user_id is not None:
            chunk_doc["owner"] = self.owner_user_id
        if self.owner_name is not None:
            chunk_doc["owner_name"] = self.owner_name
        if self.owner_email is not None:
            chunk_doc["owner_email"] = self.owner_email
        chunk_id = f"{slim_doc['id']}_{i}"
        try:
            await opensearch_client.index(
                index=INDEX_NAME, id=chunk_id, body=chunk_doc
            )
        except Exception as e:
            logger.error(
                "OpenSearch indexing failed for S3 chunk",
                chunk_id=chunk_id,
                error=str(e),
                chunk_doc=chunk_doc,
            )
            raise

    result = {"status": "indexed", "id": slim_doc["id"]}

result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
@@ -320,8 +524,8 @@ class S3FileProcessor(TaskProcessor):
    file_task.error = str(e)
    upload_task.failed_files += 1
finally:
    tmp.close()
    os.remove(tmp.name)
    if os.path.exists(tmp.name):
        os.unlink(tmp.name)
    file_task.updated_at = time.time()
@@ -341,6 +545,7 @@ class LangflowFileProcessor(TaskProcessor):
    settings: dict = None,
    delete_after_ingest: bool = True,
):
    super().__init__()
    self.langflow_file_service = langflow_file_service
    self.session_manager = session_manager
    self.owner_user_id = owner_user_id
@@ -366,7 +571,22 @@ class LangflowFileProcessor(TaskProcessor):
file_task.updated_at = time.time()

try:
    # Read file content
    # Compute hash and check if already exists
    from utils.hash_utils import hash_id
    file_hash = hash_id(item)

    # Check if document already exists
    opensearch_client = self.session_manager.get_user_opensearch_client(
        self.owner_user_id, self.jwt_token
    )
    if await self.check_document_exists(file_hash, opensearch_client):
        file_task.status = TaskStatus.COMPLETED
        file_task.result = {"status": "unchanged", "id": file_hash}
        file_task.updated_at = time.time()
        upload_task.successful_files += 1
        return

    # Read file content for processing
    with open(item, 'rb') as f:
        content = f.read()
@@ -112,98 +112,6 @@ class DocumentService:
            return False
        return False

    async def process_file_common(
        self,
        file_path: str,
        file_hash: str = None,
        owner_user_id: str = None,
        original_filename: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
        file_size: int = None,
        connector_type: str = "local",
    ):
        """
        Common processing logic for both upload and upload_path.
        1. Optionally compute SHA256 hash if not provided.
        2. Convert with docling and extract relevant content.
        3. Add embeddings.
        4. Index into OpenSearch.
        """
        if file_hash is None:
            sha256 = hashlib.sha256()
            async with aiofiles.open(file_path, "rb") as f:
                while True:
                    chunk = await f.read(1 << 20)
                    if not chunk:
                        break
                    sha256.update(chunk)
            file_hash = sha256.hexdigest()

        # Get user's OpenSearch client with JWT for OIDC auth
        opensearch_client = self.session_manager.get_user_opensearch_client(
            owner_user_id, jwt_token
        )

        exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
        if exists:
            return {"status": "unchanged", "id": file_hash}

        # convert and extract
        result = clients.converter.convert(file_path)
        full_doc = result.document.export_to_dict()
        slim_doc = extract_relevant(full_doc)

        texts = [c["text"] for c in slim_doc["chunks"]]

        # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
        text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
        embeddings = []

        for batch in text_batches:
            resp = await clients.patched_async_client.embeddings.create(
                model=EMBED_MODEL, input=batch
            )
            embeddings.extend([d.embedding for d in resp.data])

        # Index each chunk as a separate document
        for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
            chunk_doc = {
                "document_id": file_hash,
                "filename": original_filename
                if original_filename
                else slim_doc["filename"],
                "mimetype": slim_doc["mimetype"],
                "page": chunk["page"],
                "text": chunk["text"],
                "chunk_embedding": vect,
                "file_size": file_size,
                "connector_type": connector_type,
                "indexed_time": datetime.datetime.now().isoformat(),
            }

            # Only set owner fields if owner_user_id is provided (for no-auth mode support)
            if owner_user_id is not None:
                chunk_doc["owner"] = owner_user_id
            if owner_name is not None:
                chunk_doc["owner_name"] = owner_name
            if owner_email is not None:
                chunk_doc["owner_email"] = owner_email
            chunk_id = f"{file_hash}_{i}"
            try:
                await opensearch_client.index(
                    index=INDEX_NAME, id=chunk_id, body=chunk_doc
                )
            except Exception as e:
                logger.error(
                    "OpenSearch indexing failed for chunk",
                    chunk_id=chunk_id,
                    error=str(e),
                )
                logger.error("Chunk document details", chunk_doc=chunk_doc)
                raise
        return {"status": "indexed", "id": file_hash}

    async def process_upload_file(
        self,
@@ -214,7 +122,8 @@ class DocumentService:
    owner_email: str = None,
):
    """Process an uploaded file from form data"""
    sha256 = hashlib.sha256()
    from utils.hash_utils import hash_id
    import os
    tmp = tempfile.NamedTemporaryFile(delete=False)
    file_size = 0
    try:
@@ -222,12 +131,11 @@ class DocumentService:
        chunk = await upload_file.read(1 << 20)
        if not chunk:
            break
        sha256.update(chunk)
        tmp.write(chunk)
        file_size += len(chunk)
    tmp.flush()

    file_hash = sha256.hexdigest()
    file_hash = hash_id(tmp.name)
    # Get user's OpenSearch client with JWT for OIDC auth
    opensearch_client = self.session_manager.get_user_opensearch_client(
        owner_user_id, jwt_token
@@ -241,17 +149,22 @@ class DocumentService:
        )
        raise
    if exists:
        os.unlink(tmp.name)  # Delete temp file since we don't need it
        return {"status": "unchanged", "id": file_hash}

    result = await self.process_file_common(
        tmp.name,
        file_hash,
    # Use consolidated standard processing
    from models.processors import TaskProcessor
    processor = TaskProcessor(document_service=self)
    result = await processor.process_document_standard(
        file_path=tmp.name,
        file_hash=file_hash,
        owner_user_id=owner_user_id,
        original_filename=upload_file.filename,
        jwt_token=jwt_token,
        owner_name=owner_name,
        owner_email=owner_email,
        file_size=file_size,
        connector_type="local",
    )
    return result
@@ -294,145 +207,3 @@ class DocumentService:
            "pages": len(slim_doc["chunks"]),
            "content_length": len(full_content),
        }

    async def process_single_file_task(
        self,
        upload_task,
        file_path: str,
        owner_user_id: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
        connector_type: str = "local",
    ):
        """Process a single file and update task tracking - used by task service"""
        from models.tasks import TaskStatus
        import time
        import asyncio

        file_task = upload_task.file_tasks[file_path]
        file_task.status = TaskStatus.RUNNING
        file_task.updated_at = time.time()

        try:
            # Handle regular file processing
            loop = asyncio.get_event_loop()

            # Run CPU-intensive docling processing in separate process
            slim_doc = await loop.run_in_executor(
                self.process_pool, process_document_sync, file_path
            )

            # Check if already indexed
            opensearch_client = self.session_manager.get_user_opensearch_client(
                owner_user_id, jwt_token
            )
            exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
            if exists:
                result = {"status": "unchanged", "id": slim_doc["id"]}
            else:
                # Generate embeddings and index (I/O bound, keep in main process)
                texts = [c["text"] for c in slim_doc["chunks"]]

                # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
                text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
                embeddings = []

                for batch in text_batches:
                    resp = await clients.patched_async_client.embeddings.create(
                        model=EMBED_MODEL, input=batch
                    )
                    embeddings.extend([d.embedding for d in resp.data])

                # Get file size
                file_size = 0
                try:
                    file_size = os.path.getsize(file_path)
                except OSError:
                    pass  # Keep file_size as 0 if can't get size

                # Index each chunk
                for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
                    chunk_doc = {
                        "document_id": slim_doc["id"],
                        "filename": slim_doc["filename"],
                        "mimetype": slim_doc["mimetype"],
                        "page": chunk["page"],
                        "text": chunk["text"],
                        "chunk_embedding": vect,
                        "file_size": file_size,
                        "connector_type": connector_type,
                        "indexed_time": datetime.datetime.now().isoformat(),
                    }

                    # Only set owner fields if owner_user_id is provided (for no-auth mode support)
                    if owner_user_id is not None:
                        chunk_doc["owner"] = owner_user_id
                    if owner_name is not None:
                        chunk_doc["owner_name"] = owner_name
                    if owner_email is not None:
                        chunk_doc["owner_email"] = owner_email
                    chunk_id = f"{slim_doc['id']}_{i}"
                    try:
                        await opensearch_client.index(
                            index=INDEX_NAME, id=chunk_id, body=chunk_doc
                        )
                    except Exception as e:
                        logger.error(
                            "OpenSearch indexing failed for batch chunk",
                            chunk_id=chunk_id,
                            error=str(e),
                        )
                        logger.error("Chunk document details", chunk_doc=chunk_doc)
                        raise

                result = {"status": "indexed", "id": slim_doc["id"]}

            result["path"] = file_path
            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            upload_task.successful_files += 1

        except Exception as e:
            import traceback
            from concurrent.futures import BrokenExecutor

            if isinstance(e, BrokenExecutor):
                logger.error(
                    "Process pool broken while processing file", file_path=file_path
                )
                logger.info("Worker process likely crashed")
                logger.info(
                    "You should see detailed crash logs above from the worker process"
                )

                # Mark pool as broken for potential recreation
                self._process_pool_broken = True

                # Attempt to recreate the pool for future operations
                if self._recreate_process_pool():
                    logger.info("Process pool successfully recreated")
                else:
                    logger.warning(
                        "Failed to recreate process pool - future operations may fail"
                    )

                file_task.error = f"Worker process crashed: {str(e)}"
            else:
                logger.error(
                    "Failed to process file", file_path=file_path, error=str(e)
                )
                file_task.error = str(e)

            logger.error("Full traceback available")
            traceback.print_exc()
            file_task.status = TaskStatus.FAILED
            upload_task.failed_files += 1
        finally:
            file_task.updated_at = time.time()
            upload_task.processed_files += 1
            upload_task.updated_at = time.time()

            if upload_task.processed_files >= upload_task.total_files:
                upload_task.status = TaskStatus.COMPLETED
@@ -126,12 +126,30 @@ class TaskService:

async def process_with_semaphore(file_path: str):
    async with semaphore:
        await self.document_service.process_single_file_task(upload_task, file_path)
        from models.processors import DocumentFileProcessor
        file_task = upload_task.file_tasks[file_path]

        # Create processor with user context (all None for background processing)
        processor = DocumentFileProcessor(
            document_service=self.document_service,
            owner_user_id=None,
            jwt_token=None,
            owner_name=None,
            owner_email=None,
        )

        # Process the file
        await processor.process_item(upload_task, file_path, file_task)

tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]

await asyncio.gather(*tasks, return_exceptions=True)

# Check if task is complete
if upload_task.processed_files >= upload_task.total_files:
    upload_task.status = TaskStatus.COMPLETED
    upload_task.updated_at = time.time()

except Exception as e:
    logger.error("Background upload processor failed", task_id=task_id, error=str(e))
    import traceback
@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):

# Compute file hash
try:
    from utils.hash_utils import hash_id
    logger.info("Computing file hash", worker_pid=os.getpid())
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(1 << 20)
            if not chunk:
                break
            sha256.update(chunk)
    file_hash = sha256.hexdigest()
    file_hash = hash_id(file_path)
    logger.info(
        "File hash computed",
        worker_pid=os.getpid(),
src/utils/hash_utils.py  (new file, 76 lines)
@@ -0,0 +1,76 @@
import io
import os
import base64
import hashlib
from typing import BinaryIO, Optional, Union


def _b64url(data: bytes) -> str:
    """URL-safe base64 without padding"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")


def stream_hash(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    chunk_size: int = 1024 * 1024,  # 1 MiB
) -> bytes:
    """
    Memory-safe, incremental hash of a file path or binary stream.
    - source: path or file-like object with .read()
    - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
    - include_filename: if provided, the UTF-8 bytes of this string are prepended
    - chunk_size: read size per iteration
    Returns: raw digest bytes
    """
    try:
        h = hashlib.new(algo)
    except ValueError as e:
        raise ValueError(f"Unsupported hash algorithm: {algo}") from e

    def _update_from_file(f: BinaryIO):
        if include_filename:
            h.update(include_filename.encode("utf-8"))
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)

    if isinstance(source, (str, os.PathLike)):
        with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
            _update_from_file(f)
    else:
        f = source
        # Preserve position if seekable
        pos = None
        try:
            if f.seekable():
                pos = f.tell()
                f.seek(0)
        except Exception:
            pos = None
        try:
            _update_from_file(f)
        finally:
            if pos is not None:
                try:
                    f.seek(pos)
                except Exception:
                    pass

    return h.digest()


def hash_id(
    source: Union[str, os.PathLike, BinaryIO],
    *,
    algo: str = "sha256",
    include_filename: Optional[str] = None,
    length: int = 24,  # characters of base64url (set 0 or None for full)
) -> str:
    """
    Deterministic, URL-safe base64 digest (no prefix).
    """
    b = stream_hash(source, algo=algo, include_filename=include_filename)
    s = _b64url(b)
    return s[:length] if length else s
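Example usage of the new helpers (illustrative only, not part of the commit; paths are placeholders):

from utils.hash_utils import hash_id, stream_hash

doc_id = hash_id("/tmp/report.pdf")                                   # 24-char URL-safe ID
full_id = hash_id("/tmp/report.pdf", length=0)                        # full-length digest
salted = hash_id("/tmp/report.pdf", include_filename="report.pdf")    # filename mixed into the hash
raw = stream_hash(open("/tmp/report.pdf", "rb"), algo="blake2b")      # raw digest bytes from a stream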