From ad3f1e28b970c96a5cebb5d521d84d1de2ee164e Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 2 Sep 2025 15:13:21 -0400 Subject: [PATCH] Dataset loading on start. --- FIRST_RUN_SETUP.md | 118 +++++++++++++++++++++++++++++++++ docker-compose-cpu.yml | 1 + docker-compose.yml | 1 + src/api/tasks.py | 2 +- src/config/settings.py | 3 + src/main.py | 29 ++++++++ src/services/task_service.py | 21 ++++-- src/utils/default_ingestion.py | 104 +++++++++++++++++++++++++++++ src/utils/first_run.py | 55 +++++++++++++++ 9 files changed, 328 insertions(+), 6 deletions(-) create mode 100644 FIRST_RUN_SETUP.md create mode 100644 src/utils/default_ingestion.py create mode 100644 src/utils/first_run.py diff --git a/FIRST_RUN_SETUP.md b/FIRST_RUN_SETUP.md new file mode 100644 index 00000000..92d71b88 --- /dev/null +++ b/FIRST_RUN_SETUP.md @@ -0,0 +1,118 @@ +# OpenRAG First-Run Setup + +This document describes the automatic dataset loading feature that initializes OpenRAG with default documents on first startup. + +## Overview + +OpenRAG now includes a first-run initialization system that automatically loads documents from a default dataset directory when the application starts for the first time. + +## Configuration + +### Environment Variables + +- `DATA_DIRECTORY`: Path to the directory containing default documents to load on first run (default: `./documents`) +- `SKIP_FIRST_RUN_INIT`: Set to `true` to disable automatic first-run initialization (default: `false`) + +### Example .env Configuration + +```bash +# Default dataset directory for automatic ingestion on first run +DATA_DIRECTORY=./documents + +# Skip first-run initialization (set to true to disable automatic dataset loading) +# SKIP_FIRST_RUN_INIT=false +``` + +## How It Works + +1. **First-Run Detection**: The system checks for a `.openrag_initialized` marker file in the application root +2. 
**Document Detection**: If no marker file exists and there are no existing documents in the OpenSearch index, first-run initialization triggers +3. **File Discovery**: The system scans the `DATA_DIRECTORY` for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT) +4. **Automatic Ingestion**: Found files are automatically processed and ingested into the OpenSearch index using the existing upload workflow +5. **Initialization Marker**: Once the setup attempt finishes (including when no files are found or ingestion fails), a marker file is created to prevent re-initialization on subsequent startups + +## Docker Configuration + +The `DATA_DIRECTORY` environment variable is automatically passed to the Docker containers. The default `./documents` directory is already mounted as a volume in the Docker configuration. Set `DATA_DIRECTORY` explicitly in your `.env` file: if it is unset on the host, Compose passes an empty value to the container and the application's `./documents` default is not applied. + +### Docker Compose + +Both `docker-compose.yml` and `docker-compose-cpu.yml` have been updated to include: + +```yaml +environment: + - DATA_DIRECTORY=${DATA_DIRECTORY} +volumes: + - ./documents:/app/documents:Z +``` + +## File Structure + +``` +openrag/ +├── documents/ # Default dataset directory +│ ├── sample1.pdf +│ ├── sample2.txt +│ └── ... +├── .openrag_initialized # Created after first successful initialization +└── src/ + └── utils/ + ├── first_run.py # First-run detection logic + └── default_ingestion.py # Dataset ingestion logic +``` + +## Supported File Types + +The first-run initialization supports the following document types: +- PDF (.pdf) +- Plain text (.txt) +- Microsoft Word (.doc, .docx) +- Markdown (.md) +- Rich Text Format (.rtf) +- OpenDocument Text (.odt) + +## Behavior + +### Normal First Run +1. Application starts +2. OpenSearch index is initialized +3. System checks for existing documents +4. If none found, scans `DATA_DIRECTORY` +5. Creates background task to process found documents +6. Waits up to 5 minutes for the task to finish, then creates the `.openrag_initialized` marker file +7. Documents not yet processed by then continue to be processed asynchronously in the background + +### Subsequent Runs +1. Application starts +2. 
System detects `.openrag_initialized` marker file +3. First-run initialization is skipped +4. Application starts normally + +### Skipping Initialization +Set `SKIP_FIRST_RUN_INIT=true` in your environment to disable first-run initialization entirely. + +## Monitoring + +First-run initialization creates a background task that can be monitored through: +- Console logs with `[FIRST_RUN]` prefix +- Task API endpoints (the task is created under the `system` user) + +## Troubleshooting + +### No Documents Were Loaded +1. Check that `DATA_DIRECTORY` points to a valid directory +2. Verify the directory contains supported file types +3. Check console logs for `[FIRST_RUN]` messages +4. Ensure OpenSearch is running and accessible + +### Disable First-Run Setup +If you want to prevent automatic initialization: +1. Set `SKIP_FIRST_RUN_INIT=true` in your .env file +2. Or create an empty `.openrag_initialized` file manually + +### Force Re-initialization +To force first-run setup to run again: +1. Stop the application +2. Delete the `.openrag_initialized` file +3. Clear the OpenSearch index if it still contains documents (ingestion is skipped whenever the index is not empty) +4. 
Restart the application \ No newline at end of file diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 6f27042a..35607075 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -66,6 +66,7 @@ services: - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL} - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - DATA_DIRECTORY=${DATA_DIRECTORY} volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z diff --git a/docker-compose.yml b/docker-compose.yml index 8e2fdee2..f2c88a29 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -66,6 +66,7 @@ services: - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL} - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - DATA_DIRECTORY=${DATA_DIRECTORY} volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z diff --git a/src/api/tasks.py b/src/api/tasks.py index 0d07837b..9d38d3cd 100644 --- a/src/api/tasks.py +++ b/src/api/tasks.py @@ -6,7 +6,7 @@ async def task_status(request: Request, task_service, session_manager): task_id = request.path_params.get("task_id") user = request.state.user - task_status_result = task_service.get_task_status(user.user_id, task_id) + task_status_result = await task_service.get_task_status(task_id, user.user_id) if not task_status_result: return JSONResponse({"error": "Task not found"}, status_code=404) diff --git a/src/config/settings.py b/src/config/settings.py index 3a42fa1c..206491e0 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -30,6 +30,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") +# Default dataset directory for first-run initialization +DATA_DIRECTORY = os.getenv("DATA_DIRECTORY", "./documents") + def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" result = not 
(GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET) diff --git a/src/main.py b/src/main.py index 5f0aeedc..52b924c5 100644 --- a/src/main.py +++ b/src/main.py @@ -153,6 +153,33 @@ async def init_index_when_ready(): except Exception as e: print(f"OpenSearch index initialization failed: {e}") print("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready") + +async def run_first_time_initialization(services): + """Run first-time initialization if needed""" + try: + from utils.default_ingestion import run_first_time_setup + await run_first_time_setup(services['document_service'], services['task_service']) + except Exception as e: + print(f"[ERROR] First-time initialization failed: {e}") + +async def run_first_time_initialization_when_ready(services): + """Run first-time initialization after OpenSearch index is ready""" + # Wait for OpenSearch to be initialized first + max_retries = 30 + retry_delay = 2 + + for attempt in range(max_retries): + try: + await clients.opensearch.info() + # Index is ready, now run first-time initialization + await run_first_time_initialization(services) + return + except Exception as e: + print(f"[FIRST_RUN] Waiting for OpenSearch (attempt {attempt + 1}/{max_retries}): {e}") + if attempt < max_retries - 1: + await asyncio.sleep(retry_delay) + else: + print("[FIRST_RUN] Failed to wait for OpenSearch, skipping first-time initialization") async def initialize_services(): @@ -472,6 +499,8 @@ async def create_app(): async def startup_event(): # Start index initialization in background to avoid blocking OIDC endpoints asyncio.create_task(init_index_when_ready()) + # Start first-time initialization in background after index is ready + asyncio.create_task(run_first_time_initialization_when_ready(services)) # Add shutdown event handler @app.on_event("shutdown") diff --git a/src/services/task_service.py b/src/services/task_service.py index 8fa1ed2b..0f151c22 100644 --- a/src/services/task_service.py +++ 
b/src/services/task_service.py @@ -145,14 +145,25 @@ class TaskService: self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].updated_at = time.time() - def get_task_status(self, user_id: str, task_id: str) -> dict: + async def get_task_status(self, task_id: str, user_id: str = None) -> dict: """Get the status of a specific upload task""" - if (not task_id or - user_id not in self.task_store or - task_id not in self.task_store[user_id]): + if not task_id: return None - upload_task = self.task_store[user_id][task_id] + upload_task = None + + # If user_id is provided, look in that user's tasks + if user_id and user_id in self.task_store and task_id in self.task_store[user_id]: + upload_task = self.task_store[user_id][task_id] + # If user_id not provided or task not found, search all users + elif user_id is None: + for uid, user_tasks in self.task_store.items(): + if task_id in user_tasks: + upload_task = user_tasks[task_id] + break + + if upload_task is None: + return None file_statuses = {} for file_path, file_task in upload_task.file_tasks.items(): diff --git a/src/utils/default_ingestion.py b/src/utils/default_ingestion.py new file mode 100644 index 00000000..f147c4b3 --- /dev/null +++ b/src/utils/default_ingestion.py @@ -0,0 +1,104 @@ +import asyncio +import os +from pathlib import Path +from typing import List +from utils.first_run import get_default_dataset_files, has_existing_documents, is_first_run, mark_initialized, should_run_initialization +from services.task_service import TaskService +from services.document_service import DocumentService + +class DefaultDatasetIngestor: + """Handles automatic ingestion of default datasets on first run""" + + def __init__(self, document_service: DocumentService, task_service: TaskService): + self.document_service = document_service + self.task_service = task_service + + async def should_ingest(self) -> bool: + """Check if we should perform default dataset ingestion""" + if not 
should_run_initialization(): + print("[FIRST_RUN] Initialization skipped due to SKIP_FIRST_RUN_INIT") + return False + + first_run = await is_first_run() + existing_docs = await has_existing_documents() + + # Only ingest if it's first run AND there are no existing documents + should_ingest = first_run and not existing_docs + + print(f"[FIRST_RUN] First run: {first_run}, Existing docs: {existing_docs}, Should ingest: {should_ingest}") + return should_ingest + + async def ingest_default_dataset(self) -> bool: + """Ingest the default dataset files""" + try: + files = await get_default_dataset_files() + + if not files: + print("[FIRST_RUN] No default dataset files found to ingest") + await mark_initialized() + return True + + print(f"[FIRST_RUN] Found {len(files)} files to ingest from default dataset") + + # Create a system task for ingesting default files + # Use a dummy user ID for system operations + system_user_id = "system" + task_id = await self.task_service.create_upload_task( + user_id=system_user_id, + file_paths=files, + jwt_token=None, # No JWT needed for system operations + owner_name="System", + owner_email="system@openrag.local" + ) + + print(f"[FIRST_RUN] Created task {task_id} for default dataset ingestion") + + # Wait a bit for the task to start processing + await asyncio.sleep(2) + + # Monitor task progress (but don't block indefinitely) + max_wait_time = 300 # 5 minutes max + check_interval = 5 # Check every 5 seconds + elapsed_time = 0 + + while elapsed_time < max_wait_time: + try: + task_status = await self.task_service.get_task_status(task_id) + if task_status and task_status.get("status") in ["completed", "failed"]: + break + + await asyncio.sleep(check_interval) + elapsed_time += check_interval + + # Log progress every 30 seconds + if elapsed_time % 30 == 0: + processed = task_status.get("processed", 0) if task_status else 0 + total = len(files) + print(f"[FIRST_RUN] Task {task_id} progress: {processed}/{total} files processed") + + except 
Exception as e: + print(f"[FIRST_RUN] Error checking task status: {e}") + break + + # Mark as initialized regardless of task completion + # The task will continue running in the background if needed + await mark_initialized() + print("[FIRST_RUN] Default dataset ingestion initiated successfully") + return True + + except Exception as e: + print(f"[FIRST_RUN] Error during default dataset ingestion: {e}") + # Still mark as initialized to prevent retrying on every startup + await mark_initialized() + return False + +async def run_first_time_setup(document_service: DocumentService, task_service: TaskService) -> bool: + """Run first-time setup if needed""" + ingestor = DefaultDatasetIngestor(document_service, task_service) + + if await ingestor.should_ingest(): + print("[FIRST_RUN] Starting first-time default dataset ingestion...") + return await ingestor.ingest_default_dataset() + else: + print("[FIRST_RUN] Skipping first-time setup") + return True \ No newline at end of file diff --git a/src/utils/first_run.py b/src/utils/first_run.py new file mode 100644 index 00000000..38a2c5a2 --- /dev/null +++ b/src/utils/first_run.py @@ -0,0 +1,55 @@ +import os +import asyncio +from pathlib import Path +from config.settings import DATA_DIRECTORY, clients, INDEX_NAME + +FIRST_RUN_MARKER = ".openrag_initialized" + +async def is_first_run(): + """Check if this is the first run by looking for initialization marker""" + marker_path = Path(FIRST_RUN_MARKER) + return not marker_path.exists() + +async def mark_initialized(): + """Create marker file to indicate successful initialization""" + marker_path = Path(FIRST_RUN_MARKER) + marker_path.write_text("initialized") + +async def has_existing_documents(): + """Check if there are already documents in the OpenSearch index""" + try: + response = await clients.opensearch.search( + index=INDEX_NAME, + body={"size": 0, "track_total_hits": True} + ) + total_docs = response["hits"]["total"]["value"] + return total_docs > 0 + except Exception as 
e: + print(f"Error checking existing documents: {e}") + return False + +async def get_default_dataset_files(): + """Get list of files in the default dataset directory""" + data_dir = Path(DATA_DIRECTORY) + if not data_dir.exists() or not data_dir.is_dir(): + print(f"Default dataset directory {DATA_DIRECTORY} does not exist or is not a directory") + return [] + + # Get all files recursively, excluding hidden files and directories + files = [] + for file_path in data_dir.rglob("*"): + if file_path.is_file() and not file_path.name.startswith("."): + # Filter for document types (pdf, txt, doc, docx, etc.) + if file_path.suffix.lower() in ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']: + files.append(str(file_path.absolute())) + + return files + +def should_run_initialization(): + """Determine if we should run first-run initialization""" + # Check for environment variable to skip initialization + skip_init = os.getenv("SKIP_FIRST_RUN_INIT", "false").lower() in ["true", "1", "yes"] + if skip_init: + return False + + return True \ No newline at end of file