Dataset loading on start.

This commit is contained in:
Edwin Jose 2025-09-02 15:13:21 -04:00
parent e810aef588
commit ad3f1e28b9
9 changed files with 328 additions and 6 deletions

FIRST_RUN_SETUP.md Normal file

@@ -0,0 +1,118 @@
# OpenRAG First-Run Setup
This document describes the automatic dataset loading feature that initializes OpenRAG with default documents on first startup.
## Overview
OpenRAG now includes a first-run initialization system that automatically loads documents from a default dataset directory when the application starts for the first time.
## Configuration
### Environment Variables
- `DATA_DIRECTORY`: Path to the directory containing default documents to load on first run (default: `./documents`)
- `SKIP_FIRST_RUN_INIT`: Set to `true` to disable automatic first-run initialization (default: `false`)
### Example .env Configuration
```bash
# Default dataset directory for automatic ingestion on first run
DATA_DIRECTORY=./documents
# Skip first-run initialization (set to true to disable automatic dataset loading)
# SKIP_FIRST_RUN_INIT=false
```
## How It Works
1. **First-Run Detection**: The system checks for a `.openrag_initialized` marker file in the application root
2. **Document Detection**: If no marker file exists and there are no existing documents in the OpenSearch index, first-run initialization triggers
3. **File Discovery**: The system scans the `DATA_DIRECTORY` for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT)
4. **Automatic Ingestion**: Found files are automatically processed and ingested into the OpenSearch index using the existing upload workflow
5. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups
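The first-run check in steps 1 and 5 reduces to a marker-file test. A minimal sketch of that logic, mirroring the helpers in `src/utils/first_run.py` (async wrappers omitted, and a `root` parameter added here purely for illustration — the real helpers use the application root):

```python
from pathlib import Path

MARKER_NAME = ".openrag_initialized"

def is_first_run(root: Path = Path(".")) -> bool:
    # First run if and only if the marker file has not been written yet
    return not (root / MARKER_NAME).exists()

def mark_initialized(root: Path = Path(".")) -> None:
    # The file's existence is what matters, not its content
    (root / MARKER_NAME).write_text("initialized")
```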
## Docker Configuration
The `DATA_DIRECTORY` environment variable is automatically passed to the Docker containers. The default `./documents` directory is already mounted as a volume in the Docker configuration.
### Docker Compose
Both `docker-compose.yml` and `docker-compose-cpu.yml` have been updated to include:
```yaml
environment:
- DATA_DIRECTORY=${DATA_DIRECTORY}
volumes:
- ./documents:/app/documents:Z
```
## File Structure
```
openrag/
├── documents/ # Default dataset directory
│ ├── sample1.pdf
│ ├── sample2.txt
│ └── ...
├── .openrag_initialized # Created after first successful initialization
└── src/
└── utils/
├── first_run.py # First-run detection logic
└── default_ingestion.py # Dataset ingestion logic
```
## Supported File Types
The first-run initialization supports the following document types:
- PDF (.pdf)
- Plain text (.txt)
- Microsoft Word (.doc, .docx)
- Markdown (.md)
- Rich Text Format (.rtf)
- OpenDocument Text (.odt)
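File discovery is a recursive scan filtered by these suffixes, skipping hidden files. A sketch consistent with `get_default_dataset_files` in `src/utils/first_run.py` (`sorted` added here only to make the result deterministic):

```python
from pathlib import Path

SUPPORTED_SUFFIXES = {".pdf", ".txt", ".doc", ".docx", ".md", ".rtf", ".odt"}

def find_default_files(data_dir: str) -> list[str]:
    root = Path(data_dir)
    if not root.is_dir():
        return []
    # Recurse, keeping only visible files with a supported extension
    return sorted(
        str(p.absolute())
        for p in root.rglob("*")
        if p.is_file()
        and not p.name.startswith(".")
        and p.suffix.lower() in SUPPORTED_SUFFIXES
    )
```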
## Behavior
### Normal First Run
1. Application starts
2. OpenSearch index is initialized
3. System checks for existing documents
4. If none found, scans `DATA_DIRECTORY`
5. Creates background task to process found documents
6. Creates `.openrag_initialized` marker file
7. Documents are processed asynchronously in the background
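Because steps 5–7 run as background tasks, startup never blocks on ingestion. A toy sketch of the fire-and-forget pattern used in the startup hook (the coroutine bodies here are hypothetical stand-ins, not the real service code):

```python
import asyncio

async def init_index_when_ready() -> str:
    await asyncio.sleep(0)  # stand-in for polling OpenSearch
    return "index-ready"

async def run_first_time_initialization_when_ready() -> str:
    await asyncio.sleep(0)  # stand-in for scanning and ingesting documents
    return "first-run-done"

async def startup_event() -> list[str]:
    # Fire-and-forget: the web server keeps serving while these run
    tasks = [
        asyncio.create_task(init_index_when_ready()),
        asyncio.create_task(run_first_time_initialization_when_ready()),
    ]
    return await asyncio.gather(*tasks)

print(asyncio.run(startup_event()))  # → ['index-ready', 'first-run-done']
```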
### Subsequent Runs
1. Application starts
2. System detects `.openrag_initialized` marker file
3. First-run initialization is skipped
4. Application starts normally
### Skipping Initialization
Set `SKIP_FIRST_RUN_INIT=true` in your environment to disable first-run initialization entirely.
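The flag is parsed permissively. A sketch matching `should_run_initialization` in `src/utils/first_run.py`:

```python
import os

def should_run_initialization() -> bool:
    # "true", "1", and "yes" (any case) all disable first-run setup
    skip = os.getenv("SKIP_FIRST_RUN_INIT", "false").lower() in ("true", "1", "yes")
    return not skip
```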
## Monitoring
First-run initialization creates a background task that can be monitored through:
- Console logs with `[FIRST_RUN]` prefix
- Task API endpoints (for system tasks)
## Troubleshooting
### No Documents Were Loaded
1. Check that `DATA_DIRECTORY` points to a valid directory
2. Verify the directory contains supported file types
3. Check console logs for `[FIRST_RUN]` messages
4. Ensure OpenSearch is running and accessible
### Disable First-Run Setup
If you want to prevent automatic initialization:
1. Set `SKIP_FIRST_RUN_INIT=true` in your .env file
2. Or create an empty `.openrag_initialized` file manually
### Force Re-initialization
To force first-run setup to run again:
1. Stop the application
2. Delete the `.openrag_initialized` file
3. Optionally clear the OpenSearch index
4. Restart the application


@@ -66,6 +66,7 @@ services:
       - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL}
       - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - DATA_DIRECTORY=${DATA_DIRECTORY}
     volumes:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z


@@ -66,6 +66,7 @@ services:
       - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL}
       - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - DATA_DIRECTORY=${DATA_DIRECTORY}
     volumes:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z


@@ -6,7 +6,7 @@ async def task_status(request: Request, task_service, session_manager):
     task_id = request.path_params.get("task_id")
     user = request.state.user
-    task_status_result = task_service.get_task_status(user.user_id, task_id)
+    task_status_result = await task_service.get_task_status(task_id, user.user_id)
     if not task_status_result:
         return JSONResponse({"error": "Task not found"}, status_code=404)


@@ -30,6 +30,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
 GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
 GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
+
+# Default dataset directory for first-run initialization
+DATA_DIRECTORY = os.getenv("DATA_DIRECTORY", "./documents")

 def is_no_auth_mode():
     """Check if we're running in no-auth mode (OAuth credentials missing)"""
     result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET)


@@ -153,6 +153,33 @@ async def init_index_when_ready():
     except Exception as e:
         print(f"OpenSearch index initialization failed: {e}")
         print("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready")
+
+async def run_first_time_initialization(services):
+    """Run first-time initialization if needed"""
+    try:
+        from utils.default_ingestion import run_first_time_setup
+        await run_first_time_setup(services['document_service'], services['task_service'])
+    except Exception as e:
+        print(f"[ERROR] First-time initialization failed: {e}")
+
+async def run_first_time_initialization_when_ready(services):
+    """Run first-time initialization after OpenSearch index is ready"""
+    # Wait for OpenSearch to be initialized first
+    max_retries = 30
+    retry_delay = 2
+    for attempt in range(max_retries):
+        try:
+            await clients.opensearch.info()
+            # Index is ready, now run first-time initialization
+            await run_first_time_initialization(services)
+            return
+        except Exception as e:
+            print(f"[FIRST_RUN] Waiting for OpenSearch (attempt {attempt + 1}/{max_retries}): {e}")
+            if attempt < max_retries - 1:
+                await asyncio.sleep(retry_delay)
+            else:
+                print("[FIRST_RUN] Failed to wait for OpenSearch, skipping first-time initialization")
+
 async def initialize_services():

@@ -472,6 +499,8 @@ async def create_app():
     async def startup_event():
         # Start index initialization in background to avoid blocking OIDC endpoints
         asyncio.create_task(init_index_when_ready())
+        # Start first-time initialization in background after index is ready
+        asyncio.create_task(run_first_time_initialization_when_ready(services))

     # Add shutdown event handler
     @app.on_event("shutdown")


@@ -145,14 +145,25 @@ class TaskService:
         self.task_store[user_id][task_id].status = TaskStatus.FAILED
         self.task_store[user_id][task_id].updated_at = time.time()

-    def get_task_status(self, user_id: str, task_id: str) -> dict:
+    async def get_task_status(self, task_id: str, user_id: str = None) -> dict:
         """Get the status of a specific upload task"""
-        if (not task_id or
-            user_id not in self.task_store or
-            task_id not in self.task_store[user_id]):
+        if not task_id:
             return None
-        upload_task = self.task_store[user_id][task_id]
+        upload_task = None
+        # If user_id is provided, look in that user's tasks
+        if user_id and user_id in self.task_store and task_id in self.task_store[user_id]:
+            upload_task = self.task_store[user_id][task_id]
+        # If user_id not provided or task not found, search all users
+        elif user_id is None:
+            for uid, user_tasks in self.task_store.items():
+                if task_id in user_tasks:
+                    upload_task = user_tasks[task_id]
+                    break
+        if upload_task is None:
+            return None

         file_statuses = {}
         for file_path, file_task in upload_task.file_tasks.items():
src/utils/default_ingestion.py Normal file

@@ -0,0 +1,104 @@
import asyncio
import os
from pathlib import Path
from typing import List

from utils.first_run import (
    get_default_dataset_files,
    has_existing_documents,
    is_first_run,
    mark_initialized,
    should_run_initialization,
)
from services.task_service import TaskService
from services.document_service import DocumentService


class DefaultDatasetIngestor:
    """Handles automatic ingestion of default datasets on first run"""

    def __init__(self, document_service: DocumentService, task_service: TaskService):
        self.document_service = document_service
        self.task_service = task_service

    async def should_ingest(self) -> bool:
        """Check if we should perform default dataset ingestion"""
        if not should_run_initialization():
            print("[FIRST_RUN] Initialization skipped due to SKIP_FIRST_RUN_INIT")
            return False

        first_run = await is_first_run()
        existing_docs = await has_existing_documents()

        # Only ingest if it's first run AND there are no existing documents
        should_ingest = first_run and not existing_docs
        print(f"[FIRST_RUN] First run: {first_run}, Existing docs: {existing_docs}, Should ingest: {should_ingest}")
        return should_ingest

    async def ingest_default_dataset(self) -> bool:
        """Ingest the default dataset files"""
        try:
            files = await get_default_dataset_files()
            if not files:
                print("[FIRST_RUN] No default dataset files found to ingest")
                await mark_initialized()
                return True

            print(f"[FIRST_RUN] Found {len(files)} files to ingest from default dataset")

            # Create a system task for ingesting default files,
            # using a dummy user ID for system operations
            system_user_id = "system"
            task_id = await self.task_service.create_upload_task(
                user_id=system_user_id,
                file_paths=files,
                jwt_token=None,  # No JWT needed for system operations
                owner_name="System",
                owner_email="system@openrag.local"
            )
            print(f"[FIRST_RUN] Created task {task_id} for default dataset ingestion")

            # Wait a bit for the task to start processing
            await asyncio.sleep(2)

            # Monitor task progress (but don't block indefinitely)
            max_wait_time = 300  # 5 minutes max
            check_interval = 5   # Check every 5 seconds
            elapsed_time = 0

            while elapsed_time < max_wait_time:
                try:
                    task_status = await self.task_service.get_task_status(task_id)
                    if task_status and task_status.get("status") in ["completed", "failed"]:
                        break

                    await asyncio.sleep(check_interval)
                    elapsed_time += check_interval

                    # Log progress every 30 seconds
                    if elapsed_time % 30 == 0:
                        processed = task_status.get("processed", 0) if task_status else 0
                        total = len(files)
                        print(f"[FIRST_RUN] Task {task_id} progress: {processed}/{total} files processed")
                except Exception as e:
                    print(f"[FIRST_RUN] Error checking task status: {e}")
                    break

            # Mark as initialized regardless of task completion;
            # the task will continue running in the background if needed
            await mark_initialized()
            print("[FIRST_RUN] Default dataset ingestion initiated successfully")
            return True
        except Exception as e:
            print(f"[FIRST_RUN] Error during default dataset ingestion: {e}")
            # Still mark as initialized to prevent retrying on every startup
            await mark_initialized()
            return False


async def run_first_time_setup(document_service: DocumentService, task_service: TaskService) -> bool:
    """Run first-time setup if needed"""
    ingestor = DefaultDatasetIngestor(document_service, task_service)
    if await ingestor.should_ingest():
        print("[FIRST_RUN] Starting first-time default dataset ingestion...")
        return await ingestor.ingest_default_dataset()
    else:
        print("[FIRST_RUN] Skipping first-time setup")
        return True

src/utils/first_run.py Normal file

@@ -0,0 +1,55 @@
import os
import asyncio
from pathlib import Path

from config.settings import DATA_DIRECTORY, clients, INDEX_NAME

FIRST_RUN_MARKER = ".openrag_initialized"


async def is_first_run():
    """Check if this is the first run by looking for initialization marker"""
    marker_path = Path(FIRST_RUN_MARKER)
    return not marker_path.exists()


async def mark_initialized():
    """Create marker file to indicate successful initialization"""
    marker_path = Path(FIRST_RUN_MARKER)
    marker_path.write_text("initialized")


async def has_existing_documents():
    """Check if there are already documents in the OpenSearch index"""
    try:
        response = await clients.opensearch.search(
            index=INDEX_NAME,
            body={"size": 0, "track_total_hits": True}
        )
        total_docs = response["hits"]["total"]["value"]
        return total_docs > 0
    except Exception as e:
        print(f"Error checking existing documents: {e}")
        return False


async def get_default_dataset_files():
    """Get list of files in the default dataset directory"""
    data_dir = Path(DATA_DIRECTORY)
    if not data_dir.exists() or not data_dir.is_dir():
        print(f"Default dataset directory {DATA_DIRECTORY} does not exist or is not a directory")
        return []

    # Get all files recursively, excluding hidden files and directories
    files = []
    for file_path in data_dir.rglob("*"):
        if file_path.is_file() and not file_path.name.startswith("."):
            # Filter for document types (pdf, txt, doc, docx, etc.)
            if file_path.suffix.lower() in ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']:
                files.append(str(file_path.absolute()))

    return files


def should_run_initialization():
    """Determine if we should run first-run initialization"""
    # Check for environment variable to skip initialization
    skip_init = os.getenv("SKIP_FIRST_RUN_INIT", "false").lower() in ["true", "1", "yes"]
    if skip_init:
        return False
    return True