Merge branch 'ingest-settings' of github.com:phact/gendb into ingest-settings

commit f85dce6b3f

2 changed files with 35 additions and 49 deletions
src/main.py (82 changes)
@@ -1,8 +1,7 @@
-
 # Configure structured logging early
-from services.flows_service import FlowsService
 from connectors.langflow_connector_service import LangflowConnectorService
 from connectors.service import ConnectorService
+from services.flows_service import FlowsService
 from utils.logging_config import configure_from_env, get_logger

 configure_from_env()
@@ -23,24 +22,28 @@ from starlette.routing import Route
 multiprocessing.set_start_method("spawn", force=True)

 # Create process pool FIRST, before any torch/CUDA imports
-from utils.process_pool import process_pool
+from utils.process_pool import process_pool  # isort: skip

 import torch

 # API endpoints
 from api import (
-    router,
     auth,
     chat,
     connectors,
+    flows,
     knowledge_filter,
     langflow_files,
+    nudges,
     oidc,
+    router,
     search,
     settings,
     tasks,
     upload,
 )

+# Existing services
+from api.connector_router import ConnectorRouter
 from auth_middleware import optional_auth, require_auth

 # Configuration and setup
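Note on the "# isort: skip" added above: the file deliberately starts the process pool before torch (and therefore CUDA) is imported, and an import sorter would otherwise move the process_pool import down into the alphabetized block below "import torch". A minimal sketch of the ordering constraint, using only names that appear in this diff (the rationale is the file's own comment):

    import multiprocessing

    # Per the comment in main.py: workers use the "spawn" start method, and
    # the pool must exist before any torch/CUDA import runs in this process.
    multiprocessing.set_start_method("spawn", force=True)

    from utils.process_pool import process_pool  # isort: skip

    import torch  # imported only after the worker processes already exist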
@@ -53,9 +56,6 @@ from config.settings import (
     clients,
     is_no_auth_mode,
 )
-
-# Existing services
-from api.connector_router import ConnectorRouter
 from services.auth_service import AuthService
 from services.chat_service import ChatService

@@ -70,24 +70,6 @@ from services.monitor_service import MonitorService
 from services.search_service import SearchService
 from services.task_service import TaskService
 from session_manager import SessionManager
-from utils.process_pool import process_pool
-
-# API endpoints
-from api import (
-    flows,
-    router,
-    nudges,
-    upload,
-    search,
-    chat,
-    auth,
-    connectors,
-    tasks,
-    oidc,
-    knowledge_filter,
-    settings,
-)
-

 logger.info(
     "CUDA device information",
@@ -246,7 +228,10 @@ async def init_index_when_ready():
 async def ingest_default_documents_when_ready(services):
     """Scan the local documents folder and ingest files like a non-auth upload."""
     try:
-        logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW)
+        logger.info(
+            "Ingesting default documents when ready",
+            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
+        )
         base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
         if not os.path.isdir(base_dir):
             logger.info(
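The reflowed logger.info call above shows the structured-logging convention used throughout this file: an event name first, then context as keyword fields rather than %-formatting. A minimal structlog-flavored example of that calling convention (structlog itself is an assumption here; the project hides the concrete logger behind utils.logging_config):

    import structlog

    logger = structlog.get_logger(__name__)
    # Event string first, context as key=value fields; one argument per line
    # once the call exceeds the formatter's line-length limit.
    logger.info(
        "Ingesting default documents when ready",
        disable_langflow_ingest=False,
    )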
@@ -281,40 +266,41 @@ async def _ingest_default_documents_langflow(services, file_paths):
     """Ingest default documents using Langflow upload-ingest-delete pipeline."""
     langflow_file_service = services["langflow_file_service"]
     session_manager = services["session_manager"]

     logger.info(
         "Using Langflow ingestion pipeline for default documents",
         file_count=len(file_paths),
     )

     success_count = 0
     error_count = 0

     for file_path in file_paths:
         try:
             logger.debug("Processing file with Langflow pipeline", file_path=file_path)

             # Read file content
-            with open(file_path, 'rb') as f:
+            with open(file_path, "rb") as f:
                 content = f.read()

             # Create file tuple for upload
             filename = os.path.basename(file_path)
             # Determine content type based on file extension
             content_type, _ = mimetypes.guess_type(filename)
             if not content_type:
-                content_type = 'application/octet-stream'
+                content_type = "application/octet-stream"

             file_tuple = (filename, content, content_type)

             # Use AnonymousUser details for default documents
             from session_manager import AnonymousUser

             anonymous_user = AnonymousUser()

             # Get JWT token using same logic as DocumentFileProcessor
             # This will handle anonymous JWT creation if needed for anonymous user
             effective_jwt = None

             # Let session manager handle anonymous JWT creation if needed
             if session_manager:
                 # This call will create anonymous JWT if needed (same as DocumentFileProcessor)
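The quote changes in this hunk are mechanical formatter fixes, but the content-type logic is worth isolating. A self-contained sketch of the same stdlib fallback (the helper name is mine, not the project's):

    import mimetypes

    def guess_content_type(filename: str) -> str:
        # mimetypes.guess_type returns (None, None) for unknown extensions,
        # so fall back to the generic binary type.
        content_type, _ = mimetypes.guess_type(filename)
        return content_type or "application/octet-stream"

    assert guess_content_type("manual.pdf") == "application/pdf"
    assert guess_content_type("blob.xyz123") == "application/octet-stream"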
@@ -322,9 +308,9 @@ async def _ingest_default_documents_langflow(services, file_paths):
                     anonymous_user.user_id, effective_jwt
                 )
                 # Get the JWT that was created by session manager
-                if hasattr(session_manager, '_anonymous_jwt'):
+                if hasattr(session_manager, "_anonymous_jwt"):
                     effective_jwt = session_manager._anonymous_jwt

             # Prepare tweaks for default documents with anonymous user metadata
             default_tweaks = {
                 "OpenSearchHybrid-Ve6bS": {
@@ -332,11 +318,11 @@ async def _ingest_default_documents_langflow(services, file_paths):
                         {"key": "owner", "value": None},
                         {"key": "owner_name", "value": anonymous_user.name},
                         {"key": "owner_email", "value": anonymous_user.email},
-                        {"key": "connector_type", "value": "system_default"}
+                        {"key": "connector_type", "value": "system_default"},
                     ]
                 }
             }

             # Use langflow upload_and_ingest_file method with JWT token
             result = await langflow_file_service.upload_and_ingest_file(
                 file_tuple=file_tuple,
@@ -346,14 +332,14 @@ async def _ingest_default_documents_langflow(services, file_paths):
                 jwt_token=effective_jwt,  # Use JWT token (anonymous if needed)
                 delete_after_ingest=True,  # Clean up after ingestion
             )

             logger.info(
                 "Successfully ingested file via Langflow",
                 file_path=file_path,
                 result_status=result.get("status"),
             )
             success_count += 1

         except Exception as e:
             logger.error(
                 "Failed to ingest file via Langflow",
@@ -361,7 +347,7 @@ async def _ingest_default_documents_langflow(services, file_paths):
                 error=str(e),
             )
             error_count += 1

     logger.info(
         "Langflow ingestion completed",
         success_count=success_count,
@@ -376,7 +362,7 @@ async def _ingest_default_documents_openrag(services, file_paths):
         "Using traditional OpenRAG ingestion for default documents",
         file_count=len(file_paths),
     )

     # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
     from models.processors import DocumentFileProcessor

@@ -443,11 +429,11 @@ async def initialize_services():
         task_service=task_service,
         session_manager=session_manager,
     )

     # Create connector router that chooses based on configuration
     connector_service = ConnectorRouter(
         langflow_connector_service=langflow_connector_service,
-        openrag_connector_service=openrag_connector_service
+        openrag_connector_service=openrag_connector_service,
     )

     # Initialize auth service
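The trailing-comma fix above lands in the ConnectorRouter construction; per the file's comment, this router chooses between the Langflow and OpenRAG connector services based on configuration. A hypothetical sketch of that delegation shape (the real class lives in api/connector_router.py and its selection logic is not part of this diff):

    class ConnectorRouter:
        """Picks a backing connector service per call, based on configuration."""

        def __init__(self, langflow_connector_service, openrag_connector_service):
            self._langflow = langflow_connector_service
            self._openrag = openrag_connector_service

        def _backend(self, use_langflow: bool):
            # Hypothetical selector; the actual predicate is config-driven.
            return self._langflow if use_langflow else self._openrag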
uv.lock (2 changes, generated)
@@ -1405,7 +1405,7 @@ wheels = [

 [[package]]
 name = "openrag"
-version = "0.1.1"
+version = "0.1.2"
 source = { editable = "." }
 dependencies = [
     { name = "agentd" },