draft changes
This commit is contained in:
parent
ad3f1e28b9
commit
c8909a71d0
7 changed files with 189 additions and 101 deletions
BIN
.DS_Store
vendored
Normal file
BIN
.DS_Store
vendored
Normal file
Binary file not shown.
|
|
@ -13,6 +13,21 @@ OpenRAG now includes a first-run initialization system that automatically loads
|
|||
- `DATA_DIRECTORY`: Path to the directory containing default documents to load on first run (default: `./documents`)
|
||||
- `SKIP_FIRST_RUN_INIT`: Set to `true` to disable automatic first-run initialization (default: `false`)
|
||||
|
||||
### External Dataset Loading
|
||||
|
||||
You can point `DATA_DIRECTORY` to any external location containing your default datasets. The system will:
|
||||
- Copy files from the external directory to `./documents` to ensure Docker volume access
|
||||
- Maintain directory structure during copying
|
||||
- Only copy newer files (based on modification time)
|
||||
- Skip files that already exist and are up-to-date
|
||||
|
||||
Example with external directory:
|
||||
```bash
|
||||
DATA_DIRECTORY=/path/to/my/external/datasets
|
||||
```
|
||||
|
||||
This allows you to maintain your datasets outside the OpenRAG project while still leveraging the automatic loading feature.
|
||||
|
||||
### Example .env Configuration
|
||||
|
||||
```bash
|
||||
|
|
@ -27,9 +42,11 @@ DATA_DIRECTORY=./documents
|
|||
|
||||
1. **First-Run Detection**: The system checks for a `.openrag_initialized` marker file in the application root
|
||||
2. **Document Detection**: If no marker file exists and there are no existing documents in the OpenSearch index, first-run initialization triggers
|
||||
3. **File Discovery**: The system scans the `DATA_DIRECTORY` for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT)
|
||||
4. **Automatic Ingestion**: Found files are automatically processed and ingested into the OpenSearch index using the existing upload workflow
|
||||
5. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups
|
||||
3. **File Copying**: If `DATA_DIRECTORY` points to a different location than `./documents`, files are copied to the documents folder to ensure Docker volume access
|
||||
4. **File Discovery**: The system scans the documents folder for supported document types (PDF, TXT, DOC, DOCX, MD, RTF, ODT)
|
||||
5. **Existing Workflow Reuse**: Found files are processed using the same `create_upload_task` method as the manual "Upload Path" feature
|
||||
6. **Document Ownership**: In no-auth mode, documents owned by anonymous user; in auth mode, documents created without owner (globally accessible)
|
||||
7. **Initialization Marker**: After successful setup, a marker file is created to prevent re-initialization on subsequent startups
|
||||
|
||||
## Docker Configuration
|
||||
|
||||
|
|
@ -77,10 +94,12 @@ The first-run initialization supports the following document types:
|
|||
1. Application starts
|
||||
2. OpenSearch index is initialized
|
||||
3. System checks for existing documents
|
||||
4. If none found, scans `DATA_DIRECTORY`
|
||||
5. Creates background task to process found documents
|
||||
6. Creates `.openrag_initialized` marker file
|
||||
7. Documents are processed asynchronously in the background
|
||||
4. If none found, copies files from `DATA_DIRECTORY` to `./documents` (if different)
|
||||
5. Scans documents folder for supported files
|
||||
6. Creates upload task using existing `create_upload_task` method (same as manual "Upload Path")
|
||||
7. Documents are processed through complete knowledge pipeline (conversion, chunking, embedding, indexing)
|
||||
8. Creates `.openrag_initialized` marker file
|
||||
9. Processing continues asynchronously in the background
|
||||
|
||||
### Subsequent Runs
|
||||
1. Application starts
|
||||
|
|
@ -110,6 +129,17 @@ If you want to prevent automatic initialization:
|
|||
1. Set `SKIP_FIRST_RUN_INIT=true` in your .env file
|
||||
2. Or create an empty `.openrag_initialized` file manually
|
||||
|
||||
### Files Not Visible in Knowledge List
|
||||
If first-run files don't appear in the knowledge interface:
|
||||
|
||||
**For No-Auth Mode:**
|
||||
- Files should be owned by "anonymous" user and visible immediately
|
||||
|
||||
**For Auth Mode:**
|
||||
- Files are created without owner field, making them globally accessible
|
||||
- All authenticated users should see these files in their knowledge list
|
||||
- Check OpenSearch DLS configuration in `securityconfig/roles.yml`
|
||||
|
||||
### Force Re-initialization
|
||||
To force first-run setup to run again:
|
||||
1. Stop the application
|
||||
|
|
|
|||
BIN
documents/ai-human-resources.pdf
Normal file
BIN
documents/ai-human-resources.pdf
Normal file
Binary file not shown.
|
|
@ -158,7 +158,7 @@ async def run_first_time_initialization(services):
|
|||
"""Run first-time initialization if needed"""
|
||||
try:
|
||||
from utils.default_ingestion import run_first_time_setup
|
||||
await run_first_time_setup(services['document_service'], services['task_service'])
|
||||
await run_first_time_setup(services['task_service'])
|
||||
except Exception as e:
|
||||
print(f"[ERROR] First-time initialization failed: {e}")
|
||||
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ class DocumentFileProcessor(TaskProcessor):
|
|||
)
|
||||
|
||||
|
||||
|
||||
class ConnectorFileProcessor(TaskProcessor):
|
||||
"""Processor for connector file uploads"""
|
||||
|
||||
|
|
|
|||
|
|
@ -1,104 +1,108 @@
|
|||
import asyncio
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from utils.first_run import get_default_dataset_files, has_existing_documents, is_first_run, mark_initialized, should_run_initialization
|
||||
from config.settings import is_no_auth_mode
|
||||
from utils.first_run import has_existing_documents, is_first_run, mark_initialized, should_run_initialization, copy_external_files_to_documents
|
||||
from services.task_service import TaskService
|
||||
from services.document_service import DocumentService
|
||||
|
||||
class DefaultDatasetIngestor:
|
||||
"""Handles automatic ingestion of default datasets on first run"""
|
||||
|
||||
def create_system_user_context():
|
||||
"""Create appropriate system user context for first-run operations"""
|
||||
from session_manager import User
|
||||
|
||||
def __init__(self, document_service: DocumentService, task_service: TaskService):
|
||||
self.document_service = document_service
|
||||
self.task_service = task_service
|
||||
# Create user context for task tracking (not for document ownership)
|
||||
system_user = User(
|
||||
user_id="anonymous" if is_no_auth_mode() else "global",
|
||||
email="anonymous@localhost" if is_no_auth_mode() else "system@openrag.local",
|
||||
name="Anonymous User" if is_no_auth_mode() else "Default Dataset"
|
||||
)
|
||||
jwt_token = None
|
||||
|
||||
async def should_ingest(self) -> bool:
|
||||
"""Check if we should perform default dataset ingestion"""
|
||||
return system_user, jwt_token
|
||||
|
||||
|
||||
async def run_first_time_setup(task_service: TaskService) -> bool:
|
||||
"""Run first-time setup if needed using existing upload workflow"""
|
||||
try:
|
||||
if not should_run_initialization():
|
||||
print("[FIRST_RUN] Initialization skipped due to SKIP_FIRST_RUN_INIT")
|
||||
return False
|
||||
return True
|
||||
|
||||
first_run = await is_first_run()
|
||||
existing_docs = await has_existing_documents()
|
||||
|
||||
# Only ingest if it's first run AND there are no existing documents
|
||||
should_ingest = first_run and not existing_docs
|
||||
|
||||
print(f"[FIRST_RUN] First run: {first_run}, Existing docs: {existing_docs}, Should ingest: {should_ingest}")
|
||||
return should_ingest
|
||||
|
||||
async def ingest_default_dataset(self) -> bool:
|
||||
"""Ingest the default dataset files"""
|
||||
try:
|
||||
files = await get_default_dataset_files()
|
||||
|
||||
if not files:
|
||||
print("[FIRST_RUN] No default dataset files found to ingest")
|
||||
await mark_initialized()
|
||||
return True
|
||||
|
||||
print(f"[FIRST_RUN] Found {len(files)} files to ingest from default dataset")
|
||||
|
||||
# Create a system task for ingesting default files
|
||||
# Use a dummy user ID for system operations
|
||||
system_user_id = "system"
|
||||
task_id = await self.task_service.create_upload_task(
|
||||
user_id=system_user_id,
|
||||
file_paths=files,
|
||||
jwt_token=None, # No JWT needed for system operations
|
||||
owner_name="System",
|
||||
owner_email="system@openrag.local"
|
||||
)
|
||||
|
||||
print(f"[FIRST_RUN] Created task {task_id} for default dataset ingestion")
|
||||
|
||||
# Wait a bit for the task to start processing
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# Monitor task progress (but don't block indefinitely)
|
||||
max_wait_time = 300 # 5 minutes max
|
||||
check_interval = 5 # Check every 5 seconds
|
||||
elapsed_time = 0
|
||||
|
||||
while elapsed_time < max_wait_time:
|
||||
try:
|
||||
task_status = await self.task_service.get_task_status(task_id)
|
||||
if task_status and task_status.get("status") in ["completed", "failed"]:
|
||||
break
|
||||
|
||||
await asyncio.sleep(check_interval)
|
||||
elapsed_time += check_interval
|
||||
|
||||
# Log progress every 30 seconds
|
||||
if elapsed_time % 30 == 0:
|
||||
processed = task_status.get("processed", 0) if task_status else 0
|
||||
total = len(files)
|
||||
print(f"[FIRST_RUN] Task {task_id} progress: {processed}/{total} files processed")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[FIRST_RUN] Error checking task status: {e}")
|
||||
break
|
||||
|
||||
# Mark as initialized regardless of task completion
|
||||
# The task will continue running in the background if needed
|
||||
await mark_initialized()
|
||||
print("[FIRST_RUN] Default dataset ingestion initiated successfully")
|
||||
|
||||
if not should_ingest:
|
||||
print("[FIRST_RUN] Skipping first-time setup")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"[FIRST_RUN] Error during default dataset ingestion: {e}")
|
||||
# Still mark as initialized to prevent retrying on every startup
|
||||
|
||||
# Copy external files to documents folder if needed
|
||||
copied_files = await copy_external_files_to_documents()
|
||||
if copied_files:
|
||||
print(f"[FIRST_RUN] Successfully copied {len(copied_files)} files from external directory")
|
||||
|
||||
# Get documents directory
|
||||
documents_dir = Path("./documents")
|
||||
if not documents_dir.exists() or not documents_dir.is_dir():
|
||||
print("[FIRST_RUN] Documents directory does not exist")
|
||||
await mark_initialized()
|
||||
return False
|
||||
|
||||
async def run_first_time_setup(document_service: DocumentService, task_service: TaskService) -> bool:
|
||||
"""Run first-time setup if needed"""
|
||||
ingestor = DefaultDatasetIngestor(document_service, task_service)
|
||||
|
||||
if await ingestor.should_ingest():
|
||||
print("[FIRST_RUN] Starting first-time default dataset ingestion...")
|
||||
return await ingestor.ingest_default_dataset()
|
||||
else:
|
||||
print("[FIRST_RUN] Skipping first-time setup")
|
||||
return True
|
||||
return True
|
||||
|
||||
# Get all supported files (same logic as upload_path)
|
||||
file_paths = []
|
||||
supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']
|
||||
|
||||
for root, _, files in os.walk(documents_dir):
|
||||
for fn in files:
|
||||
if not fn.startswith("."): # Skip hidden files
|
||||
file_path = Path(root) / fn
|
||||
if file_path.suffix.lower() in supported_extensions:
|
||||
file_paths.append(str(file_path))
|
||||
|
||||
if not file_paths:
|
||||
print("[FIRST_RUN] No supported files found in documents directory")
|
||||
await mark_initialized()
|
||||
return True
|
||||
|
||||
print(f"[FIRST_RUN] Found {len(file_paths)} files to ingest from documents directory")
|
||||
|
||||
# Create system user context
|
||||
system_user, jwt_token = create_system_user_context()
|
||||
|
||||
# Use existing create_upload_task - same as upload_path API
|
||||
if is_no_auth_mode():
|
||||
# In no-auth mode, use anonymous user as normal
|
||||
task_id = await task_service.create_upload_task(
|
||||
user_id=system_user.user_id,
|
||||
file_paths=file_paths,
|
||||
jwt_token=jwt_token,
|
||||
owner_name=system_user.name,
|
||||
owner_email=system_user.email
|
||||
)
|
||||
else:
|
||||
# In auth mode, we need to create a custom processor that passes None for owner_user_id
|
||||
# This creates documents without owner field, making them globally accessible
|
||||
from models.processors import DocumentFileProcessor
|
||||
processor = DocumentFileProcessor(
|
||||
task_service.document_service,
|
||||
owner_user_id=None, # This is the key - no owner means globally accessible
|
||||
jwt_token=jwt_token,
|
||||
owner_name=None, # No name either
|
||||
owner_email=None # No email either
|
||||
)
|
||||
task_id = await task_service.create_custom_task("global", file_paths, processor)
|
||||
|
||||
print(f"[FIRST_RUN] Created upload task {task_id} for default dataset ingestion")
|
||||
|
||||
# Mark as initialized - the task will continue in background
|
||||
await mark_initialized()
|
||||
print("[FIRST_RUN] Default dataset ingestion initiated successfully")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"[FIRST_RUN] Error during first-time setup: {e}")
|
||||
# Still mark as initialized to prevent retrying on every startup
|
||||
await mark_initialized()
|
||||
return False
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
import os
|
||||
import asyncio
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from config.settings import DATA_DIRECTORY, clients, INDEX_NAME
|
||||
|
||||
|
|
@ -28,19 +29,71 @@ async def has_existing_documents():
|
|||
print(f"Error checking existing documents: {e}")
|
||||
return False
|
||||
|
||||
async def get_default_dataset_files():
|
||||
"""Get list of files in the default dataset directory"""
|
||||
async def copy_external_files_to_documents():
|
||||
"""Copy files from DATA_DIRECTORY to documents folder if they're different locations"""
|
||||
data_dir = Path(DATA_DIRECTORY)
|
||||
documents_dir = Path("./documents")
|
||||
|
||||
# Create documents directory if it doesn't exist
|
||||
documents_dir.mkdir(exist_ok=True)
|
||||
|
||||
# If DATA_DIRECTORY is the same as documents, no need to copy
|
||||
if data_dir.resolve() == documents_dir.resolve():
|
||||
print(f"[FIRST_RUN] DATA_DIRECTORY ({DATA_DIRECTORY}) is the same as documents folder, no copying needed")
|
||||
return []
|
||||
|
||||
if not data_dir.exists() or not data_dir.is_dir():
|
||||
print(f"Default dataset directory {DATA_DIRECTORY} does not exist or is not a directory")
|
||||
print(f"[FIRST_RUN] External dataset directory {DATA_DIRECTORY} does not exist or is not a directory")
|
||||
return []
|
||||
|
||||
print(f"[FIRST_RUN] Copying files from {DATA_DIRECTORY} to documents folder...")
|
||||
|
||||
copied_files = []
|
||||
supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']
|
||||
|
||||
# Get all files recursively from the external directory
|
||||
for file_path in data_dir.rglob("*"):
|
||||
if file_path.is_file() and not file_path.name.startswith("."):
|
||||
# Filter for document types
|
||||
if file_path.suffix.lower() in supported_extensions:
|
||||
# Create relative path to maintain directory structure
|
||||
relative_path = file_path.relative_to(data_dir)
|
||||
dest_path = documents_dir / relative_path
|
||||
|
||||
# Create destination directory if it doesn't exist
|
||||
dest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Only copy if file doesn't exist or is different
|
||||
if not dest_path.exists() or file_path.stat().st_mtime > dest_path.stat().st_mtime:
|
||||
try:
|
||||
shutil.copy2(file_path, dest_path)
|
||||
copied_files.append(str(dest_path.absolute()))
|
||||
print(f"[FIRST_RUN] Copied: {file_path} -> {dest_path}")
|
||||
except Exception as e:
|
||||
print(f"[FIRST_RUN] Failed to copy {file_path}: {e}")
|
||||
else:
|
||||
# File already exists and is up to date, but include it in the list
|
||||
copied_files.append(str(dest_path.absolute()))
|
||||
|
||||
print(f"[FIRST_RUN] Copied {len(copied_files)} files to documents folder")
|
||||
return copied_files
|
||||
|
||||
async def get_default_dataset_files():
|
||||
"""Get list of files in the documents directory (after copying if needed)"""
|
||||
documents_dir = Path("./documents")
|
||||
|
||||
if not documents_dir.exists() or not documents_dir.is_dir():
|
||||
print(f"[FIRST_RUN] Documents directory does not exist")
|
||||
return []
|
||||
|
||||
# Get all files recursively, excluding hidden files and directories
|
||||
files = []
|
||||
for file_path in data_dir.rglob("*"):
|
||||
supported_extensions = ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']
|
||||
|
||||
for file_path in documents_dir.rglob("*"):
|
||||
if file_path.is_file() and not file_path.name.startswith("."):
|
||||
# Filter for document types (pdf, txt, doc, docx, etc.)
|
||||
if file_path.suffix.lower() in ['.pdf', '.txt', '.doc', '.docx', '.md', '.rtf', '.odt']:
|
||||
# Filter for document types
|
||||
if file_path.suffix.lower() in supported_extensions:
|
||||
files.append(str(file_path.absolute()))
|
||||
|
||||
return files
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue