Merge remote-tracking branch 'origin/main' into feat/new-onboarding
This commit is contained in:
commit
a2fb385a1f
1 changed files with 45 additions and 78 deletions
123
src/main.py
123
src/main.py
|
|
@ -335,97 +335,64 @@ async def _ingest_default_documents_langflow(services, file_paths):
|
||||||
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
|
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
|
||||||
langflow_file_service = services["langflow_file_service"]
|
langflow_file_service = services["langflow_file_service"]
|
||||||
session_manager = services["session_manager"]
|
session_manager = services["session_manager"]
|
||||||
|
task_service = services["task_service"]
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Using Langflow ingestion pipeline for default documents",
|
"Using Langflow ingestion pipeline for default documents",
|
||||||
file_count=len(file_paths),
|
file_count=len(file_paths),
|
||||||
)
|
)
|
||||||
|
|
||||||
success_count = 0
|
# Use AnonymousUser details for default documents
|
||||||
error_count = 0
|
from session_manager import AnonymousUser
|
||||||
|
|
||||||
for file_path in file_paths:
|
anonymous_user = AnonymousUser()
|
||||||
try:
|
|
||||||
logger.debug("Processing file with Langflow pipeline", file_path=file_path)
|
|
||||||
|
|
||||||
# Read file content
|
# Get JWT token using same logic as DocumentFileProcessor
|
||||||
with open(file_path, "rb") as f:
|
# This will handle anonymous JWT creation if needed for anonymous user
|
||||||
content = f.read()
|
effective_jwt = None
|
||||||
|
|
||||||
# Create file tuple for upload
|
# Let session manager handle anonymous JWT creation if needed
|
||||||
filename = os.path.basename(file_path)
|
if session_manager:
|
||||||
# Determine content type based on file extension
|
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
|
||||||
content_type, _ = mimetypes.guess_type(filename)
|
session_manager.get_user_opensearch_client(
|
||||||
if not content_type:
|
anonymous_user.user_id, effective_jwt
|
||||||
content_type = "application/octet-stream"
|
)
|
||||||
|
# Get the JWT that was created by session manager
|
||||||
|
if hasattr(session_manager, "_anonymous_jwt"):
|
||||||
|
effective_jwt = session_manager._anonymous_jwt
|
||||||
|
|
||||||
file_tuple = (filename, content, content_type)
|
# Prepare tweaks for default documents with anonymous user metadata
|
||||||
|
default_tweaks = {
|
||||||
|
"OpenSearchHybrid-Ve6bS": {
|
||||||
|
"docs_metadata": [
|
||||||
|
{"key": "owner", "value": None},
|
||||||
|
{"key": "owner_name", "value": anonymous_user.name},
|
||||||
|
{"key": "owner_email", "value": anonymous_user.email},
|
||||||
|
{"key": "connector_type", "value": "system_default"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
# Use AnonymousUser details for default documents
|
# Create a langflow upload task for trackable progress
|
||||||
from session_manager import AnonymousUser
|
task_id = await task_service.create_langflow_upload_task(
|
||||||
|
user_id=None, # Anonymous user
|
||||||
anonymous_user = AnonymousUser()
|
file_paths=file_paths,
|
||||||
|
langflow_file_service=langflow_file_service,
|
||||||
# Get JWT token using same logic as DocumentFileProcessor
|
session_manager=session_manager,
|
||||||
# This will handle anonymous JWT creation if needed for anonymous user
|
jwt_token=effective_jwt,
|
||||||
effective_jwt = None
|
owner_name=anonymous_user.name,
|
||||||
|
owner_email=anonymous_user.email,
|
||||||
# Let session manager handle anonymous JWT creation if needed
|
session_id=None, # No session for default documents
|
||||||
if session_manager:
|
tweaks=default_tweaks,
|
||||||
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
|
settings=None, # Use default ingestion settings
|
||||||
session_manager.get_user_opensearch_client(
|
delete_after_ingest=True, # Clean up after ingestion
|
||||||
anonymous_user.user_id, effective_jwt
|
replace_duplicates=False,
|
||||||
)
|
)
|
||||||
# Get the JWT that was created by session manager
|
|
||||||
if hasattr(session_manager, "_anonymous_jwt"):
|
|
||||||
effective_jwt = session_manager._anonymous_jwt
|
|
||||||
|
|
||||||
# Prepare tweaks for default documents with anonymous user metadata
|
|
||||||
default_tweaks = {
|
|
||||||
"OpenSearchHybrid-Ve6bS": {
|
|
||||||
"docs_metadata": [
|
|
||||||
{"key": "owner", "value": None},
|
|
||||||
{"key": "owner_name", "value": anonymous_user.name},
|
|
||||||
{"key": "owner_email", "value": anonymous_user.email},
|
|
||||||
{"key": "connector_type", "value": "system_default"},
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Use langflow upload_and_ingest_file method with JWT token
|
|
||||||
result = await langflow_file_service.upload_and_ingest_file(
|
|
||||||
file_tuple=file_tuple,
|
|
||||||
session_id=None, # No session for default documents
|
|
||||||
tweaks=default_tweaks, # Add anonymous user metadata
|
|
||||||
settings=None, # Use default ingestion settings
|
|
||||||
jwt_token=effective_jwt, # Use JWT token (anonymous if needed)
|
|
||||||
delete_after_ingest=True, # Clean up after ingestion
|
|
||||||
owner=None,
|
|
||||||
owner_name=anonymous_user.name,
|
|
||||||
owner_email=anonymous_user.email,
|
|
||||||
connector_type="system_default",
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Successfully ingested file via Langflow",
|
|
||||||
file_path=file_path,
|
|
||||||
result_status=result.get("status"),
|
|
||||||
)
|
|
||||||
success_count += 1
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(
|
|
||||||
"Failed to ingest file via Langflow",
|
|
||||||
file_path=file_path,
|
|
||||||
error=str(e),
|
|
||||||
)
|
|
||||||
error_count += 1
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Langflow ingestion completed",
|
"Started Langflow ingestion task for default documents",
|
||||||
success_count=success_count,
|
task_id=task_id,
|
||||||
error_count=error_count,
|
file_count=len(file_paths),
|
||||||
total_files=len(file_paths),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue