diff --git a/.env.example b/.env.example index 84787216..a5b110bc 100644 --- a/.env.example +++ b/.env.example @@ -28,3 +28,11 @@ LANGFLOW_SUPERUSER= LANGFLOW_SUPERUSER_PASSWORD= LANGFLOW_NEW_USER_IS_ACTIVE=False LANGFLOW_ENABLE_SUPERUSER_CLI=False + + + +# Ingestion Mode Configuration +# Options: "langflow" (default) or "openrag" +# - langflow: Use Langflow pipeline for document ingestion (upload -> ingest -> delete) +# - openrag: Use traditional OpenRAG processor for document ingestion +INGEST_MODE=langflow \ No newline at end of file diff --git a/README.md b/README.md index 5ebd64f4..d92fd65c 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,37 @@ If you need to reset state: docker compose up --build --force-recreate --remove-orphans +### Configuration + +OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate with your values: + +```bash +cp .env.example .env +``` + +#### Key Environment Variables + +**Required:** +- `OPENAI_API_KEY`: Your OpenAI API key +- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user +- `LANGFLOW_SUPERUSER`: Langflow admin username +- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password +- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow +- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow + +**Ingestion Configuration:** +- `INGEST_MODE`: Controls how default documents are ingested (default: `langflow`) + - `langflow`: Uses Langflow pipeline for document ingestion (upload → ingest → delete) + - `openrag`: Uses traditional OpenRAG processor for document ingestion + +**Optional:** +- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`) +- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication +- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth +- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints +- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations + +See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files. For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb): diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 99da6703..30bb2960 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -54,6 +54,7 @@ services: - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} + - INGEST_MODE=${INGEST_MODE:-langflow} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/src/config/settings.py b/src/config/settings.py index 8b5aaa4f..8f7334c9 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -45,6 +45,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") +# Ingestion mode configuration +INGEST_MODE = os.getenv("INGEST_MODE", "langflow").lower() # "langflow" or "openrag" + def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" diff --git a/src/main.py b/src/main.py index a907bd61..1a3e66cc 100644 --- a/src/main.py +++ b/src/main.py @@ -43,6 +43,7 @@ from auth_middleware import optional_auth, require_auth from config.settings import ( INDEX_BODY, INDEX_NAME, + INGEST_MODE, SESSION_SECRET, clients, is_no_auth_mode, @@ -226,7 +227,7 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready") + logger.info("Ingesting default documents when ready", ingest_mode=INGEST_MODE) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -248,29 +249,103 @@ async def ingest_default_documents_when_ready(services): ) return - # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) - from models.processors import DocumentFileProcessor + if INGEST_MODE == "langflow": + await _ingest_default_documents_langflow(services, file_paths) + else: + await _ingest_default_documents_openrag(services, file_paths) - processor = DocumentFileProcessor( - services["document_service"], - owner_user_id=None, - jwt_token=None, - owner_name=None, - owner_email=None, - ) - - task_id = await services["task_service"].create_custom_task( - "anonymous", file_paths, processor - ) - logger.info( - "Started default documents ingestion task", - task_id=task_id, - file_count=len(file_paths), - ) except Exception as e: logger.error("Default documents ingestion failed", error=str(e)) +async def _ingest_default_documents_langflow(services, file_paths): + """Ingest default documents using Langflow upload-ingest-delete pipeline.""" + langflow_file_service = services["langflow_file_service"] + + logger.info( + "Using Langflow ingestion pipeline for default documents", + file_count=len(file_paths), + ) + + success_count = 0 + error_count = 0 + + for file_path in file_paths: + try: + logger.debug("Processing file with Langflow pipeline", file_path=file_path) + + # Read file content + with open(file_path, 'rb') as f: + content = f.read() + + # Create file tuple for upload + filename = os.path.basename(file_path) + # Determine content type based on file extension + import mimetypes + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + file_tuple = (filename, content, content_type) + + # Use langflow upload_and_ingest_file method + result = await langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + jwt_token=None, # No auth for default documents + delete_after_ingest=True, # Clean up after ingestion + ) + + logger.info( + "Successfully ingested file via Langflow", + file_path=file_path, + result_status=result.get("status"), + ) + success_count += 1 + + except Exception as e: + logger.error( + "Failed to ingest file via Langflow", + file_path=file_path, + error=str(e), + ) + error_count += 1 + + logger.info( + "Langflow ingestion completed", + success_count=success_count, + error_count=error_count, + total_files=len(file_paths), + ) + + +async def _ingest_default_documents_openrag(services, file_paths): + """Ingest default documents using traditional OpenRAG processor.""" + logger.info( + "Using traditional OpenRAG ingestion for default documents", + file_count=len(file_paths), + ) + + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) + from models.processors import DocumentFileProcessor + + processor = DocumentFileProcessor( + services["document_service"], + owner_user_id=None, + jwt_token=None, + owner_name=None, + owner_email=None, + ) + + task_id = await services["task_service"].create_custom_task( + "anonymous", file_paths, processor + ) + logger.info( + "Started traditional OpenRAG ingestion task", + task_id=task_id, + file_count=len(file_paths), + ) + + async def startup_tasks(services): """Startup tasks""" logger.info("Starting startup tasks")