Compare commits


2 commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Edwin Jose | c8909a71d0 | draft changes | 2025-09-03 10:28:50 -04:00 |
| Edwin Jose | ad3f1e28b9 | Dataset loading on start. | 2025-09-02 15:13:21 -04:00 |
12 changed files with 416 additions and 6 deletions

.DS_Store: binary file (vendored), not shown.

FIRST_RUN_SETUP.md (new file, 148 lines)
# OpenRAG First-Run Setup
This document describes the automatic dataset loading feature that initializes OpenRAG with default documents on first startup.
## Overview
OpenRAG now includes a first-run initialization system that automatically loads documents from a default dataset directory when the application starts for the first time.
## Configuration
### Environment Variables
- `DATA_DIRECTORY`: Path to the directory containing default documents to load on first run (default: `./documents`)
- `SKIP_FIRST_RUN_INIT`: Set to `true` to disable automatic first-run initialization (default: `false`)
### External Dataset Loading
You can point `DATA_DIRECTORY` to any external location containing your default datasets. The system will:
- Copy files from the external directory to `./documents` to ensure Docker volume access
- Maintain directory structure during copying
- Only copy newer files (based on modification time)
- Skip files that already exist and are up-to-date
Example with external directory:
```bash
DATA_DIRECTORY=/path/to/my/external/datasets
```
This allows you to maintain your datasets outside the OpenRAG project while still leveraging the automatic loading feature.
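The copy-if-newer behavior described above can be sketched as follows (a simplified illustration; the actual logic lives in `src/utils/first_run.py`, and the helper name here is invented for the example):

```python
import shutil
from pathlib import Path

def sync_file(src: Path, dest: Path) -> bool:
    """Copy src to dest only when dest is missing or older (by mtime).

    Returns True if a copy happened, False if dest was already up to date.
    """
    # Preserve the source directory structure under the destination
    dest.parent.mkdir(parents=True, exist_ok=True)
    if not dest.exists() or src.stat().st_mtime > dest.stat().st_mtime:
        shutil.copy2(src, dest)  # copy2 preserves the modification time
        return True
    return False  # already exists and is up to date, skipped
```

Because `copy2` preserves the modification time, a second pass over the same files is a no-op.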
### Example .env Configuration
```bash
# Default dataset directory for automatic ingestion on first run
DATA_DIRECTORY=./documents
# Skip first-run initialization (set to true to disable automatic dataset loading)
# SKIP_FIRST_RUN_INIT=false
```
## How It Works
1. **First-Run Detection**: The system checks for a `.openrag_initialized` marker file in the application root
2. **Document Detection**: If no marker file exists and there are no existing documents in the OpenSearch index, first-run initialization triggers
3. **File Copying**: If `DATA_DIRECTORY` points to a different location than `./documents`, files are copied to the documents folder to ensure Docker volume access
4. **File Discovery**: The system scans the documents folder for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT)
5. **Existing Workflow Reuse**: Found files are processed using the same `create_upload_task` method as the manual "Upload Path" feature
6. **Document Ownership**: In no-auth mode, documents are owned by the anonymous user; in auth mode, documents are created without an owner, making them globally accessible
7. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups
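The decision made in steps 1-2 can be sketched as a single predicate (an illustration only; the function name is invented, and the real implementation is in `src/utils/first_run.py`):

```python
import os
from pathlib import Path

MARKER = ".openrag_initialized"  # marker file checked in the application root

def should_ingest(existing_doc_count: int) -> bool:
    """Ingestion triggers only when SKIP_FIRST_RUN_INIT is unset,
    the marker file is absent, and the OpenSearch index is empty."""
    if os.getenv("SKIP_FIRST_RUN_INIT", "false").lower() in ("true", "1", "yes"):
        return False
    first_run = not Path(MARKER).exists()  # step 1: marker check
    return first_run and existing_doc_count == 0  # step 2: empty index check
```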
## Docker Configuration
The `DATA_DIRECTORY` environment variable is automatically passed to the Docker containers. The default `./documents` directory is already mounted as a volume in the Docker configuration.
### Docker Compose
Both `docker-compose.yml` and `docker-compose-cpu.yml` have been updated to include:
```yaml
environment:
- DATA_DIRECTORY=${DATA_DIRECTORY}
volumes:
- ./documents:/app/documents:Z
```
## File Structure
```
openrag/
├── documents/ # Default dataset directory
│ ├── sample1.pdf
│ ├── sample2.txt
│ └── ...
├── .openrag_initialized # Created after first successful initialization
└── src/
└── utils/
├── first_run.py # First-run detection logic
└── default_ingestion.py # Dataset ingestion logic
```
## Supported File Types
The first-run initialization supports the following document types:
- PDF (.pdf)
- Plain text (.txt)
- Microsoft Word (.doc, .docx)
- Markdown (.md)
- Rich Text Format (.rtf)
- OpenDocument Text (.odt)
## Behavior
### Normal First Run
1. Application starts
2. OpenSearch index is initialized
3. System checks for existing documents
4. If none found, copies files from `DATA_DIRECTORY` to `./documents` (if different)
5. Scans documents folder for supported files
6. Creates upload task using existing `create_upload_task` method (same as manual "Upload Path")
7. Documents are processed through complete knowledge pipeline (conversion, chunking, embedding, indexing)
8. Creates `.openrag_initialized` marker file
9. Processing continues asynchronously in the background
### Subsequent Runs
1. Application starts
2. System detects `.openrag_initialized` marker file
3. First-run initialization is skipped
4. Application starts normally
### Skipping Initialization
Set `SKIP_FIRST_RUN_INIT=true` in your environment to disable first-run initialization entirely.
## Monitoring
First-run initialization creates a background task that can be monitored through:
- Console logs with `[FIRST_RUN]` prefix
- Task API endpoints (for system tasks)
## Troubleshooting
### No Documents Were Loaded
1. Check that `DATA_DIRECTORY` points to a valid directory
2. Verify the directory contains supported file types
3. Check console logs for `[FIRST_RUN]` messages
4. Ensure OpenSearch is running and accessible
### Disable First-Run Setup
If you want to prevent automatic initialization:
1. Set `SKIP_FIRST_RUN_INIT=true` in your .env file
2. Or create an empty `.openrag_initialized` file manually
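Option 2 from a shell, run in the application root (the first-run check only tests for the file's existence):

```shell
# Create the marker file; first-run initialization sees it and skips ingestion.
touch .openrag_initialized
```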
### Files Not Visible in Knowledge List
If first-run files don't appear in the knowledge interface:
**For No-Auth Mode:**
- Files should be owned by "anonymous" user and visible immediately
**For Auth Mode:**
- Files are created without owner field, making them globally accessible
- All authenticated users should see these files in their knowledge list
- Check OpenSearch DLS configuration in `securityconfig/roles.yml`
### Force Re-initialization
To force first-run setup to run again:
1. Stop the application
2. Delete the `.openrag_initialized` file
3. Optionally clear the OpenSearch index
4. Restart the application
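The steps above from a shell, assuming the application root as working directory (the stop/restart commands depend on how you deploy OpenRAG and are shown only as comments):

```shell
# 1. Stop the application (e.g. `docker compose down`)
rm -f .openrag_initialized   # 2. delete the initialization marker
# 3. Optionally clear the OpenSearch index
# 4. Restart the application (e.g. `docker compose up -d`)
```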

```diff
@@ -66,6 +66,7 @@ services:
       - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL}
       - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - DATA_DIRECTORY=${DATA_DIRECTORY}
     volumes:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
```

```diff
@@ -66,6 +66,7 @@ services:
       - WEBHOOK_BASE_URL=${WEBHOOK_BASE_URL}
       - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
       - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
+      - DATA_DIRECTORY=${DATA_DIRECTORY}
     volumes:
       - ./documents:/app/documents:Z
       - ./keys:/app/keys:Z
```


```diff
@@ -6,7 +6,7 @@ async def task_status(request: Request, task_service, session_manager):
     task_id = request.path_params.get("task_id")
     user = request.state.user
-    task_status_result = task_service.get_task_status(user.user_id, task_id)
+    task_status_result = await task_service.get_task_status(task_id, user.user_id)
     if not task_status_result:
         return JSONResponse({"error": "Task not found"}, status_code=404)
```

```diff
@@ -30,6 +30,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
 GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
 GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")

+# Default dataset directory for first-run initialization
+DATA_DIRECTORY = os.getenv("DATA_DIRECTORY", "./documents")
+
 def is_no_auth_mode():
     """Check if we're running in no-auth mode (OAuth credentials missing)"""
     result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET)
```

```diff
@@ -154,6 +154,33 @@ async def init_index_when_ready():
         print(f"OpenSearch index initialization failed: {e}")
         print("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready")

+async def run_first_time_initialization(services):
+    """Run first-time initialization if needed"""
+    try:
+        from utils.default_ingestion import run_first_time_setup
+        await run_first_time_setup(services['task_service'])
+    except Exception as e:
+        print(f"[ERROR] First-time initialization failed: {e}")
+
+async def run_first_time_initialization_when_ready(services):
+    """Run first-time initialization after OpenSearch index is ready"""
+    # Wait for OpenSearch to be initialized first
+    max_retries = 30
+    retry_delay = 2
+    for attempt in range(max_retries):
+        try:
+            await clients.opensearch.info()
+            # Index is ready, now run first-time initialization
+            await run_first_time_initialization(services)
+            return
+        except Exception as e:
+            print(f"[FIRST_RUN] Waiting for OpenSearch (attempt {attempt + 1}/{max_retries}): {e}")
+            if attempt < max_retries - 1:
+                await asyncio.sleep(retry_delay)
+            else:
+                print("[FIRST_RUN] Failed to wait for OpenSearch, skipping first-time initialization")
+
 async def initialize_services():
     """Initialize all services and their dependencies"""
@@ -472,6 +499,8 @@ async def create_app():
     async def startup_event():
         # Start index initialization in background to avoid blocking OIDC endpoints
         asyncio.create_task(init_index_when_ready())
+        # Start first-time initialization in background after index is ready
+        asyncio.create_task(run_first_time_initialization_when_ready(services))

     # Add shutdown event handler
     @app.on_event("shutdown")
```

```diff
@@ -41,6 +41,7 @@ class DocumentFileProcessor(TaskProcessor):
         )
+
 class ConnectorFileProcessor(TaskProcessor):
     """Processor for connector file uploads"""
```

```diff
@@ -145,14 +145,25 @@ class TaskService:
             self.task_store[user_id][task_id].status = TaskStatus.FAILED
             self.task_store[user_id][task_id].updated_at = time.time()

-    def get_task_status(self, user_id: str, task_id: str) -> dict:
+    async def get_task_status(self, task_id: str, user_id: str = None) -> dict:
         """Get the status of a specific upload task"""
-        if (not task_id or
-                user_id not in self.task_store or
-                task_id not in self.task_store[user_id]):
+        if not task_id:
             return None

-        upload_task = self.task_store[user_id][task_id]
+        upload_task = None
+        # If user_id is provided, look in that user's tasks
+        if user_id and user_id in self.task_store and task_id in self.task_store[user_id]:
+            upload_task = self.task_store[user_id][task_id]
+        # If user_id not provided or task not found, search all users
+        elif user_id is None:
+            for uid, user_tasks in self.task_store.items():
+                if task_id in user_tasks:
+                    upload_task = user_tasks[task_id]
+                    break
+
+        if upload_task is None:
+            return None

         file_statuses = {}
         for file_path, file_task in upload_task.file_tasks.items():
```

src/utils/default_ingestion.py (new file, 108 lines)

```python
import os
from pathlib import Path

from config.settings import is_no_auth_mode
from utils.first_run import has_existing_documents, is_first_run, mark_initialized, should_run_initialization, copy_external_files_to_documents
from services.task_service import TaskService


def create_system_user_context():
    """Create appropriate system user context for first-run operations"""
    from session_manager import User

    # Create user context for task tracking (not for document ownership)
    system_user = User(
        user_id="anonymous" if is_no_auth_mode() else "global",
        email="anonymous@localhost" if is_no_auth_mode() else "system@openrag.local",
        name="Anonymous User" if is_no_auth_mode() else "Default Dataset"
    )
    jwt_token = None
    return system_user, jwt_token


async def run_first_time_setup(task_service: TaskService) -> bool:
    """Run first-time setup if needed using existing upload workflow"""
    try:
        if not should_run_initialization():
            print("[FIRST_RUN] Initialization skipped due to SKIP_FIRST_RUN_INIT")
            return True

        first_run = await is_first_run()
        existing_docs = await has_existing_documents()

        # Only ingest if it's first run AND there are no existing documents
        should_ingest = first_run and not existing_docs
        print(f"[FIRST_RUN] First run: {first_run}, Existing docs: {existing_docs}, Should ingest: {should_ingest}")

        if not should_ingest:
            print("[FIRST_RUN] Skipping first-time setup")
            return True

        # Copy external files to documents folder if needed
        copied_files = await copy_external_files_to_documents()
        if copied_files:
            print(f"[FIRST_RUN] Successfully copied {len(copied_files)} files from external directory")

        # Get documents directory
        documents_dir = Path("./documents")
        if not documents_dir.exists() or not documents_dir.is_dir():
            print("[FIRST_RUN] Documents directory does not exist")
            await mark_initialized()
            return True

        # Get all supported files (same logic as upload_path)
        file_paths = []
        supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']
        for root, _, files in os.walk(documents_dir):
            for fn in files:
                if not fn.startswith("."):  # Skip hidden files
                    file_path = Path(root) / fn
                    if file_path.suffix.lower() in supported_extensions:
                        file_paths.append(str(file_path))

        if not file_paths:
            print("[FIRST_RUN] No supported files found in documents directory")
            await mark_initialized()
            return True

        print(f"[FIRST_RUN] Found {len(file_paths)} files to ingest from documents directory")

        # Create system user context
        system_user, jwt_token = create_system_user_context()

        # Use existing create_upload_task - same as upload_path API
        if is_no_auth_mode():
            # In no-auth mode, use anonymous user as normal
            task_id = await task_service.create_upload_task(
                user_id=system_user.user_id,
                file_paths=file_paths,
                jwt_token=jwt_token,
                owner_name=system_user.name,
                owner_email=system_user.email
            )
        else:
            # In auth mode, we need to create a custom processor that passes None for owner_user_id
            # This creates documents without owner field, making them globally accessible
            from models.processors import DocumentFileProcessor
            processor = DocumentFileProcessor(
                task_service.document_service,
                owner_user_id=None,  # This is the key - no owner means globally accessible
                jwt_token=jwt_token,
                owner_name=None,  # No name either
                owner_email=None  # No email either
            )
            task_id = await task_service.create_custom_task("global", file_paths, processor)

        print(f"[FIRST_RUN] Created upload task {task_id} for default dataset ingestion")

        # Mark as initialized - the task will continue in background
        await mark_initialized()
        print("[FIRST_RUN] Default dataset ingestion initiated successfully")
        return True

    except Exception as e:
        print(f"[FIRST_RUN] Error during first-time setup: {e}")
        # Still mark as initialized to prevent retrying on every startup
        await mark_initialized()
        return False
```

src/utils/first_run.py (new file, 108 lines)

```python
import os
import asyncio
import shutil
from pathlib import Path

from config.settings import DATA_DIRECTORY, clients, INDEX_NAME

FIRST_RUN_MARKER = ".openrag_initialized"


async def is_first_run():
    """Check if this is the first run by looking for initialization marker"""
    marker_path = Path(FIRST_RUN_MARKER)
    return not marker_path.exists()


async def mark_initialized():
    """Create marker file to indicate successful initialization"""
    marker_path = Path(FIRST_RUN_MARKER)
    marker_path.write_text("initialized")


async def has_existing_documents():
    """Check if there are already documents in the OpenSearch index"""
    try:
        response = await clients.opensearch.search(
            index=INDEX_NAME,
            body={"size": 0, "track_total_hits": True}
        )
        total_docs = response["hits"]["total"]["value"]
        return total_docs > 0
    except Exception as e:
        print(f"Error checking existing documents: {e}")
        return False


async def copy_external_files_to_documents():
    """Copy files from DATA_DIRECTORY to documents folder if they're different locations"""
    data_dir = Path(DATA_DIRECTORY)
    documents_dir = Path("./documents")

    # Create documents directory if it doesn't exist
    documents_dir.mkdir(exist_ok=True)

    # If DATA_DIRECTORY is the same as documents, no need to copy
    if data_dir.resolve() == documents_dir.resolve():
        print(f"[FIRST_RUN] DATA_DIRECTORY ({DATA_DIRECTORY}) is the same as documents folder, no copying needed")
        return []

    if not data_dir.exists() or not data_dir.is_dir():
        print(f"[FIRST_RUN] External dataset directory {DATA_DIRECTORY} does not exist or is not a directory")
        return []

    print(f"[FIRST_RUN] Copying files from {DATA_DIRECTORY} to documents folder...")
    copied_files = []
    supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']

    # Get all files recursively from the external directory
    for file_path in data_dir.rglob("*"):
        if file_path.is_file() and not file_path.name.startswith("."):
            # Filter for document types
            if file_path.suffix.lower() in supported_extensions:
                # Create relative path to maintain directory structure
                relative_path = file_path.relative_to(data_dir)
                dest_path = documents_dir / relative_path

                # Create destination directory if it doesn't exist
                dest_path.parent.mkdir(parents=True, exist_ok=True)

                # Only copy if file doesn't exist or is different
                if not dest_path.exists() or file_path.stat().st_mtime > dest_path.stat().st_mtime:
                    try:
                        shutil.copy2(file_path, dest_path)
                        copied_files.append(str(dest_path.absolute()))
                        print(f"[FIRST_RUN] Copied: {file_path} -> {dest_path}")
                    except Exception as e:
                        print(f"[FIRST_RUN] Failed to copy {file_path}: {e}")
                else:
                    # File already exists and is up to date, but include it in the list
                    copied_files.append(str(dest_path.absolute()))

    print(f"[FIRST_RUN] Copied {len(copied_files)} files to documents folder")
    return copied_files


async def get_default_dataset_files():
    """Get list of files in the documents directory (after copying if needed)"""
    documents_dir = Path("./documents")
    if not documents_dir.exists() or not documents_dir.is_dir():
        print("[FIRST_RUN] Documents directory does not exist")
        return []

    # Get all files recursively, excluding hidden files and directories
    files = []
    supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']
    for file_path in documents_dir.rglob("*"):
        if file_path.is_file() and not file_path.name.startswith("."):
            # Filter for document types
            if file_path.suffix.lower() in supported_extensions:
                files.append(str(file_path.absolute()))
    return files


def should_run_initialization():
    """Determine if we should run first-run initialization"""
    # Check for environment variable to skip initialization
    skip_init = os.getenv("SKIP_FIRST_RUN_INIT", "false").lower() in ["true", "1", "yes"]
    if skip_init:
        return False
    return True
```