ingestion modes for the default files

This commit is contained in:
Edwin Jose 2025-09-08 13:48:45 -04:00
parent 64ea8b6567
commit 3cb379370e
5 changed files with 137 additions and 19 deletions

View file

@ -28,3 +28,11 @@ LANGFLOW_SUPERUSER=
LANGFLOW_SUPERUSER_PASSWORD=
LANGFLOW_NEW_USER_IS_ACTIVE=False
LANGFLOW_ENABLE_SUPERUSER_CLI=False
# Ingestion Mode Configuration
# Options: "langflow" (default) or "openrag"
# - langflow: Use Langflow pipeline for document ingestion (upload -> ingest -> delete)
# - openrag: Use traditional OpenRAG processor for document ingestion
INGEST_MODE=langflow

View file

@ -28,6 +28,37 @@ If you need to reset state:
docker compose up --build --force-recreate --remove-orphans
### Configuration
OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate with your values:
```bash
cp .env.example .env
```
#### Key Environment Variables
**Required:**
- `OPENAI_API_KEY`: Your OpenAI API key
- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user
- `LANGFLOW_SUPERUSER`: Langflow admin username
- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password
- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow
- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow
**Ingestion Configuration:**
- `INGEST_MODE`: Controls how default documents are ingested (default: `langflow`)
- `langflow`: Uses Langflow pipeline for document ingestion (upload → ingest → delete)
- `openrag`: Uses traditional OpenRAG processor for document ingestion
**Optional:**
- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`)
- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication
- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth
- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints
- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations
See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files.
For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb):

View file

@ -54,6 +54,7 @@ services:
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- INGEST_MODE=${INGEST_MODE:-langflow}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}

View file

@ -45,6 +45,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
# Ingestion mode configuration
INGEST_MODE = os.getenv("INGEST_MODE", "langflow").lower() # "langflow" or "openrag"
def is_no_auth_mode():
"""Check if we're running in no-auth mode (OAuth credentials missing)"""

View file

@ -43,6 +43,7 @@ from auth_middleware import optional_auth, require_auth
from config.settings import (
INDEX_BODY,
INDEX_NAME,
INGEST_MODE,
SESSION_SECRET,
clients,
is_no_auth_mode,
@ -226,7 +227,7 @@ async def init_index_when_ready():
async def ingest_default_documents_when_ready(services):
"""Scan the local documents folder and ingest files like a non-auth upload."""
try:
logger.info("Ingesting default documents when ready")
logger.info("Ingesting default documents when ready", ingest_mode=INGEST_MODE)
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir):
logger.info(
@ -248,29 +249,103 @@ async def ingest_default_documents_when_ready(services):
)
return
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor
if INGEST_MODE == "langflow":
await _ingest_default_documents_langflow(services, file_paths)
else:
await _ingest_default_documents_openrag(services, file_paths)
processor = DocumentFileProcessor(
services["document_service"],
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
task_id = await services["task_service"].create_custom_task(
"anonymous", file_paths, processor
)
logger.info(
"Started default documents ingestion task",
task_id=task_id,
file_count=len(file_paths),
)
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
async def _ingest_default_documents_langflow(services, file_paths):
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
langflow_file_service = services["langflow_file_service"]
logger.info(
"Using Langflow ingestion pipeline for default documents",
file_count=len(file_paths),
)
success_count = 0
error_count = 0
for file_path in file_paths:
try:
logger.debug("Processing file with Langflow pipeline", file_path=file_path)
# Read file content
with open(file_path, 'rb') as f:
content = f.read()
# Create file tuple for upload
filename = os.path.basename(file_path)
# Determine content type based on file extension
import mimetypes
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
# Use langflow upload_and_ingest_file method
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
jwt_token=None, # No auth for default documents
delete_after_ingest=True, # Clean up after ingestion
)
logger.info(
"Successfully ingested file via Langflow",
file_path=file_path,
result_status=result.get("status"),
)
success_count += 1
except Exception as e:
logger.error(
"Failed to ingest file via Langflow",
file_path=file_path,
error=str(e),
)
error_count += 1
logger.info(
"Langflow ingestion completed",
success_count=success_count,
error_count=error_count,
total_files=len(file_paths),
)
async def _ingest_default_documents_openrag(services, file_paths):
"""Ingest default documents using traditional OpenRAG processor."""
logger.info(
"Using traditional OpenRAG ingestion for default documents",
file_count=len(file_paths),
)
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor
processor = DocumentFileProcessor(
services["document_service"],
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
task_id = await services["task_service"].create_custom_task(
"anonymous", file_paths, processor
)
logger.info(
"Started traditional OpenRAG ingestion task",
task_id=task_id,
file_count=len(file_paths),
)
async def startup_tasks(services):
"""Startup tasks"""
logger.info("Starting startup tasks")