diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..8ca5528e Binary files /dev/null and b/.DS_Store differ diff --git a/FIRST_RUN_SETUP.md b/FIRST_RUN_SETUP.md index 92d71b88..3f3f8ef9 100644 --- a/FIRST_RUN_SETUP.md +++ b/FIRST_RUN_SETUP.md @@ -13,6 +13,21 @@ OpenRAG now includes a first-run initialization system that automatically loads - `DATA_DIRECTORY`: Path to the directory containing default documents to load on first run (default: `./documents`) - `SKIP_FIRST_RUN_INIT`: Set to `true` to disable automatic first-run initialization (default: `false`) +### External Dataset Loading + +You can point `DATA_DIRECTORY` to any external location containing your default datasets. The system will: +- Copy files from the external directory to `./documents` to ensure Docker volume access +- Maintain directory structure during copying +- Only copy newer files (based on modification time) +- Skip files that already exist and are up-to-date + +Example with external directory: +```bash +DATA_DIRECTORY=/path/to/my/external/datasets +``` + +This allows you to maintain your datasets outside the OpenRAG project while still leveraging the automatic loading feature. + ### Example .env Configuration ```bash @@ -27,9 +42,11 @@ DATA_DIRECTORY=./documents 1. **First-Run Detection**: The system checks for a `.openrag_initialized` marker file in the application root 2. **Document Detection**: If no marker file exists and there are no existing documents in the OpenSearch index, first-run initialization triggers -3. **File Discovery**: The system scans the `DATA_DIRECTORY` for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT) -4. **Automatic Ingestion**: Found files are automatically processed and ingested into the OpenSearch index using the existing upload workflow -5. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups +3. **File Copying**: If `DATA_DIRECTORY` points to a different location than `./documents`, files are copied to the documents folder to ensure Docker volume access +4. **File Discovery**: The system scans the documents folder for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT) +5. **Existing Workflow Reuse**: Found files are processed using the same `create_upload_task` method as the manual "Upload Path" feature +6. **Document Ownership**: In no-auth mode, documents owned by anonymous user; in auth mode, documents created without owner (globally accessible) +7. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups ## Docker Configuration @@ -77,10 +94,12 @@ The first-run initialization supports the following document types: 1. Application starts 2. OpenSearch index is initialized 3. System checks for existing documents -4. If none found, scans `DATA_DIRECTORY` -5. Creates background task to process found documents -6. Creates `.openrag_initialized` marker file -7. Documents are processed asynchronously in the background +4. If none found, copies files from `DATA_DIRECTORY` to `./documents` (if different) +5. Scans documents folder for supported files +6. Creates upload task using existing `create_upload_task` method (same as manual "Upload Path") +7. Documents are processed through complete knowledge pipeline (conversion, chunking, embedding, indexing) +8. Creates `.openrag_initialized` marker file +9. Processing continues asynchronously in the background ### Subsequent Runs 1. Application starts @@ -110,6 +129,17 @@ If you want to prevent automatic initialization: 1. Set `SKIP_FIRST_RUN_INIT=true` in your .env file 2. Or create an empty `.openrag_initialized` file manually +### Files Not Visible in Knowledge List +If first-run files don't appear in the knowledge interface: + +**For No-Auth Mode:** +- Files should be owned by "anonymous" user and visible immediately + +**For Auth Mode:** +- Files are created without owner field, making them globally accessible +- All authenticated users should see these files in their knowledge list +- Check OpenSearch DLS configuration in `securityconfig/roles.yml` + ### Force Re-initialization To force first-run setup to run again: 1. Stop the application diff --git a/documents/ai-human-resources.pdf b/documents/ai-human-resources.pdf new file mode 100644 index 00000000..5e36eab4 Binary files /dev/null and b/documents/ai-human-resources.pdf differ diff --git a/src/main.py b/src/main.py index 52b924c5..895db300 100644 --- a/src/main.py +++ b/src/main.py @@ -158,7 +158,7 @@ async def run_first_time_initialization(services): """Run first-time initialization if needed""" try: from utils.default_ingestion import run_first_time_setup - await run_first_time_setup(services['document_service'], services['task_service']) + await run_first_time_setup(services['task_service']) except Exception as e: print(f"[ERROR] First-time initialization failed: {e}") diff --git a/src/models/processors.py b/src/models/processors.py index dd79b994..984f2c2f 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -41,6 +41,7 @@ class DocumentFileProcessor(TaskProcessor): ) + class ConnectorFileProcessor(TaskProcessor): """Processor for connector file uploads""" diff --git a/src/utils/default_ingestion.py b/src/utils/default_ingestion.py index f147c4b3..d6dd7bef 100644 --- a/src/utils/default_ingestion.py +++ b/src/utils/default_ingestion.py @@ -1,104 +1,108 @@ -import asyncio import os from pathlib import Path -from typing import List -from utils.first_run import get_default_dataset_files, has_existing_documents, is_first_run, mark_initialized, should_run_initialization +from config.settings import is_no_auth_mode +from utils.first_run import has_existing_documents, is_first_run, mark_initialized, should_run_initialization, copy_external_files_to_documents from services.task_service import TaskService -from services.document_service import DocumentService -class DefaultDatasetIngestor: - """Handles automatic ingestion of default datasets on first run""" + +def create_system_user_context(): + """Create appropriate system user context for first-run operations""" + from session_manager import User - def __init__(self, document_service: DocumentService, task_service: TaskService): - self.document_service = document_service - self.task_service = task_service + # Create user context for task tracking (not for document ownership) + system_user = User( + user_id="anonymous" if is_no_auth_mode() else "global", + email="anonymous@localhost" if is_no_auth_mode() else "system@openrag.local", + name="Anonymous User" if is_no_auth_mode() else "Default Dataset" + ) + jwt_token = None - async def should_ingest(self) -> bool: - """Check if we should perform default dataset ingestion""" + return system_user, jwt_token + + +async def run_first_time_setup(task_service: TaskService) -> bool: + """Run first-time setup if needed using existing upload workflow""" + try: if not should_run_initialization(): print("[FIRST_RUN] Initialization skipped due to SKIP_FIRST_RUN_INIT") - return False + return True first_run = await is_first_run() existing_docs = await has_existing_documents() # Only ingest if it's first run AND there are no existing documents should_ingest = first_run and not existing_docs - print(f"[FIRST_RUN] First run: {first_run}, Existing docs: {existing_docs}, Should ingest: {should_ingest}") - return should_ingest - - async def ingest_default_dataset(self) -> bool: - """Ingest the default dataset files""" - try: - files = await get_default_dataset_files() - - if not files: - print("[FIRST_RUN] No default dataset files found to ingest") - await mark_initialized() - return True - - print(f"[FIRST_RUN] Found {len(files)} files to ingest from default dataset") - - # Create a system task for ingesting default files - # Use a dummy user ID for system operations - system_user_id = "system" - task_id = await self.task_service.create_upload_task( - user_id=system_user_id, - file_paths=files, - jwt_token=None, # No JWT needed for system operations - owner_name="System", - owner_email="system@openrag.local" - ) - - print(f"[FIRST_RUN] Created task {task_id} for default dataset ingestion") - - # Wait a bit for the task to start processing - await asyncio.sleep(2) - - # Monitor task progress (but don't block indefinitely) - max_wait_time = 300 # 5 minutes max - check_interval = 5 # Check every 5 seconds - elapsed_time = 0 - - while elapsed_time < max_wait_time: - try: - task_status = await self.task_service.get_task_status(task_id) - if task_status and task_status.get("status") in ["completed", "failed"]: - break - - await asyncio.sleep(check_interval) - elapsed_time += check_interval - - # Log progress every 30 seconds - if elapsed_time % 30 == 0: - processed = task_status.get("processed", 0) if task_status else 0 - total = len(files) - print(f"[FIRST_RUN] Task {task_id} progress: {processed}/{total} files processed") - - except Exception as e: - print(f"[FIRST_RUN] Error checking task status: {e}") - break - - # Mark as initialized regardless of task completion - # The task will continue running in the background if needed - await mark_initialized() - print("[FIRST_RUN] Default dataset ingestion initiated successfully") + + if not should_ingest: + print("[FIRST_RUN] Skipping first-time setup") return True - - except Exception as e: - print(f"[FIRST_RUN] Error during default dataset ingestion: {e}") - # Still mark as initialized to prevent retrying on every startup + + # Copy external files to documents folder if needed + copied_files = await copy_external_files_to_documents() + if copied_files: + print(f"[FIRST_RUN] Successfully copied {len(copied_files)} files from external directory") + + # Get documents directory + documents_dir = Path("./documents") + if not documents_dir.exists() or not documents_dir.is_dir(): + print("[FIRST_RUN] Documents directory does not exist") await mark_initialized() - return False - -async def run_first_time_setup(document_service: DocumentService, task_service: TaskService) -> bool: - """Run first-time setup if needed""" - ingestor = DefaultDatasetIngestor(document_service, task_service) - - if await ingestor.should_ingest(): - print("[FIRST_RUN] Starting first-time default dataset ingestion...") - return await ingestor.ingest_default_dataset() - else: - print("[FIRST_RUN] Skipping first-time setup") - return True \ No newline at end of file + return True + + # Get all supported files (same logic as upload_path) + file_paths = [] + supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt'] + + for root, _, files in os.walk(documents_dir): + for fn in files: + if not fn.startswith("."): # Skip hidden files + file_path = Path(root) / fn + if file_path.suffix.lower() in supported_extensions: + file_paths.append(str(file_path)) + + if not file_paths: + print("[FIRST_RUN] No supported files found in documents directory") + await mark_initialized() + return True + + print(f"[FIRST_RUN] Found {len(file_paths)} files to ingest from documents directory") + + # Create system user context + system_user, jwt_token = create_system_user_context() + + # Use existing create_upload_task - same as upload_path API + if is_no_auth_mode(): + # In no-auth mode, use anonymous user as normal + task_id = await task_service.create_upload_task( + user_id=system_user.user_id, + file_paths=file_paths, + jwt_token=jwt_token, + owner_name=system_user.name, + owner_email=system_user.email + ) + else: + # In auth mode, we need to create a custom processor that passes None for owner_user_id + # This creates documents without owner field, making them globally accessible + from models.processors import DocumentFileProcessor + processor = DocumentFileProcessor( + task_service.document_service, + owner_user_id=None, # This is the key - no owner means globally accessible + jwt_token=jwt_token, + owner_name=None, # No name either + owner_email=None # No email either + ) + task_id = await task_service.create_custom_task("global", file_paths, processor) + + print(f"[FIRST_RUN] Created upload task {task_id} for default dataset ingestion") + + # Mark as initialized - the task will continue in background + await mark_initialized() + print("[FIRST_RUN] Default dataset ingestion initiated successfully") + return True + + except Exception as e: + print(f"[FIRST_RUN] Error during first-time setup: {e}") + # Still mark as initialized to prevent retrying on every startup + await mark_initialized() + return False \ No newline at end of file diff --git a/src/utils/first_run.py b/src/utils/first_run.py index 38a2c5a2..09a7d8aa 100644 --- a/src/utils/first_run.py +++ b/src/utils/first_run.py @@ -1,5 +1,6 @@ import os import asyncio +import shutil from pathlib import Path from config.settings import DATA_DIRECTORY, clients, INDEX_NAME @@ -28,19 +29,71 @@ async def has_existing_documents(): print(f"Error checking existing documents: {e}") return False -async def get_default_dataset_files(): - """Get list of files in the default dataset directory""" +async def copy_external_files_to_documents(): + """Copy files from DATA_DIRECTORY to documents folder if they're different locations""" data_dir = Path(DATA_DIRECTORY) + documents_dir = Path("./documents") + + # Create documents directory if it doesn't exist + documents_dir.mkdir(exist_ok=True) + + # If DATA_DIRECTORY is the same as documents, no need to copy + if data_dir.resolve() == documents_dir.resolve(): + print(f"[FIRST_RUN] DATA_DIRECTORY ({DATA_DIRECTORY}) is the same as documents folder, no copying needed") + return [] + if not data_dir.exists() or not data_dir.is_dir(): - print(f"Default dataset directory {DATA_DIRECTORY} does not exist or is not a directory") + print(f"[FIRST_RUN] External dataset directory {DATA_DIRECTORY} does not exist or is not a directory") + return [] + + print(f"[FIRST_RUN] Copying files from {DATA_DIRECTORY} to documents folder...") + + copied_files = [] + supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt'] + + # Get all files recursively from the external directory + for file_path in data_dir.rglob("*"): + if file_path.is_file() and not file_path.name.startswith("."): + # Filter for document types + if file_path.suffix.lower() in supported_extensions: + # Create relative path to maintain directory structure + relative_path = file_path.relative_to(data_dir) + dest_path = documents_dir / relative_path + + # Create destination directory if it doesn't exist + dest_path.parent.mkdir(parents=True, exist_ok=True) + + # Only copy if file doesn't exist or is different + if not dest_path.exists() or file_path.stat().st_mtime > dest_path.stat().st_mtime: + try: + shutil.copy2(file_path, dest_path) + copied_files.append(str(dest_path.absolute())) + print(f"[FIRST_RUN] Copied: {file_path} -> {dest_path}") + except Exception as e: + print(f"[FIRST_RUN] Failed to copy {file_path}: {e}") + else: + # File already exists and is up to date, but include it in the list + copied_files.append(str(dest_path.absolute())) + + print(f"[FIRST_RUN] Copied {len(copied_files)} files to documents folder") + return copied_files + +async def get_default_dataset_files(): + """Get list of files in the documents directory (after copying if needed)""" + documents_dir = Path("./documents") + + if not documents_dir.exists() or not documents_dir.is_dir(): + print(f"[FIRST_RUN] Documents directory does not exist") return [] # Get all files recursively, excluding hidden files and directories files = [] - for file_path in data_dir.rglob("*"): + supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt'] + + for file_path in documents_dir.rglob("*"): if file_path.is_file() and not file_path.name.startswith("."): - # Filter for document types (pdf, txt, doc, docx, etc.) - if file_path.suffix.lower() in ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']: + # Filter for document types + if file_path.suffix.lower() in supported_extensions: files.append(str(file_path.absolute())) return files