diff --git a/.gitignore b/.gitignore
index 4f22035a..8bf471e7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,5 @@ wheels/
1001*.pdf
*.json
.DS_Store
+
+config.yaml
diff --git a/config.yaml b/config.yaml
deleted file mode 100644
index cd2e929b..00000000
--- a/config.yaml
+++ /dev/null
@@ -1,33 +0,0 @@
-# OpenRAG Configuration File
-# This file allows you to configure OpenRAG settings.
-# Environment variables will override these settings unless edited is true.
-
-# Track if this config has been manually edited (prevents env var overrides)
-edited: false
-
-# Model provider configuration
-provider:
- # Supported providers: "openai", "anthropic", "azure", etc.
- model_provider: "openai"
- # API key for the model provider (can also be set via OPENAI_API_KEY env var)
- api_key: ""
-
-# Knowledge base and document processing configuration
-knowledge:
- # Embedding model for vector search
- embedding_model: "text-embedding-3-small"
- # Text chunk size for document processing
- chunk_size: 1000
- # Overlap between chunks
- chunk_overlap: 200
- # Docling preset setting
- ocr: false
- picture_descriptions: false
- table_structure: false
-
-# AI agent configuration
-agent:
- # Language model for the chat agent
- llm_model: "gpt-4o-mini"
- # System prompt for the agent
- system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."
diff --git a/docs/docs/core-components/knowledge.mdx b/docs/docs/core-components/knowledge.mdx
index 255b0b68..2ea5ef9f 100644
--- a/docs/docs/core-components/knowledge.mdx
+++ b/docs/docs/core-components/knowledge.mdx
@@ -38,13 +38,9 @@ The file is loaded into your OpenSearch database, and appears in the Knowledge p
To load and process a directory from the mapped location, click **Add Knowledge**, and then click **Process Folder**.
The files are loaded into your OpenSearch database, and appear in the Knowledge page.
-### Ingest files through OAuth connectors (#oauth-ingestion)
+### Ingest files through OAuth connectors {#oauth-ingestion}
-OpenRAG supports the following enterprise-grade OAuth connectors for seamless document synchronization.
-
-- **Google Drive**
-- **OneDrive**
-- **AWS**
+OpenRAG supports Google Drive, OneDrive, and AWS S3 as OAuth connectors for seamless document synchronization.
OAuth integration allows individual users to connect their personal cloud storage accounts to OpenRAG. Each user must separately authorize OpenRAG to access their own cloud storage files. When a user connects a cloud service, they are redirected to authenticate with that service provider and grant OpenRAG permission to sync documents from their personal cloud storage.
diff --git a/docs/docs/get-started/install.mdx b/docs/docs/get-started/install.mdx
index 67f4ae89..dcb5c5f1 100644
--- a/docs/docs/get-started/install.mdx
+++ b/docs/docs/get-started/install.mdx
@@ -79,8 +79,46 @@ For more information on virtual environments, see [uv](https://docs.astral.sh/uv
Command completed successfully
```
-7. To open the OpenRAG application, click **Open App** or press 6.
-8. Continue with the Quickstart.
+7. To open the OpenRAG application, click **Open App**, press 6, or navigate to `http://localhost:3000`.
+ The application opens.
+8. Select your language model and embedding model provider, and complete the required fields.
+ **Your provider can only be selected once, and you must use the same provider for your language model and embedding model.**
+   The language model can be changed later, but the embedding model cannot.
+   To change your provider selection, you must delete the `config.yaml` file and restart OpenRAG.
+
+
+
+ 9. If you already entered a value for `OPENAI_API_KEY` in the TUI in Step 5, enable **Get API key from environment variable**.
+ 10. Under **Advanced settings**, select your **Embedding Model** and **Language Model**.
+ 11. To load 2 sample PDFs, enable **Sample dataset**.
+ This is recommended, but not required.
+ 12. Click **Complete**.
+
+
+
+ 9. Complete the fields for **watsonx.ai API Endpoint**, **IBM API key**, and **IBM Project ID**.
+ These values are found in your IBM watsonx deployment.
+ 10. Under **Advanced settings**, select your **Embedding Model** and **Language Model**.
+ 11. To load 2 sample PDFs, enable **Sample dataset**.
+ This is recommended, but not required.
+ 12. Click **Complete**.
+
+
+
+ 9. Enter your Ollama server's base URL address.
+ The default Ollama server address is `http://localhost:11434`.
+    Since OpenRAG runs in a container, you might need to change `localhost` to reach services outside of the container. For example, change `http://localhost:11434` to `http://host.docker.internal:11434` to connect to Ollama. If Ollama runs directly on a Linux host, see the Compose sketch after these steps.
+ OpenRAG automatically sends a test connection to your Ollama server to confirm connectivity.
+ 10. Select the **Embedding Model** and **Language Model** your Ollama server is running.
+ OpenRAG automatically lists the available models from your Ollama server.
+ 11. To load 2 sample PDFs, enable **Sample dataset**.
+ This is recommended, but not required.
+ 12. Click **Complete**.
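+
+    If your Ollama server runs directly on a Linux host, `host.docker.internal` might not resolve inside the container by default. A minimal Compose override sketch, assuming the backend service is named `openrag-backend` as elsewhere in this repository (hypothetical otherwise):
+
+    ```yaml
+    # docker-compose.override.yml (sketch)
+    services:
+      openrag-backend:
+        extra_hosts:
+          # Map host.docker.internal to the host's gateway IP on Linux
+          - "host.docker.internal:host-gateway"
+    ```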
+
+
+
+
+13. Continue with the [Quickstart](/quickstart).
### Advanced Setup {#advanced-setup}
diff --git a/docs/docs/get-started/quickstart.mdx b/docs/docs/get-started/quickstart.mdx
index 748a8078..b071529b 100644
--- a/docs/docs/get-started/quickstart.mdx
+++ b/docs/docs/get-started/quickstart.mdx
@@ -11,7 +11,41 @@ Get started with OpenRAG by loading your knowledge, swapping out your language m
## Prerequisites
-- Install and start OpenRAG
+- [Install and start OpenRAG](/install)
+- Create a [Langflow API key](https://docs.langflow.org/api-keys-and-authentication)
+
+ Create a Langflow API key
+
+ A Langflow API key is a user-specific token you can use with Langflow.
+ It is **only** used for sending requests to the Langflow server.
+  It does **not** grant access to OpenRAG.
+
+ To create a Langflow API key, do the following:
+
+ 1. In Langflow, click your user icon, and then select **Settings**.
+ 2. Click **Langflow API Keys**, and then click **Add New**.
+ 3. Name your key, and then click **Create API Key**.
+ 4. Copy the API key and store it securely.
+ 5. To use your Langflow API key in a request, set a `LANGFLOW_API_KEY` environment variable in your terminal, and then include an `x-api-key` header or query parameter with your request.
+ For example:
+
+ ```bash
+ # Set variable
+ export LANGFLOW_API_KEY="sk..."
+
+ # Send request
+ curl --request POST \
+ --url "http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID" \
+ --header "Content-Type: application/json" \
+ --header "x-api-key: $LANGFLOW_API_KEY" \
+ --data '{
+ "output_type": "chat",
+ "input_type": "chat",
+ "input_value": "Hello"
+ }'
+ ```
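+
+   You can also pass the key as a query parameter rather than a header. For example, the same request with the same placeholder values:
+
+   ```bash
+   curl --request POST \
+     --url "http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID?x-api-key=$LANGFLOW_API_KEY" \
+     --header "Content-Type: application/json" \
+     --data '{"output_type": "chat", "input_type": "chat", "input_value": "Hello"}'
+   ```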
+
+
## Find your way around
@@ -20,14 +54,18 @@ Get started with OpenRAG by loading your knowledge, swapping out your language m
For more information, see [Langflow Agents](/agents).
2. Ask `What documents are available to you?`
The agent responds with a message summarizing the documents that OpenRAG loads by default, which are PDFs about evaluating data quality when using LLMs in health care.
+ Knowledge is stored in OpenSearch.
+ For more information, see [Knowledge](/knowledge).
3. To confirm the agent is correct, click **Knowledge**.
- The **Knowledge** page lists the documents OpenRAG has ingested into the OpenSearch vector database. Click on a document to display the chunks derived from splitting the default documents into the vector database.
+ The **Knowledge** page lists the documents OpenRAG has ingested into the OpenSearch vector database.
+   Click a document to display the chunks created by splitting it for storage in the vector database.
## Add your own knowledge
1. To add documents to your knowledge base, click **Add Knowledge**.
* Select **Add File** to add a single file from your local machine (mapped with the Docker volume mount).
* Select **Process Folder** to process an entire folder of documents from your local machine (mapped with the Docker volume mount).
+ * Select your cloud storage provider to add knowledge from an OAuth-connected storage provider. For more information, see [OAuth ingestion](/knowledge#oauth-ingestion).
2. Return to the Chat window and ask a question about your loaded data.
For example, with a manual about a PC tablet loaded, ask `How do I connect this device to WiFi?`
The agent responds with a message indicating it now has your knowledge as context for answering questions.
@@ -40,353 +78,289 @@ If you aren't getting the results you need, you can further tune the knowledge i
To modify the knowledge ingestion or Agent behavior, click **Settings**.
In this example, you'll try a different LLM to demonstrate how the Agent's response changes.
+You can only change the **Language model**, and not the **Model provider** that you started with in OpenRAG.
+If you're using Ollama, you can use any installed model.
1. To edit the Agent's behavior, click **Edit in Langflow**.
+You can access the **Language Model** and **Agent Instructions** fields more quickly on this page, but for illustration purposes, this example navigates to the Langflow visual builder.
2. OpenRAG warns you that you're entering Langflow. Click **Proceed**.
+
3. The OpenRAG OpenSearch Agent flow appears.
+ 
-
-
-4. In the **Language Model** component, under **Model Provider**, select **Anthropic**.
- :::note
- This guide uses an Anthropic model for demonstration purposes. If you want to use a different provider, change the **Model Provider** and **Model Name** fields, and then provide credentials for your selected provider.
- :::
+4. In the **Language Model** component, under **Model**, select a different OpenAI model.
5. Save your flow with Command+S.
6. In OpenRAG, start a new conversation in the **Conversations** tab.
7. Ask the same question as before to demonstrate how a different language model changes the results.
## Integrate OpenRAG into your application
-:::tip
-Ensure the `openrag-backend` container has port 8000 exposed in your `docker-compose.yml`:
+To integrate OpenRAG into your application, use the [Langflow API](https://docs.langflow.org/api-reference-api-examples).
+Make requests with Python, TypeScript, or any HTTP client to run one of OpenRAG's default flows and get a response, and then modify the flow further to improve results.
-```yaml
-openrag-backend:
- ports:
- - "8000:8000"
-```
-:::
+Langflow provides code snippets to help you get started with the Langflow API.
-OpenRAG provides a REST API that you can call from Python, TypeScript, or any HTTP client to chat with your documents.
+1. To open the OpenRAG OpenSearch Agent flow, click **Settings**, and then click **Edit in Langflow**.
+2. Click **Share**, and then click **API access**.
-These example requests are run assuming OpenRAG is in "no-auth" mode.
-For complete API documentation, including authentication, request and response parameters, and example requests, see the API documentation.
+   The default code in the API access pane constructs a request with the Langflow server `url`, `headers`, and a `payload` of request data. The code snippets automatically include the `LANGFLOW_SERVER_ADDRESS` and `FLOW_ID` values for the flow. Replace these values if you're using the code for a different server or flow. The default Langflow server address is `http://localhost:7860`.
-### Chat with your documents
-
-Prompt OpenRAG at the `/chat` API endpoint.
-
-
-
-
-```python
-import requests
-
-url = "http://localhost:8000/chat"
-payload = {
- "prompt": "What documents are available to you?",
- "previous_response_id": None
-}
-
-response = requests.post(url, json=payload)
-print("OpenRAG Response:", response.json())
-```
-
-
-
-
-```typescript
-import fetch from 'node-fetch';
-
-const response = await fetch("http://localhost:8000/chat", {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({
- prompt: "What documents are available to you?",
- previous_response_id: null
- })
-});
-
-const data = await response.json();
-console.log("OpenRAG Response:", data);
-```
-
-
-
-
-```bash
-curl -X POST "http://localhost:8000/chat" \
- -H "Content-Type: application/json" \
- -d '{
- "prompt": "What documents are available to you?",
- "previous_response_id": null
- }'
-```
-
-
-
-
-
-Response
-
-```
-{
- "response": "I have access to a wide range of documents depending on the context and the tools enabled in this environment. Specifically, I can search for and retrieve documents related to various topics such as technical papers, articles, manuals, guides, knowledge base entries, and other text-based resources. If you specify a particular subject or type of document you're interested in, I can try to locate relevant materials for you. Let me know what you need!",
- "response_id": "resp_68d3fdbac93081958b8781b97919fe7007f98bd83932fa1a"
-}
-```
-
-
-
-### Search your documents
-
-Search your document knowledge base at the `/search` endpoint.
-
-
-
-
-```python
-import requests
-
-url = "http://localhost:8000/search"
-payload = {"query": "healthcare data quality", "limit": 5}
-
-response = requests.post(url, json=payload)
-results = response.json()
-
-print("Search Results:")
-for result in results.get("results", []):
- print(f"- {result.get('filename')}: {result.get('text', '')[:100]}...")
-```
-
-
-
-
-```typescript
-const response = await fetch("http://localhost:8000/search", {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({
- query: "healthcare data quality",
- limit: 5
- })
-});
-
-const results = await response.json();
-console.log("Search Results:");
-results.results?.forEach((result, index) => {
- const filename = result.filename || 'Unknown';
- const text = result.text?.substring(0, 100) || '';
- console.log(`${index + 1}. ${filename}: ${text}...`);
-});
-```
-
-
-
-
-```bash
-curl -X POST "http://localhost:8000/search" \
- -H "Content-Type: application/json" \
- -d '{"query": "healthcare data quality", "limit": 5}'
-```
-
-
-
-
-
-
-Example response
-
-```
-Found 5 results
-1. 2506.08231v1.pdf: variables with high performance metrics. These variables might also require fewer replication analys...
-2. 2506.08231v1.pdf: on EHR data and may lack the clinical domain knowledge needed to perform well on the tasks where EHR...
-3. 2506.08231v1.pdf: Abstract Large language models (LLMs) are increasingly used to extract clinical data from electronic...
-4. 2506.08231v1.pdf: these multidimensional assessments, the framework not only quantifies accuracy, but can also be appl...
-5. 2506.08231v1.pdf: observed in only the model metrics, but not the abstractor metrics, it indicates that model errors m...
-```
-
-
-
-### Use chat and search together
-
-Create a complete chat application that combines an interactive terminal chat with session continuity and search functionality.
-
-
-
-
-```python
-import requests
-
-# Configuration
-OPENRAG_BASE_URL = "http://localhost:8000"
-CHAT_URL = f"{OPENRAG_BASE_URL}/chat"
-SEARCH_URL = f"{OPENRAG_BASE_URL}/search"
-DEFAULT_SEARCH_LIMIT = 5
-
-def chat_with_openrag(message, previous_response_id=None):
- try:
- response = requests.post(CHAT_URL, json={
- "prompt": message,
- "previous_response_id": previous_response_id
- })
- response.raise_for_status()
- data = response.json()
- return data.get("response"), data.get("response_id")
- except Exception as e:
- return f"Error: {str(e)}", None
-
-def search_documents(query, limit=DEFAULT_SEARCH_LIMIT):
- try:
- response = requests.post(SEARCH_URL, json={
- "query": query,
- "limit": limit
- })
- response.raise_for_status()
- data = response.json()
- return data.get("results", [])
- except Exception as e:
- return []
-
-# Interactive chat with session continuity and search
-previous_response_id = None
-while True:
- question = input("Your question (or 'search ' to search): ").strip()
- if question.lower() in ['quit', 'exit', 'q']:
- break
- if not question:
- continue
+
+
- if question.lower().startswith('search '):
- query = question[7:].strip()
- print("Searching documents...")
- results = search_documents(query)
- print(f"\nFound {len(results)} results:")
- for i, result in enumerate(results, 1):
- filename = result.get('filename', 'Unknown')
- text = result.get('text', '')[:100]
- print(f"{i}. {filename}: {text}...")
- print()
- else:
- print("OpenRAG is thinking...")
- result, response_id = chat_with_openrag(question, previous_response_id)
- print(f"OpenRAG: {result}\n")
- previous_response_id = response_id
-```
-
-
-
-
-```ts
-import fetch from 'node-fetch';
-
-// Configuration
-const OPENRAG_BASE_URL = "http://localhost:8000";
-const CHAT_URL = `${OPENRAG_BASE_URL}/chat`;
-const SEARCH_URL = `${OPENRAG_BASE_URL}/search`;
-const DEFAULT_SEARCH_LIMIT = 5;
-
-async function chatWithOpenRAG(message: string, previousResponseId?: string | null) {
- try {
- const response = await fetch(CHAT_URL, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({
- prompt: message,
- previous_response_id: previousResponseId
- })
- });
- const data = await response.json();
- return [data.response || "No response received", data.response_id || null];
- } catch (error) {
- return [`Error: ${error}`, null];
+ ```python
+ import requests
+ import os
+ import uuid
+
+ api_key = 'LANGFLOW_API_KEY'
+ url = "http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID" # The complete API endpoint URL for this flow
+
+ # Request payload configuration
+ payload = {
+ "output_type": "chat",
+ "input_type": "chat",
+ "input_value": "hello world!"
}
-}
+ payload["session_id"] = str(uuid.uuid4())
+
+ headers = {"x-api-key": api_key}
+
+ try:
+ # Send API request
+ response = requests.request("POST", url, json=payload, headers=headers)
+ response.raise_for_status() # Raise exception for bad status codes
+
+ # Print response
+ print(response.text)
+
+ except requests.exceptions.RequestException as e:
+ print(f"Error making API request: {e}")
+ except ValueError as e:
+ print(f"Error parsing response: {e}")
+ ```
+
+
+
+
+ ```typescript
+ const crypto = require('crypto');
+ const apiKey = 'LANGFLOW_API_KEY';
+ const payload = {
+ "output_type": "chat",
+ "input_type": "chat",
+ "input_value": "hello world!"
+ };
+ payload.session_id = crypto.randomUUID();
+
+ const options = {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ "x-api-key": apiKey
+ },
+ body: JSON.stringify(payload)
+ };
+
+ fetch('http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID', options)
+ .then(response => response.json())
+ .then(response => console.warn(response))
+ .catch(err => console.error(err));
+ ```
+
+
+
+
+ ```bash
+ curl --request POST \
+ --url 'http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID?stream=false' \
+ --header 'Content-Type: application/json' \
+ --header "x-api-key: LANGFLOW_API_KEY" \
+ --data '{
+ "output_type": "chat",
+ "input_type": "chat",
+    "input_value": "hello world!"
+ }'
+ ```
+
+
+
-async function searchDocuments(query: string, limit: number = DEFAULT_SEARCH_LIMIT) {
- try {
- const response = await fetch(SEARCH_URL, {
- method: "POST",
- headers: { "Content-Type": "application/json" },
- body: JSON.stringify({ query, limit })
- });
- const data = await response.json();
- return data.results || [];
- } catch (error) {
- return [];
+3. Copy the snippet, paste it in a script file, and then run the script to send the request. If you are using the curl snippet, you can run the command directly in your terminal.
+
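+For example, assuming you saved the Python snippet as `run_flow.py` (a hypothetical filename) and replaced the placeholder values:
+
+```bash
+python run_flow.py
+```
+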
+If the request is successful, the response includes details about the flow run, including the session ID, inputs, outputs, components, and durations.
+The following is an example of a response from running the **Simple Agent** template flow:
+
+
+Result
+
+```json
+{
+ "session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "outputs": [
+ {
+ "inputs": {
+ "input_value": "hello world!"
+ },
+ "outputs": [
+ {
+ "results": {
+ "message": {
+ "text_key": "text",
+ "data": {
+ "timestamp": "2025-06-16 19:58:23 UTC",
+ "sender": "Machine",
+ "sender_name": "AI",
+ "session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "text": "Hello world! 🌍 How can I assist you today?",
+ "files": [],
+ "error": false,
+ "edit": false,
+ "properties": {
+ "text_color": "",
+ "background_color": "",
+ "edited": false,
+ "source": {
+ "id": "Agent-ZOknz",
+ "display_name": "Agent",
+ "source": "gpt-4o-mini"
+ },
+ "icon": "bot",
+ "allow_markdown": false,
+ "positive_feedback": null,
+ "state": "complete",
+ "targets": []
+ },
+ "category": "message",
+ "content_blocks": [
+ {
+ "title": "Agent Steps",
+ "contents": [
+ {
+ "type": "text",
+ "duration": 2,
+ "header": {
+ "title": "Input",
+ "icon": "MessageSquare"
+ },
+ "text": "**Input**: hello world!"
+ },
+ {
+ "type": "text",
+ "duration": 226,
+ "header": {
+ "title": "Output",
+ "icon": "MessageSquare"
+ },
+ "text": "Hello world! 🌍 How can I assist you today?"
+ }
+ ],
+ "allow_markdown": true,
+ "media_url": null
+ }
+ ],
+ "id": "f3d85d9a-261c-4325-b004-95a1bf5de7ca",
+ "flow_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "duration": null
+ },
+ "default_value": "",
+ "text": "Hello world! 🌍 How can I assist you today?",
+ "sender": "Machine",
+ "sender_name": "AI",
+ "files": [],
+ "session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "timestamp": "2025-06-16T19:58:23+00:00",
+ "flow_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "error": false,
+ "edit": false,
+ "properties": {
+ "text_color": "",
+ "background_color": "",
+ "edited": false,
+ "source": {
+ "id": "Agent-ZOknz",
+ "display_name": "Agent",
+ "source": "gpt-4o-mini"
+ },
+ "icon": "bot",
+ "allow_markdown": false,
+ "positive_feedback": null,
+ "state": "complete",
+ "targets": []
+ },
+ "category": "message",
+ "content_blocks": [
+ {
+ "title": "Agent Steps",
+ "contents": [
+ {
+ "type": "text",
+ "duration": 2,
+ "header": {
+ "title": "Input",
+ "icon": "MessageSquare"
+ },
+ "text": "**Input**: hello world!"
+ },
+ {
+ "type": "text",
+ "duration": 226,
+ "header": {
+ "title": "Output",
+ "icon": "MessageSquare"
+ },
+ "text": "Hello world! 🌍 How can I assist you today?"
+ }
+ ],
+ "allow_markdown": true,
+ "media_url": null
+ }
+ ],
+ "duration": null
+ }
+ },
+ "artifacts": {
+ "message": "Hello world! 🌍 How can I assist you today?",
+ "sender": "Machine",
+ "sender_name": "AI",
+ "files": [],
+ "type": "object"
+ },
+ "outputs": {
+ "message": {
+ "message": "Hello world! 🌍 How can I assist you today?",
+ "type": "text"
+ }
+ },
+ "logs": {
+ "message": []
+ },
+ "messages": [
+ {
+ "message": "Hello world! 🌍 How can I assist you today?",
+ "sender": "Machine",
+ "sender_name": "AI",
+ "session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
+ "stream_url": null,
+ "component_id": "ChatOutput-aF5lw",
+ "files": [],
+ "type": "text"
+ }
+ ],
+ "timedelta": null,
+ "duration": null,
+ "component_display_name": "Chat Output",
+ "component_id": "ChatOutput-aF5lw",
+ "used_frozen_result": false
+ }
+ ]
}
+ ]
}
-
-// Interactive chat with session continuity and search
-let previousResponseId = null;
-const readline = require('readline');
-const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
-
-const askQuestion = () => {
- rl.question("Your question (or 'search ' to search): ", async (question) => {
- if (question.toLowerCase() === 'quit' || question.toLowerCase() === 'exit' || question.toLowerCase() === 'q') {
- console.log("Goodbye!");
- rl.close();
- return;
- }
- if (!question.trim()) {
- askQuestion();
- return;
- }
-
- if (question.toLowerCase().startsWith('search ')) {
- const query = question.substring(7).trim();
- console.log("Searching documents...");
- const results = await searchDocuments(query);
- console.log(`\nFound ${results.length} results:`);
- results.forEach((result, i) => {
- const filename = result.filename || 'Unknown';
- const text = result.text?.substring(0, 100) || '';
- console.log(`${i + 1}. ${filename}: ${text}...`);
- });
- console.log();
- } else {
- console.log("OpenRAG is thinking...");
- const [result, responseId] = await chatWithOpenRAG(question, previousResponseId);
- console.log(`\nOpenRAG: ${result}\n`);
- previousResponseId = responseId;
- }
- askQuestion();
- });
-};
-
-console.log("OpenRAG Chat Interface");
-console.log("Ask questions about your documents. Type 'quit' to exit.");
-console.log("Use 'search ' to search documents directly.\n");
-askQuestion();
```
-
-
-
-
-
-Example response
-
-```
-Your question (or 'search ' to search): search healthcare
-Searching documents...
-
-Found 5 results:
-1. 2506.08231v1.pdf: variables with high performance metrics. These variables might also require fewer replication analys...
-2. 2506.08231v1.pdf: on EHR data and may lack the clinical domain knowledge needed to perform well on the tasks where EHR...
-3. 2506.08231v1.pdf: Abstract Large language models (LLMs) are increasingly used to extract clinical data from electronic...
-4. 2506.08231v1.pdf: Acknowledgements Darren Johnson for support in publication planning and management. The authors used...
-5. 2506.08231v1.pdf: Ensuring Reliability of Curated EHR-Derived Data: The Validation of Accuracy for LLM/ML-Extracted In...
-
-Your question (or 'search ' to search): what's the weather today?
-OpenRAG is thinking...
-OpenRAG: I don't have access to real-time weather data. Could you please provide me with your location? Then I can help you find the weather information.
-
-Your question (or 'search ' to search): newark nj
-OpenRAG is thinking...
-```
-
-## Next steps
-TBD
\ No newline at end of file
+To further explore the API, see:
+
+* The Langflow [Quickstart](https://docs.langflow.org/quickstart#extract-data-from-the-response) extends this example by extracting fields from the response, as sketched below.
+* [Get started with the Langflow API](https://docs.langflow.org/api-reference-api-examples)
\ No newline at end of file
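+
+A minimal sketch of that extraction step, continuing from the Python snippet above where `response` is the returned `requests` response (the field path is taken from the example response above):
+
+```python
+data = response.json()
+
+# outputs[0] is the flow run; its outputs[0] is the Chat Output component.
+message = data["outputs"][0]["outputs"][0]["results"]["message"]["text"]
+print(message)  # "Hello world! 🌍 How can I assist you today?"
+```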
diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
index a5595813..1c83724d 100644
--- a/src/api/langflow_files.py
+++ b/src/api/langflow_files.py
@@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
except Exception:
# Clean up temp file on error
- try:
- if os.path.exists(temp_path):
- os.unlink(temp_path)
- except Exception:
- pass # Ignore cleanup errors
+ from utils.file_utils import safe_unlink
+ safe_unlink(temp_path)
raise
except Exception as e:
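Note: `safe_unlink` and `auto_cleanup_tempfile` come from `utils/file_utils.py`, which this diff doesn't show. A minimal sketch of what they might look like, inferred from the call sites in the hunks below (hypothetical; the real implementations may differ):

```python
import os
import tempfile
from contextlib import contextmanager

def safe_unlink(path: str) -> None:
    """Delete a file if it exists, swallowing cleanup errors."""
    try:
        if path and os.path.exists(path):
            os.unlink(path)
    except OSError:
        pass  # Ignore cleanup errors, as the inlined code did

@contextmanager
def auto_cleanup_tempfile(suffix: str = ""):
    """Yield a temporary file path and always remove it on exit."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # Call sites reopen the path with open(path, 'wb')
    try:
        yield path
    finally:
        safe_unlink(path)
```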
diff --git a/src/api/router.py b/src/api/router.py
index 154757a5..56789d41 100644
--- a/src/api/router.py
+++ b/src/api/router.py
@@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
except Exception:
# Clean up temp files on error
+ from utils.file_utils import safe_unlink
for temp_path in temp_file_paths:
- try:
- if os.path.exists(temp_path):
- os.unlink(temp_path)
- except Exception:
- pass # Ignore cleanup errors
+ safe_unlink(temp_path)
raise
except Exception as e:
diff --git a/src/api/tasks.py b/src/api/tasks.py
index de4bf505..92779d09 100644
--- a/src/api/tasks.py
+++ b/src/api/tasks.py
@@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
task_id = request.path_params.get("task_id")
user = request.state.user
- success = task_service.cancel_task(user.user_id, task_id)
+ success = await task_service.cancel_task(user.user_id, task_id)
if not success:
return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400
diff --git a/src/config/settings.py b/src/config/settings.py
index 3bf1e6cf..517bf2df 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -514,6 +514,9 @@ class AppClients:
ssl_assert_fingerprint=None,
headers=headers,
http_compress=True,
+ timeout=30, # 30 second timeout
+ max_retries=3,
+ retry_on_timeout=True,
)
diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py
index ef68816d..545c6190 100644
--- a/src/connectors/langflow_connector_service.py
+++ b/src/connectors/langflow_connector_service.py
@@ -53,25 +53,27 @@ class LangflowConnectorService:
filename=document.filename,
)
+ from utils.file_utils import auto_cleanup_tempfile
+
suffix = self._get_file_extension(document.mimetype)
# Create temporary file from document content
- with tempfile.NamedTemporaryFile(
- delete=False, suffix=suffix
- ) as tmp_file:
- tmp_file.write(document.content)
- tmp_file.flush()
+ with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+ # Write document content to temp file
+ with open(tmp_path, 'wb') as f:
+ f.write(document.content)
+ # Step 1: Upload file to Langflow
+ logger.debug("Uploading file to Langflow", filename=document.filename)
+ content = document.content
+ file_tuple = (
+ document.filename.replace(" ", "_").replace("/", "_")+suffix,
+ content,
+ document.mimetype or "application/octet-stream",
+ )
+
+ langflow_file_id = None # Initialize to track if upload succeeded
try:
- # Step 1: Upload file to Langflow
- logger.debug("Uploading file to Langflow", filename=document.filename)
- content = document.content
- file_tuple = (
- document.filename.replace(" ", "_").replace("/", "_")+suffix,
- content,
- document.mimetype or "application/octet-stream",
- )
-
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
@@ -125,7 +127,7 @@ class LangflowConnectorService:
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
- if "langflow_file_id" in locals():
+ if langflow_file_id is not None:
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
@@ -140,10 +142,6 @@ class LangflowConnectorService:
)
raise
- finally:
- # Clean up temporary file
- os.unlink(tmp_file.name)
-
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {
diff --git a/src/connectors/service.py b/src/connectors/service.py
index 01a41519..792d8d1f 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -54,52 +54,50 @@ class ConnectorService:
"""Process a document from a connector using existing processing pipeline"""
# Create temporary file from document content
- with tempfile.NamedTemporaryFile(
- delete=False, suffix=self._get_file_extension(document.mimetype)
- ) as tmp_file:
- tmp_file.write(document.content)
- tmp_file.flush()
+ from utils.file_utils import auto_cleanup_tempfile
- try:
- # Use existing process_file_common function with connector document metadata
- # We'll use the document service's process_file_common method
- from services.document_service import DocumentService
+ with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+ # Write document content to temp file
+ with open(tmp_path, 'wb') as f:
+ f.write(document.content)
- doc_service = DocumentService(session_manager=self.session_manager)
+ # Use existing process_file_common function with connector document metadata
+ # We'll use the document service's process_file_common method
+ from services.document_service import DocumentService
- logger.debug("Processing connector document", document_id=document.id)
+ doc_service = DocumentService(session_manager=self.session_manager)
- # Process using the existing pipeline but with connector document metadata
- result = await doc_service.process_file_common(
- file_path=tmp_file.name,
- file_hash=document.id, # Use connector document ID as hash
- owner_user_id=owner_user_id,
- original_filename=document.filename, # Pass the original Google Doc title
- jwt_token=jwt_token,
- owner_name=owner_name,
- owner_email=owner_email,
- file_size=len(document.content) if document.content else 0,
- connector_type=connector_type,
+ logger.debug("Processing connector document", document_id=document.id)
+
+ # Process using consolidated processing pipeline
+ from models.processors import TaskProcessor
+ processor = TaskProcessor(document_service=doc_service)
+ result = await processor.process_document_standard(
+ file_path=tmp_path,
+ file_hash=document.id, # Use connector document ID as hash
+ owner_user_id=owner_user_id,
+ original_filename=document.filename, # Pass the original Google Doc title
+ jwt_token=jwt_token,
+ owner_name=owner_name,
+ owner_email=owner_email,
+ file_size=len(document.content) if document.content else 0,
+ connector_type=connector_type,
+ )
+
+ logger.debug("Document processing result", result=result)
+
+ # If successfully indexed or already exists, update the indexed documents with connector metadata
+ if result["status"] in ["indexed", "unchanged"]:
+ # Update all chunks with connector-specific metadata
+ await self._update_connector_metadata(
+ document, owner_user_id, connector_type, jwt_token
)
- logger.debug("Document processing result", result=result)
-
- # If successfully indexed or already exists, update the indexed documents with connector metadata
- if result["status"] in ["indexed", "unchanged"]:
- # Update all chunks with connector-specific metadata
- await self._update_connector_metadata(
- document, owner_user_id, connector_type, jwt_token
- )
-
- return {
- **result,
- "filename": document.filename,
- "source_url": document.source_url,
- }
-
- finally:
- # Clean up temporary file
- os.unlink(tmp_file.name)
+ return {
+ **result,
+ "filename": document.filename,
+ "source_url": document.source_url,
+ }
async def _update_connector_metadata(
self,
diff --git a/src/models/processors.py b/src/models/processors.py
index a817f8d4..ecec9c49 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -1,4 +1,3 @@
-from abc import ABC, abstractmethod
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@@ -6,22 +5,160 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
-class TaskProcessor(ABC):
- """Abstract base class for task processors"""
+class TaskProcessor:
+ """Base class for task processors with shared processing logic"""
+
+ def __init__(self, document_service=None):
+ self.document_service = document_service
+
+ async def check_document_exists(
+ self,
+ file_hash: str,
+ opensearch_client,
+ ) -> bool:
+ """
+ Check if a document with the given hash already exists in OpenSearch.
+ Consolidated hash checking for all processors.
+ """
+ from config.settings import INDEX_NAME
+ import asyncio
+
+ max_retries = 3
+ retry_delay = 1.0
+
+ for attempt in range(max_retries):
+ try:
+ exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
+ return exists
+            except Exception as e:  # Exception already covers asyncio.TimeoutError
+ if attempt == max_retries - 1:
+ logger.error(
+ "OpenSearch exists check failed after retries",
+ file_hash=file_hash,
+ error=str(e),
+ attempt=attempt + 1
+ )
+ # On final failure, assume document doesn't exist (safer to reprocess than skip)
+ logger.warning(
+ "Assuming document doesn't exist due to connection issues",
+ file_hash=file_hash
+ )
+ return False
+ else:
+ logger.warning(
+ "OpenSearch exists check failed, retrying",
+ file_hash=file_hash,
+ error=str(e),
+ attempt=attempt + 1,
+ retry_in=retry_delay
+ )
+ await asyncio.sleep(retry_delay)
+ retry_delay *= 2 # Exponential backoff
+
+ async def process_document_standard(
+ self,
+ file_path: str,
+ file_hash: str,
+ owner_user_id: str = None,
+ original_filename: str = None,
+ jwt_token: str = None,
+ owner_name: str = None,
+ owner_email: str = None,
+ file_size: int = None,
+ connector_type: str = "local",
+ ):
+ """
+ Standard processing pipeline for non-Langflow processors:
+ docling conversion + embeddings + OpenSearch indexing.
+ """
+ import datetime
+ from config.settings import INDEX_NAME, EMBED_MODEL, clients
+ from services.document_service import chunk_texts_for_embeddings
+ from utils.document_processing import extract_relevant
+
+ # Get user's OpenSearch client with JWT for OIDC auth
+ opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
+ owner_user_id, jwt_token
+ )
+
+ # Check if already exists
+ if await self.check_document_exists(file_hash, opensearch_client):
+ return {"status": "unchanged", "id": file_hash}
+
+ # Convert and extract
+ result = clients.converter.convert(file_path)
+ full_doc = result.document.export_to_dict()
+ slim_doc = extract_relevant(full_doc)
+
+ texts = [c["text"] for c in slim_doc["chunks"]]
+
+ # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
+ text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
+ embeddings = []
+
+ for batch in text_batches:
+ resp = await clients.patched_async_client.embeddings.create(
+ model=EMBED_MODEL, input=batch
+ )
+ embeddings.extend([d.embedding for d in resp.data])
+
+ # Index each chunk as a separate document
+ for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
+ chunk_doc = {
+ "document_id": file_hash,
+ "filename": original_filename
+ if original_filename
+ else slim_doc["filename"],
+ "mimetype": slim_doc["mimetype"],
+ "page": chunk["page"],
+ "text": chunk["text"],
+ "chunk_embedding": vect,
+ "file_size": file_size,
+ "connector_type": connector_type,
+ "indexed_time": datetime.datetime.now().isoformat(),
+ }
+
+ # Only set owner fields if owner_user_id is provided (for no-auth mode support)
+ if owner_user_id is not None:
+ chunk_doc["owner"] = owner_user_id
+ if owner_name is not None:
+ chunk_doc["owner_name"] = owner_name
+ if owner_email is not None:
+ chunk_doc["owner_email"] = owner_email
+ chunk_id = f"{file_hash}_{i}"
+ try:
+ await opensearch_client.index(
+ index=INDEX_NAME, id=chunk_id, body=chunk_doc
+ )
+ except Exception as e:
+ logger.error(
+ "OpenSearch indexing failed for chunk",
+ chunk_id=chunk_id,
+ error=str(e),
+ )
+ logger.error("Chunk document details", chunk_doc=chunk_doc)
+ raise
+ return {"status": "indexed", "id": file_hash}
- @abstractmethod
async def process_item(
self, upload_task: UploadTask, item: Any, file_task: FileTask
) -> None:
"""
Process a single item in the task.
+ This is a base implementation that should be overridden by subclasses.
+ When TaskProcessor is used directly (not via subclass), this method
+ is not called - only the utility methods like process_document_standard
+ are used.
+
Args:
upload_task: The overall upload task
item: The item to process (could be file path, file info, etc.)
file_task: The specific file task to update
"""
- pass
+ raise NotImplementedError(
+ "process_item should be overridden by subclasses when used in task processing"
+ )
class DocumentFileProcessor(TaskProcessor):
@@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
- self.document_service = document_service
+ super().__init__(document_service)
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
@@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor):
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
- """Process a regular file path using DocumentService"""
- # This calls the existing logic with user context
- await self.document_service.process_single_file_task(
- upload_task,
- item,
- owner_user_id=self.owner_user_id,
- jwt_token=self.jwt_token,
- owner_name=self.owner_name,
- owner_email=self.owner_email,
- )
+ """Process a regular file path using consolidated methods"""
+ from models.tasks import TaskStatus
+ from utils.hash_utils import hash_id
+ import time
+ import os
+
+ file_task.status = TaskStatus.RUNNING
+ file_task.updated_at = time.time()
+
+ try:
+ # Compute hash
+ file_hash = hash_id(item)
+
+ # Get file size
+ try:
+ file_size = os.path.getsize(item)
+ except Exception:
+ file_size = 0
+
+ # Use consolidated standard processing
+ result = await self.process_document_standard(
+ file_path=item,
+ file_hash=file_hash,
+ owner_user_id=self.owner_user_id,
+ original_filename=os.path.basename(item),
+ jwt_token=self.jwt_token,
+ owner_name=self.owner_name,
+ owner_email=self.owner_email,
+ file_size=file_size,
+ connector_type="local",
+ )
+
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = result
+ file_task.updated_at = time.time()
+ upload_task.successful_files += 1
+
+ except Exception as e:
+ file_task.status = TaskStatus.FAILED
+ file_task.error = str(e)
+ file_task.updated_at = time.time()
+ upload_task.failed_files += 1
+ raise
+ finally:
+ upload_task.processed_files += 1
+ upload_task.updated_at = time.time()
class ConnectorFileProcessor(TaskProcessor):
@@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
+ super().__init__()
self.connector_service = connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
- # Create lookup map for file info - handle both file objects and file IDs
- self.file_info_map = {}
- for f in files_to_process:
- if isinstance(f, dict):
- # Full file info objects
- self.file_info_map[f["id"]] = f
- else:
- # Just file IDs - will need to fetch metadata during processing
- self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
- """Process a connector file using ConnectorService"""
+ """Process a connector file using consolidated methods"""
from models.tasks import TaskStatus
+ from utils.hash_utils import hash_id
+        import time
- file_id = item # item is the connector file ID
- self.file_info_map.get(file_id)
+ file_task.status = TaskStatus.RUNNING
+ file_task.updated_at = time.time()
- # Get the connector and connection info
- connector = await self.connector_service.get_connector(self.connection_id)
- connection = await self.connector_service.connection_manager.get_connection(
- self.connection_id
- )
- if not connector or not connection:
- raise ValueError(f"Connection '{self.connection_id}' not found")
+ try:
+ file_id = item # item is the connector file ID
- # Get file content from connector (the connector will fetch metadata if needed)
- document = await connector.get_file_content(file_id)
+ # Get the connector and connection info
+ connector = await self.connector_service.get_connector(self.connection_id)
+ connection = await self.connector_service.connection_manager.get_connection(
+ self.connection_id
+ )
+ if not connector or not connection:
+ raise ValueError(f"Connection '{self.connection_id}' not found")
- # Use the user_id passed during initialization
- if not self.user_id:
- raise ValueError("user_id not provided to ConnectorFileProcessor")
+ # Get file content from connector
+ document = await connector.get_file_content(file_id)
- # Process using existing pipeline
- result = await self.connector_service.process_connector_document(
- document,
- self.user_id,
- connection.connector_type,
- jwt_token=self.jwt_token,
- owner_name=self.owner_name,
- owner_email=self.owner_email,
- )
+ if not self.user_id:
+ raise ValueError("user_id not provided to ConnectorFileProcessor")
- file_task.status = TaskStatus.COMPLETED
- file_task.result = result
- upload_task.successful_files += 1
+ # Create temporary file from document content
+ from utils.file_utils import auto_cleanup_tempfile
+
+ suffix = self.connector_service._get_file_extension(document.mimetype)
+ with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+ # Write content to temp file
+ with open(tmp_path, 'wb') as f:
+ f.write(document.content)
+
+ # Compute hash
+ file_hash = hash_id(tmp_path)
+
+ # Use consolidated standard processing
+ result = await self.process_document_standard(
+ file_path=tmp_path,
+ file_hash=file_hash,
+ owner_user_id=self.user_id,
+ original_filename=document.filename,
+ jwt_token=self.jwt_token,
+ owner_name=self.owner_name,
+ owner_email=self.owner_email,
+ file_size=len(document.content),
+ connector_type=connection.connector_type,
+ )
+
+ # Add connector-specific metadata
+ result.update({
+ "source_url": document.source_url,
+ "document_id": document.id,
+ })
+
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = result
+ file_task.updated_at = time.time()
+ upload_task.successful_files += 1
+
+ except Exception as e:
+ file_task.status = TaskStatus.FAILED
+ file_task.error = str(e)
+ file_task.updated_at = time.time()
+ upload_task.failed_files += 1
+ raise
class LangflowConnectorFileProcessor(TaskProcessor):
@@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
+ super().__init__()
self.langflow_connector_service = langflow_connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
- # Create lookup map for file info - handle both file objects and file IDs
- self.file_info_map = {}
- for f in files_to_process:
- if isinstance(f, dict):
- # Full file info objects
- self.file_info_map[f["id"]] = f
- else:
- # Just file IDs - will need to fetch metadata during processing
- self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using LangflowConnectorService"""
from models.tasks import TaskStatus
+ from utils.hash_utils import hash_id
+        import time
- file_id = item # item is the connector file ID
- self.file_info_map.get(file_id)
+ file_task.status = TaskStatus.RUNNING
+ file_task.updated_at = time.time()
- # Get the connector and connection info
- connector = await self.langflow_connector_service.get_connector(
- self.connection_id
- )
- connection = (
- await self.langflow_connector_service.connection_manager.get_connection(
+ try:
+ file_id = item # item is the connector file ID
+
+ # Get the connector and connection info
+ connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
- )
- if not connector or not connection:
- raise ValueError(f"Connection '{self.connection_id}' not found")
+ connection = (
+ await self.langflow_connector_service.connection_manager.get_connection(
+ self.connection_id
+ )
+ )
+ if not connector or not connection:
+ raise ValueError(f"Connection '{self.connection_id}' not found")
- # Get file content from connector (the connector will fetch metadata if needed)
- document = await connector.get_file_content(file_id)
+ # Get file content from connector
+ document = await connector.get_file_content(file_id)
- # Use the user_id passed during initialization
- if not self.user_id:
- raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
+ if not self.user_id:
+ raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
- # Process using Langflow pipeline
- result = await self.langflow_connector_service.process_connector_document(
- document,
- self.user_id,
- connection.connector_type,
- jwt_token=self.jwt_token,
- owner_name=self.owner_name,
- owner_email=self.owner_email,
- )
+ # Create temporary file and compute hash to check for duplicates
+ from utils.file_utils import auto_cleanup_tempfile
- file_task.status = TaskStatus.COMPLETED
- file_task.result = result
- upload_task.successful_files += 1
+ suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
+ with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
+ # Write content to temp file
+ with open(tmp_path, 'wb') as f:
+ f.write(document.content)
+
+ # Compute hash and check if already exists
+ file_hash = hash_id(tmp_path)
+
+ # Check if document already exists
+ opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
+ self.user_id, self.jwt_token
+ )
+ if await self.check_document_exists(file_hash, opensearch_client):
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = {"status": "unchanged", "id": file_hash}
+ file_task.updated_at = time.time()
+ upload_task.successful_files += 1
+ return
+
+ # Process using Langflow pipeline
+ result = await self.langflow_connector_service.process_connector_document(
+ document,
+ self.user_id,
+ connection.connector_type,
+ jwt_token=self.jwt_token,
+ owner_name=self.owner_name,
+ owner_email=self.owner_email,
+ )
+
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = result
+ file_task.updated_at = time.time()
+ upload_task.successful_files += 1
+
+ except Exception as e:
+ file_task.status = TaskStatus.FAILED
+ file_task.error = str(e)
+ file_task.updated_at = time.time()
+ upload_task.failed_files += 1
+ raise
class S3FileProcessor(TaskProcessor):
@@ -213,7 +442,7 @@ class S3FileProcessor(TaskProcessor):
):
import boto3
- self.document_service = document_service
+ super().__init__(document_service)
self.bucket = bucket
self.s3_client = s3_client or boto3.client("s3")
self.owner_user_id = owner_user_id
@@ -238,34 +467,17 @@ class S3FileProcessor(TaskProcessor):
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
- tmp = tempfile.NamedTemporaryFile(delete=False)
+ from utils.file_utils import auto_cleanup_tempfile
+ from utils.hash_utils import hash_id
+
try:
- # Download object to temporary file
- self.s3_client.download_fileobj(self.bucket, item, tmp)
- tmp.flush()
+ with auto_cleanup_tempfile() as tmp_path:
+ # Download object to temporary file
+ with open(tmp_path, 'wb') as tmp_file:
+ self.s3_client.download_fileobj(self.bucket, item, tmp_file)
- loop = asyncio.get_event_loop()
- slim_doc = await loop.run_in_executor(
- self.document_service.process_pool, process_document_sync, tmp.name
- )
-
- opensearch_client = (
- self.document_service.session_manager.get_user_opensearch_client(
- self.owner_user_id, self.jwt_token
- )
- )
- exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
- if exists:
- result = {"status": "unchanged", "id": slim_doc["id"]}
- else:
- texts = [c["text"] for c in slim_doc["chunks"]]
- text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
- embeddings = []
- for batch in text_batches:
- resp = await clients.patched_async_client.embeddings.create(
- model=EMBED_MODEL, input=batch
- )
- embeddings.extend([d.embedding for d in resp.data])
+ # Compute hash
+ file_hash = hash_id(tmp_path)
# Get object size
try:
@@ -274,54 +486,29 @@ class S3FileProcessor(TaskProcessor):
except Exception:
file_size = 0
- for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
- chunk_doc = {
- "document_id": slim_doc["id"],
- "filename": slim_doc["filename"],
- "mimetype": slim_doc["mimetype"],
- "page": chunk["page"],
- "text": chunk["text"],
- "chunk_embedding": vect,
- "file_size": file_size,
- "connector_type": "s3", # S3 uploads
- "indexed_time": datetime.datetime.now().isoformat(),
- }
+ # Use consolidated standard processing
+ result = await self.process_document_standard(
+ file_path=tmp_path,
+ file_hash=file_hash,
+ owner_user_id=self.owner_user_id,
+ original_filename=item, # Use S3 key as filename
+ jwt_token=self.jwt_token,
+ owner_name=self.owner_name,
+ owner_email=self.owner_email,
+ file_size=file_size,
+ connector_type="s3",
+ )
- # Only set owner fields if owner_user_id is provided (for no-auth mode support)
- if self.owner_user_id is not None:
- chunk_doc["owner"] = self.owner_user_id
- if self.owner_name is not None:
- chunk_doc["owner_name"] = self.owner_name
- if self.owner_email is not None:
- chunk_doc["owner_email"] = self.owner_email
- chunk_id = f"{slim_doc['id']}_{i}"
- try:
- await opensearch_client.index(
- index=INDEX_NAME, id=chunk_id, body=chunk_doc
- )
- except Exception as e:
- logger.error(
- "OpenSearch indexing failed for S3 chunk",
- chunk_id=chunk_id,
- error=str(e),
- chunk_doc=chunk_doc,
- )
- raise
-
- result = {"status": "indexed", "id": slim_doc["id"]}
-
- result["path"] = f"s3://{self.bucket}/{item}"
- file_task.status = TaskStatus.COMPLETED
- file_task.result = result
- upload_task.successful_files += 1
+ result["path"] = f"s3://{self.bucket}/{item}"
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = result
+ upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
upload_task.failed_files += 1
finally:
- tmp.close()
- os.remove(tmp.name)
file_task.updated_at = time.time()
@@ -341,6 +528,7 @@ class LangflowFileProcessor(TaskProcessor):
settings: dict = None,
delete_after_ingest: bool = True,
):
+ super().__init__()
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
@@ -366,7 +554,22 @@ class LangflowFileProcessor(TaskProcessor):
file_task.updated_at = time.time()
try:
- # Read file content
+ # Compute hash and check if already exists
+ from utils.hash_utils import hash_id
+ file_hash = hash_id(item)
+
+ # Check if document already exists
+ opensearch_client = self.session_manager.get_user_opensearch_client(
+ self.owner_user_id, self.jwt_token
+ )
+ if await self.check_document_exists(file_hash, opensearch_client):
+ file_task.status = TaskStatus.COMPLETED
+ file_task.result = {"status": "unchanged", "id": file_hash}
+ file_task.updated_at = time.time()
+ upload_task.successful_files += 1
+ return
+
+ # Read file content for processing
with open(item, 'rb') as f:
content = f.read()
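Likewise, `hash_id` from `utils.hash_utils` isn't shown in this diff. Since it replaces the inline SHA-256 streaming removed from `process_file_common` below and is called with a file path, a plausible sketch (hypothetical):

```python
import hashlib

def hash_id(file_path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in 1 MiB chunks."""
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        while chunk := f.read(chunk_size):
            sha256.update(chunk)
    return sha256.hexdigest()
```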
diff --git a/src/services/document_service.py b/src/services/document_service.py
index 949515e3..5204ea0e 100644
--- a/src/services/document_service.py
+++ b/src/services/document_service.py
@@ -112,98 +112,6 @@ class DocumentService:
return False
return False
- async def process_file_common(
- self,
- file_path: str,
- file_hash: str = None,
- owner_user_id: str = None,
- original_filename: str = None,
- jwt_token: str = None,
- owner_name: str = None,
- owner_email: str = None,
- file_size: int = None,
- connector_type: str = "local",
- ):
- """
- Common processing logic for both upload and upload_path.
- 1. Optionally compute SHA256 hash if not provided.
- 2. Convert with docling and extract relevant content.
- 3. Add embeddings.
- 4. Index into OpenSearch.
- """
- if file_hash is None:
- sha256 = hashlib.sha256()
- async with aiofiles.open(file_path, "rb") as f:
- while True:
- chunk = await f.read(1 << 20)
- if not chunk:
- break
- sha256.update(chunk)
- file_hash = sha256.hexdigest()
-
- # Get user's OpenSearch client with JWT for OIDC auth
- opensearch_client = self.session_manager.get_user_opensearch_client(
- owner_user_id, jwt_token
- )
-
- exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
- if exists:
- return {"status": "unchanged", "id": file_hash}
-
- # convert and extract
- result = clients.converter.convert(file_path)
- full_doc = result.document.export_to_dict()
- slim_doc = extract_relevant(full_doc)
-
- texts = [c["text"] for c in slim_doc["chunks"]]
-
- # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
- text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
- embeddings = []
-
- for batch in text_batches:
- resp = await clients.patched_async_client.embeddings.create(
- model=EMBED_MODEL, input=batch
- )
- embeddings.extend([d.embedding for d in resp.data])
-
- # Index each chunk as a separate document
- for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
- chunk_doc = {
- "document_id": file_hash,
- "filename": original_filename
- if original_filename
- else slim_doc["filename"],
- "mimetype": slim_doc["mimetype"],
- "page": chunk["page"],
- "text": chunk["text"],
- "chunk_embedding": vect,
- "file_size": file_size,
- "connector_type": connector_type,
- "indexed_time": datetime.datetime.now().isoformat(),
- }
-
- # Only set owner fields if owner_user_id is provided (for no-auth mode support)
- if owner_user_id is not None:
- chunk_doc["owner"] = owner_user_id
- if owner_name is not None:
- chunk_doc["owner_name"] = owner_name
- if owner_email is not None:
- chunk_doc["owner_email"] = owner_email
- chunk_id = f"{file_hash}_{i}"
- try:
- await opensearch_client.index(
- index=INDEX_NAME, id=chunk_id, body=chunk_doc
- )
- except Exception as e:
- logger.error(
- "OpenSearch indexing failed for chunk",
- chunk_id=chunk_id,
- error=str(e),
- )
- logger.error("Chunk document details", chunk_doc=chunk_doc)
- raise
- return {"status": "indexed", "id": file_hash}
async def process_upload_file(
self,
@@ -214,20 +122,22 @@ class DocumentService:
owner_email: str = None,
):
"""Process an uploaded file from form data"""
- sha256 = hashlib.sha256()
- tmp = tempfile.NamedTemporaryFile(delete=False)
- file_size = 0
- try:
- while True:
- chunk = await upload_file.read(1 << 20)
- if not chunk:
- break
- sha256.update(chunk)
- tmp.write(chunk)
- file_size += len(chunk)
- tmp.flush()
+ from utils.hash_utils import hash_id
+ from utils.file_utils import auto_cleanup_tempfile
- file_hash = sha256.hexdigest()
+ with auto_cleanup_tempfile() as tmp_path:
+ # Stream upload file to temporary file
+ file_size = 0
+ with open(tmp_path, 'wb') as tmp_file:
+ while True:
+ chunk = await upload_file.read(1 << 20)
+ if not chunk:
+ break
+ tmp_file.write(chunk)
+ file_size += len(chunk)
+
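+            # hash_id returns a short base64url digest (24 chars by default),
+            # replacing the hex sha256 digest previously used as the document id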
+ file_hash = hash_id(tmp_path)
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
@@ -243,22 +153,22 @@ class DocumentService:
if exists:
return {"status": "unchanged", "id": file_hash}
- result = await self.process_file_common(
- tmp.name,
- file_hash,
+ # Use consolidated standard processing
+ from models.processors import TaskProcessor
+ processor = TaskProcessor(document_service=self)
+ result = await processor.process_document_standard(
+ file_path=tmp_path,
+ file_hash=file_hash,
owner_user_id=owner_user_id,
original_filename=upload_file.filename,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=file_size,
+ connector_type="local",
)
return result
- finally:
- tmp.close()
- os.remove(tmp.name)
-
async def process_upload_context(self, upload_file, filename: str = None):
"""Process uploaded file and return content for context"""
import io
@@ -294,145 +204,3 @@ class DocumentService:
"pages": len(slim_doc["chunks"]),
"content_length": len(full_content),
}
-
- async def process_single_file_task(
- self,
- upload_task,
- file_path: str,
- owner_user_id: str = None,
- jwt_token: str = None,
- owner_name: str = None,
- owner_email: str = None,
- connector_type: str = "local",
- ):
- """Process a single file and update task tracking - used by task service"""
- from models.tasks import TaskStatus
- import time
- import asyncio
-
- file_task = upload_task.file_tasks[file_path]
- file_task.status = TaskStatus.RUNNING
- file_task.updated_at = time.time()
-
- try:
- # Handle regular file processing
- loop = asyncio.get_event_loop()
-
- # Run CPU-intensive docling processing in separate process
- slim_doc = await loop.run_in_executor(
- self.process_pool, process_document_sync, file_path
- )
-
- # Check if already indexed
- opensearch_client = self.session_manager.get_user_opensearch_client(
- owner_user_id, jwt_token
- )
- exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
- if exists:
- result = {"status": "unchanged", "id": slim_doc["id"]}
- else:
- # Generate embeddings and index (I/O bound, keep in main process)
- texts = [c["text"] for c in slim_doc["chunks"]]
-
- # Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
- text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
- embeddings = []
-
- for batch in text_batches:
- resp = await clients.patched_async_client.embeddings.create(
- model=EMBED_MODEL, input=batch
- )
- embeddings.extend([d.embedding for d in resp.data])
-
- # Get file size
- file_size = 0
- try:
- file_size = os.path.getsize(file_path)
- except OSError:
- pass # Keep file_size as 0 if can't get size
-
- # Index each chunk
- for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
- chunk_doc = {
- "document_id": slim_doc["id"],
- "filename": slim_doc["filename"],
- "mimetype": slim_doc["mimetype"],
- "page": chunk["page"],
- "text": chunk["text"],
- "chunk_embedding": vect,
- "file_size": file_size,
- "connector_type": connector_type,
- "indexed_time": datetime.datetime.now().isoformat(),
- }
-
- # Only set owner fields if owner_user_id is provided (for no-auth mode support)
- if owner_user_id is not None:
- chunk_doc["owner"] = owner_user_id
- if owner_name is not None:
- chunk_doc["owner_name"] = owner_name
- if owner_email is not None:
- chunk_doc["owner_email"] = owner_email
- chunk_id = f"{slim_doc['id']}_{i}"
- try:
- await opensearch_client.index(
- index=INDEX_NAME, id=chunk_id, body=chunk_doc
- )
- except Exception as e:
- logger.error(
- "OpenSearch indexing failed for batch chunk",
- chunk_id=chunk_id,
- error=str(e),
- )
- logger.error("Chunk document details", chunk_doc=chunk_doc)
- raise
-
- result = {"status": "indexed", "id": slim_doc["id"]}
-
- result["path"] = file_path
- file_task.status = TaskStatus.COMPLETED
- file_task.result = result
- upload_task.successful_files += 1
-
- except Exception as e:
- import traceback
- from concurrent.futures import BrokenExecutor
-
- if isinstance(e, BrokenExecutor):
- logger.error(
- "Process pool broken while processing file", file_path=file_path
- )
- logger.info("Worker process likely crashed")
- logger.info(
- "You should see detailed crash logs above from the worker process"
- )
-
- # Mark pool as broken for potential recreation
- self._process_pool_broken = True
-
- # Attempt to recreate the pool for future operations
- if self._recreate_process_pool():
- logger.info("Process pool successfully recreated")
- else:
- logger.warning(
- "Failed to recreate process pool - future operations may fail"
- )
-
- file_task.error = f"Worker process crashed: {str(e)}"
- else:
- logger.error(
- "Failed to process file", file_path=file_path, error=str(e)
- )
- file_task.error = str(e)
-
- logger.error("Full traceback available")
- traceback.print_exc()
- file_task.status = TaskStatus.FAILED
- upload_task.failed_files += 1
- finally:
- file_task.updated_at = time.time()
- upload_task.processed_files += 1
- upload_task.updated_at = time.time()
-
- if upload_task.processed_files >= upload_task.total_files:
- upload_task.status = TaskStatus.COMPLETED
-
diff --git a/src/services/task_service.py b/src/services/task_service.py
index de297dff..be5312a0 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -130,10 +130,21 @@ class TaskService:
async def process_with_semaphore(file_path: str):
async with semaphore:
- await self.document_service.process_single_file_task(
- upload_task, file_path
+ from models.processors import DocumentFileProcessor
+ file_task = upload_task.file_tasks[file_path]
+
+ # Create processor with user context (all None for background processing)
+ processor = DocumentFileProcessor(
+ document_service=self.document_service,
+ owner_user_id=None,
+ jwt_token=None,
+ owner_name=None,
+ owner_email=None,
)
+ # Process the file
+ await processor.process_item(upload_task, file_path, file_task)
+
tasks = [
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
@@ -141,6 +152,11 @@ class TaskService:
await asyncio.gather(*tasks, return_exceptions=True)
+ # Check if task is complete
+ if upload_task.processed_files >= upload_task.total_files:
+ upload_task.status = TaskStatus.COMPLETED
+ upload_task.updated_at = time.time()
+
except Exception as e:
logger.error(
"Background upload processor failed", task_id=task_id, error=str(e)
@@ -336,7 +352,7 @@ class TaskService:
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
- def cancel_task(self, user_id: str, task_id: str) -> bool:
+ async def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
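+
+        Note: this method is now a coroutine; callers must await it.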
@@ -368,18 +384,28 @@ class TaskService:
and not upload_task.background_task.done()
):
upload_task.background_task.cancel()
+ # Wait for the background task to actually stop to avoid race conditions
+ try:
+ await upload_task.background_task
+ except asyncio.CancelledError:
+ pass # Expected when we cancel the task
+ except Exception:
+ pass # Ignore other errors during cancellation
# Mark task as failed (cancelled)
upload_task.status = TaskStatus.FAILED
upload_task.updated_at = time.time()
- # Mark all pending file tasks as failed
+ # Mark all pending and running file tasks as failed
for file_task in upload_task.file_tasks.values():
- if file_task.status == TaskStatus.PENDING:
+ if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
+ # Increment failed_files counter for both pending and running
+ # (running files haven't been counted yet in either counter)
+ upload_task.failed_files += 1
+
file_task.status = TaskStatus.FAILED
file_task.error = "Task cancelled by user"
file_task.updated_at = time.time()
- upload_task.failed_files += 1
return True
diff --git a/src/utils/document_processing.py b/src/utils/document_processing.py
index a8792e46..fcb458fb 100644
--- a/src/utils/document_processing.py
+++ b/src/utils/document_processing.py
@@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
# Compute file hash
try:
+ from utils.hash_utils import hash_id
logger.info("Computing file hash", worker_pid=os.getpid())
- sha256 = hashlib.sha256()
- with open(file_path, "rb") as f:
- while True:
- chunk = f.read(1 << 20)
- if not chunk:
- break
- sha256.update(chunk)
- file_hash = sha256.hexdigest()
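+        # hash_id yields the same short base64url id format now used by
+        # DocumentService.process_upload_file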
+ file_hash = hash_id(file_path)
logger.info(
"File hash computed",
worker_pid=os.getpid(),
diff --git a/src/utils/file_utils.py b/src/utils/file_utils.py
new file mode 100644
index 00000000..2afc4024
--- /dev/null
+++ b/src/utils/file_utils.py
@@ -0,0 +1,60 @@
+"""File handling utilities for OpenRAG"""
+
+import os
+import tempfile
+from contextlib import contextmanager
+from typing import Optional
+
+
+@contextmanager
+def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
+ """
+ Context manager for temporary files that automatically cleans up.
+
+    Unlike tempfile.NamedTemporaryFile with delete=True, this closes the file
+    descriptor up front and yields only the path, so the file can be reopened
+    by name (including across await points) and is always removed when the
+    context exits, even on exceptions.
+
+    Usage (inside an async function; process_file is illustrative):
+ with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
+ # Write to the file
+ with open(tmp_path, 'wb') as f:
+ f.write(content)
+ # Use tmp_path for processing
+ result = await process_file(tmp_path)
+ # File is automatically deleted here
+
+ Args:
+ suffix: Optional file suffix/extension (e.g., ".pdf")
+ prefix: Optional file prefix
+ dir: Optional directory for temp file
+
+ Yields:
+ str: Path to the temporary file
+ """
+ fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+ try:
+ os.close(fd) # Close the file descriptor immediately
+ yield path
+ finally:
+ # Always clean up, even if an exception occurred
+ try:
+ if os.path.exists(path):
+ os.unlink(path)
+ except Exception:
+ # Silently ignore cleanup errors
+ pass
+
+
+def safe_unlink(path: str) -> None:
+ """
+ Safely delete a file, ignoring errors if it doesn't exist.
+
+ Args:
+ path: Path to the file to delete
+ """
+ try:
+ if path and os.path.exists(path):
+ os.unlink(path)
+ except Exception:
+ # Silently ignore errors
+ pass
diff --git a/src/utils/hash_utils.py b/src/utils/hash_utils.py
new file mode 100644
index 00000000..c25c8856
--- /dev/null
+++ b/src/utils/hash_utils.py
@@ -0,0 +1,76 @@
+import io
+import os
+import base64
+import hashlib
+from typing import BinaryIO, Optional, Union
+
+
+def _b64url(data: bytes) -> str:
+ """URL-safe base64 without padding"""
+ return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
+
+
+def stream_hash(
+ source: Union[str, os.PathLike, BinaryIO],
+ *,
+ algo: str = "sha256",
+ include_filename: Optional[str] = None,
+ chunk_size: int = 1024 * 1024, # 1 MiB
+) -> bytes:
+ """
+ Memory-safe, incremental hash of a file path or binary stream.
+ - source: path or file-like object with .read()
+ - algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
+ - include_filename: if provided, the UTF-8 bytes of this string are prepended
+ - chunk_size: read size per iteration
+ Returns: raw digest bytes
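+
+    Example (illustrative):
+        >>> stream_hash(io.BytesIO(b"hello")) == hashlib.sha256(b"hello").digest()
+        True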
+ """
+ try:
+ h = hashlib.new(algo)
+ except ValueError as e:
+ raise ValueError(f"Unsupported hash algorithm: {algo}") from e
+
+ def _update_from_file(f: BinaryIO):
+ if include_filename:
+ h.update(include_filename.encode("utf-8"))
+ for chunk in iter(lambda: f.read(chunk_size), b""):
+ h.update(chunk)
+
+ if isinstance(source, (str, os.PathLike)):
+ with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
+ _update_from_file(f)
+ else:
+ f = source
+ # Preserve position if seekable
+ pos = None
+ try:
+ if f.seekable():
+ pos = f.tell()
+ f.seek(0)
+ except Exception:
+ pos = None
+ try:
+ _update_from_file(f)
+ finally:
+ if pos is not None:
+ try:
+ f.seek(pos)
+ except Exception:
+ pass
+
+ return h.digest()
+
+
+def hash_id(
+ source: Union[str, os.PathLike, BinaryIO],
+ *,
+ algo: str = "sha256",
+ include_filename: Optional[str] = None,
+ length: int = 24, # characters of base64url (set 0 or None for full)
+) -> str:
+ """
+    Deterministic, URL-safe base64 digest of a file path or binary stream
+    (no prefix). By default the result is truncated to 24 base64url characters.
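+
+    Example (illustrative):
+        >>> len(hash_id(io.BytesIO(b"hello")))
+        24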
+ """
+ b = stream_hash(source, algo=algo, include_filename=include_filename)
+ s = _b64url(b)
+ return s[:length] if length else s
diff --git a/uv.lock b/uv.lock
index c64e6db4..30f7727a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
version = 1
-revision = 3
+revision = 2
requires-python = ">=3.13"
resolution-markers = [
"sys_platform == 'darwin'",
@@ -2282,7 +2282,7 @@ wheels = [
[[package]]
name = "openrag"
-version = "0.1.13"
+version = "0.1.14.dev1"
source = { editable = "." }
dependencies = [
{ name = "agentd" },