Merge branch 'main' of github.com:langflow-ai/openrag into docling-settings

This commit is contained in:
Mike Fortman 2025-09-29 16:47:12 -05:00
commit c46e9208cc
18 changed files with 965 additions and 868 deletions

2
.gitignore vendored
View file

@ -18,3 +18,5 @@ wheels/
1001*.pdf
*.json
.DS_Store
config.yaml

View file

@ -1,33 +0,0 @@
# OpenRAG Configuration File
# This file allows you to configure OpenRAG settings.
# Environment variables will override these settings unless edited is true.
# Track if this config has been manually edited (prevents env var overrides)
edited: false
# Model provider configuration
provider:
# Supported providers: "openai", "anthropic", "azure", etc.
model_provider: "openai"
# API key for the model provider (can also be set via OPENAI_API_KEY env var)
api_key: ""
# Knowledge base and document processing configuration
knowledge:
# Embedding model for vector search
embedding_model: "text-embedding-3-small"
# Text chunk size for document processing
chunk_size: 1000
# Overlap between chunks
chunk_overlap: 200
# Docling preset setting
ocr: false
picture_descriptions: false
table_structure: false
# AI agent configuration
agent:
# Language model for the chat agent
llm_model: "gpt-4o-mini"
# System prompt for the agent
system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."

View file

@ -38,13 +38,9 @@ The file is loaded into your OpenSearch database, and appears in the Knowledge p
To load and process a directory from the mapped location, click <Icon name="Plus" aria-hidden="true"/> **Add Knowledge**, and then click **Process Folder**.
The files are loaded into your OpenSearch database, and appear in the Knowledge page.
### Ingest files through OAuth connectors (#oauth-ingestion)
### Ingest files through OAuth connectors {#oauth-ingestion}
OpenRAG supports the following enterprise-grade OAuth connectors for seamless document synchronization.
- **Google Drive**
- **OneDrive**
- **AWS**
OpenRAG supports Google Drive, OneDrive, and AWS S3 as OAuth connectors for seamless document synchronization.
OAuth integration allows individual users to connect their personal cloud storage accounts to OpenRAG. Each user must separately authorize OpenRAG to access their own cloud storage files. When a user connects a cloud service, they are redirected to authenticate with that service provider and grant OpenRAG permission to sync documents from their personal cloud storage.

View file

@ -79,8 +79,46 @@ For more information on virtual environments, see [uv](https://docs.astral.sh/uv
Command completed successfully
```
7. To open the OpenRAG application, click **Open App** or press <kbd>6</kbd>.
8. Continue with the Quickstart.
7. To open the OpenRAG application, click **Open App**, press <kbd>6</kbd>, or navigate to `http://localhost:3000`.
The application opens.
8. Select your language model and embedding model provider, and complete the required fields.
**Your provider can only be selected once, and you must use the same provider for your language model and embedding model.**
The language model can be changed, but the embeddings model cannot be changed.
To change your provider selection, you must restart OpenRAG and delete the `config.yml` file.
<Tabs groupId="Embedding provider">
<TabItem value="OpenAI" label="OpenAI" default>
9. If you already entered a value for `OPENAI_API_KEY` in the TUI in Step 5, enable **Get API key from environment variable**.
10. Under **Advanced settings**, select your **Embedding Model** and **Language Model**.
11. To load 2 sample PDFs, enable **Sample dataset**.
This is recommended, but not required.
12. Click **Complete**.
</TabItem>
<TabItem value="IBM watsonx.ai" label="IBM watsonx.ai">
9. Complete the fields for **watsonx.ai API Endpoint**, **IBM API key**, and **IBM Project ID**.
These values are found in your IBM watsonx deployment.
10. Under **Advanced settings**, select your **Embedding Model** and **Language Model**.
11. To load 2 sample PDFs, enable **Sample dataset**.
This is recommended, but not required.
12. Click **Complete**.
</TabItem>
<TabItem value="Ollama" label="Ollama">
9. Enter your Ollama server's base URL address.
The default Ollama server address is `http://localhost:11434`.
Since OpenRAG is running in a container, you may need to change `localhost` to access services outside of the container. For example, change `http://localhost:11434` to `http://host.docker.internal:11434` to connect to Ollama.
OpenRAG automatically sends a test connection to your Ollama server to confirm connectivity.
10. Select the **Embedding Model** and **Language Model** your Ollama server is running.
OpenRAG automatically lists the available models from your Ollama server.
11. To load 2 sample PDFs, enable **Sample dataset**.
This is recommended, but not required.
12. Click **Complete**.
</TabItem>
</Tabs>
13. Continue with the [Quickstart](/quickstart).
### Advanced Setup {#advanced-setup}

View file

@ -11,7 +11,41 @@ Get started with OpenRAG by loading your knowledge, swapping out your language m
## Prerequisites
- Install and start OpenRAG
- [Install and start OpenRAG](/install)
- Create a [Langflow API key](https://docs.langflow.org/api-keys-and-authentication)
<details>
<summary>Create a Langflow API key</summary>
A Langflow API key is a user-specific token you can use with Langflow.
It is **only** used for sending requests to the Langflow server.
It does **not** access to OpenRAG.
To create a Langflow API key, do the following:
1. In Langflow, click your user icon, and then select **Settings**.
2. Click **Langflow API Keys**, and then click <Icon name="Plus" aria-hidden="true"/> **Add New**.
3. Name your key, and then click **Create API Key**.
4. Copy the API key and store it securely.
5. To use your Langflow API key in a request, set a `LANGFLOW_API_KEY` environment variable in your terminal, and then include an `x-api-key` header or query parameter with your request.
For example:
```bash
# Set variable
export LANGFLOW_API_KEY="sk..."
# Send request
curl --request POST \
--url "http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID" \
--header "Content-Type: application/json" \
--header "x-api-key: $LANGFLOW_API_KEY" \
--data '{
"output_type": "chat",
"input_type": "chat",
"input_value": "Hello"
}'
```
</details>
## Find your way around
@ -20,14 +54,18 @@ Get started with OpenRAG by loading your knowledge, swapping out your language m
For more information, see [Langflow Agents](/agents).
2. Ask `What documents are available to you?`
The agent responds with a message summarizing the documents that OpenRAG loads by default, which are PDFs about evaluating data quality when using LLMs in health care.
Knowledge is stored in OpenSearch.
For more information, see [Knowledge](/knowledge).
3. To confirm the agent is correct, click <Icon name="Library" aria-hidden="true"/> **Knowledge**.
The **Knowledge** page lists the documents OpenRAG has ingested into the OpenSearch vector database. Click on a document to display the chunks derived from splitting the default documents into the vector database.
The **Knowledge** page lists the documents OpenRAG has ingested into the OpenSearch vector database.
Click on a document to display the chunks derived from splitting the default documents into the vector database.
## Add your own knowledge
1. To add documents to your knowledge base, click <Icon name="Plus" aria-hidden="true"/> **Add Knowledge**.
* Select **Add File** to add a single file from your local machine (mapped with the Docker volume mount).
* Select **Process Folder** to process an entire folder of documents from your local machine (mapped with the Docker volume mount).
* Select your cloud storage provider to add knowledge from an OAuth-connected storage provider. For more information, see [OAuth ingestion](/knowledge#oauth-ingestion).
2. Return to the Chat window and ask a question about your loaded data.
For example, with a manual about a PC tablet loaded, ask `How do I connect this device to WiFI?`
The agent responds with a message indicating it now has your knowledge as context for answering questions.
@ -40,353 +78,289 @@ If you aren't getting the results you need, you can further tune the knowledge i
To modify the knowledge ingestion or Agent behavior, click <Icon name="Settings2" aria-hidden="true"/> **Settings**.
In this example, you'll try a different LLM to demonstrate how the Agent's response changes.
You can only change the **Language model**, and not the **Model provider** that you started with in OpenRAG.
If you're using Ollama, you can use any installed model.
1. To edit the Agent's behavior, click **Edit in Langflow**.
You can more quickly access the **Language Model** and **Agent Instructions** fields in this page, but for illustration purposes, navigate to the Langflow visual builder.
2. OpenRAG warns you that you're entering Langflow. Click **Proceed**.
3. The OpenRAG OpenSearch Agent flow appears.
![OpenRAG Open Search Agent Flow](/img/opensearch-agent-flow.png)
![OpenRAG OpenSearch Agent Flow](/img/opensearch-agent-flow.png)
4. In the **Language Model** component, under **Model Provider**, select **Anthropic**.
:::note
This guide uses an Anthropic model for demonstration purposes. If you want to use a different provider, change the **Model Provider** and **Model Name** fields, and then provide credentials for your selected provider.
:::
4. In the **Language Model** component, under **Model**, select a different OpenAI model.
5. Save your flow with <kbd>Command+S</kbd>.
6. In OpenRAG, start a new conversation by clicking the <Icon name="Plus" aria-hidden="true"/> in the **Conversations** tab.
7. Ask the same question as before to demonstrate how a different language model changes the results.
## Integrate OpenRAG into your application
:::tip
Ensure the `openrag-backend` container has port 8000 exposed in your `docker-compose.yml`:
To integrate OpenRAG into your application, use the [Langflow API](https://docs.langflow.org/api-reference-api-examples).
Make requests with Python, TypeScript, or any HTTP client to run one of OpenRAG's default flows and get a response, and then modify the flow further to improve results.
```yaml
openrag-backend:
ports:
- "8000:8000"
```
:::
Langflow provides code snippets to help you get started with the Langflow API.
OpenRAG provides a REST API that you can call from Python, TypeScript, or any HTTP client to chat with your documents.
1. To navigate to the OpenRAG OpenSearch Agent flow, click <Icon name="Settings2" aria-hidden="true"/> **Settings**, and then click **Edit in Langflow** in the OpenRAG OpenSearch Agent flow.
2. Click **Share**, and then click **API access**.
These example requests are run assuming OpenRAG is in "no-auth" mode.
For complete API documentation, including authentication, request and response parameters, and example requests, see the API documentation.
The default code in the API access pane constructs a request with the Langflow server `url`, `headers`, and a `payload` of request data. The code snippets automatically include the `LANGFLOW_SERVER_ADDRESS` and `FLOW_ID` values for the flow. Replace these values if you're using the code for a different server or flow. The default Langflow server address is http://localhost:7860.
### Chat with your documents
Prompt OpenRAG at the `/chat` API endpoint.
<Tabs>
<TabItem value="python" label="Python">
```python
import requests
url = "http://localhost:8000/chat"
payload = {
"prompt": "What documents are available to you?",
"previous_response_id": None
}
response = requests.post(url, json=payload)
print("OpenRAG Response:", response.json())
```
</TabItem>
<TabItem value="typescript" label="TypeScript">
```typescript
import fetch from 'node-fetch';
const response = await fetch("http://localhost:8000/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: "What documents are available to you?",
previous_response_id: null
})
});
const data = await response.json();
console.log("OpenRAG Response:", data);
```
</TabItem>
<TabItem value="curl" label="curl">
```bash
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{
"prompt": "What documents are available to you?",
"previous_response_id": null
}'
```
</TabItem>
</Tabs>
<details closed>
<summary>Response</summary>
```
{
"response": "I have access to a wide range of documents depending on the context and the tools enabled in this environment. Specifically, I can search for and retrieve documents related to various topics such as technical papers, articles, manuals, guides, knowledge base entries, and other text-based resources. If you specify a particular subject or type of document you're interested in, I can try to locate relevant materials for you. Let me know what you need!",
"response_id": "resp_68d3fdbac93081958b8781b97919fe7007f98bd83932fa1a"
}
```
</details>
### Search your documents
Search your document knowledge base at the `/search` endpoint.
<Tabs>
<TabItem value="python" label="Python">
```python
import requests
url = "http://localhost:8000/search"
payload = {"query": "healthcare data quality", "limit": 5}
response = requests.post(url, json=payload)
results = response.json()
print("Search Results:")
for result in results.get("results", []):
print(f"- {result.get('filename')}: {result.get('text', '')[:100]}...")
```
</TabItem>
<TabItem value="typescript" label="TypeScript">
```typescript
const response = await fetch("http://localhost:8000/search", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
query: "healthcare data quality",
limit: 5
})
});
const results = await response.json();
console.log("Search Results:");
results.results?.forEach((result, index) => {
const filename = result.filename || 'Unknown';
const text = result.text?.substring(0, 100) || '';
console.log(`${index + 1}. ${filename}: ${text}...`);
});
```
</TabItem>
<TabItem value="curl" label="curl">
```bash
curl -X POST "http://localhost:8000/search" \
-H "Content-Type: application/json" \
-d '{"query": "healthcare data quality", "limit": 5}'
```
</TabItem>
</Tabs>
<details closed>
<summary>Example response</summary>
```
Found 5 results
1. 2506.08231v1.pdf: variables with high performance metrics. These variables might also require fewer replication analys...
2. 2506.08231v1.pdf: on EHR data and may lack the clinical domain knowledge needed to perform well on the tasks where EHR...
3. 2506.08231v1.pdf: Abstract Large language models (LLMs) are increasingly used to extract clinical data from electronic...
4. 2506.08231v1.pdf: these multidimensional assessments, the framework not only quantifies accuracy, but can also be appl...
5. 2506.08231v1.pdf: observed in only the model metrics, but not the abstractor metrics, it indicates that model errors m...
```
</details>
### Use chat and search together
Create a complete chat application that combines an interactive terminal chat with session continuity and search functionality.
<Tabs>
<TabItem value="python" label="Python">
```python
import requests
# Configuration
OPENRAG_BASE_URL = "http://localhost:8000"
CHAT_URL = f"{OPENRAG_BASE_URL}/chat"
SEARCH_URL = f"{OPENRAG_BASE_URL}/search"
DEFAULT_SEARCH_LIMIT = 5
def chat_with_openrag(message, previous_response_id=None):
try:
response = requests.post(CHAT_URL, json={
"prompt": message,
"previous_response_id": previous_response_id
})
response.raise_for_status()
data = response.json()
return data.get("response"), data.get("response_id")
except Exception as e:
return f"Error: {str(e)}", None
def search_documents(query, limit=DEFAULT_SEARCH_LIMIT):
try:
response = requests.post(SEARCH_URL, json={
"query": query,
"limit": limit
})
response.raise_for_status()
data = response.json()
return data.get("results", [])
except Exception as e:
return []
# Interactive chat with session continuity and search
previous_response_id = None
while True:
question = input("Your question (or 'search <query>' to search): ").strip()
if question.lower() in ['quit', 'exit', 'q']:
break
if not question:
continue
<Tabs>
<TabItem value="python" label="Python">
if question.lower().startswith('search '):
query = question[7:].strip()
print("Searching documents...")
results = search_documents(query)
print(f"\nFound {len(results)} results:")
for i, result in enumerate(results, 1):
filename = result.get('filename', 'Unknown')
text = result.get('text', '')[:100]
print(f"{i}. {filename}: {text}...")
print()
else:
print("OpenRAG is thinking...")
result, response_id = chat_with_openrag(question, previous_response_id)
print(f"OpenRAG: {result}\n")
previous_response_id = response_id
```
</TabItem>
<TabItem value="typescript" label="TypeScript">
```ts
import fetch from 'node-fetch';
// Configuration
const OPENRAG_BASE_URL = "http://localhost:8000";
const CHAT_URL = `${OPENRAG_BASE_URL}/chat`;
const SEARCH_URL = `${OPENRAG_BASE_URL}/search`;
const DEFAULT_SEARCH_LIMIT = 5;
async function chatWithOpenRAG(message: string, previousResponseId?: string | null) {
try {
const response = await fetch(CHAT_URL, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: message,
previous_response_id: previousResponseId
})
});
const data = await response.json();
return [data.response || "No response received", data.response_id || null];
} catch (error) {
return [`Error: ${error}`, null];
```python
import requests
import os
import uuid
api_key = 'LANGFLOW_API_KEY'
url = "http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID" # The complete API endpoint URL for this flow
# Request payload configuration
payload = {
"output_type": "chat",
"input_type": "chat",
"input_value": "hello world!"
}
}
payload["session_id"] = str(uuid.uuid4())
headers = {"x-api-key": api_key}
try:
# Send API request
response = requests.request("POST", url, json=payload, headers=headers)
response.raise_for_status() # Raise exception for bad status codes
# Print response
print(response.text)
except requests.exceptions.RequestException as e:
print(f"Error making API request: {e}")
except ValueError as e:
print(f"Error parsing response: {e}")
```
</TabItem>
<TabItem value="typescript" label="TypeScript">
```typescript
const crypto = require('crypto');
const apiKey = 'LANGFLOW_API_KEY';
const payload = {
"output_type": "chat",
"input_type": "chat",
"input_value": "hello world!"
};
payload.session_id = crypto.randomUUID();
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json',
"x-api-key": apiKey
},
body: JSON.stringify(payload)
};
fetch('http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID', options)
.then(response => response.json())
.then(response => console.warn(response))
.catch(err => console.error(err));
```
</TabItem>
<TabItem value="curl" label="curl">
```bash
curl --request POST \
--url 'http://LANGFLOW_SERVER_ADDRESS/api/v1/run/FLOW_ID?stream=false' \
--header 'Content-Type: application/json' \
--header "x-api-key: LANGFLOW_API_KEY" \
--data '{
"output_type": "chat",
"input_type": "chat",
"input_value": "hello world!",
}'
```
</TabItem>
</Tabs>
async function searchDocuments(query: string, limit: number = DEFAULT_SEARCH_LIMIT) {
try {
const response = await fetch(SEARCH_URL, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query, limit })
});
const data = await response.json();
return data.results || [];
} catch (error) {
return [];
3. Copy the snippet, paste it in a script file, and then run the script to send the request. If you are using the curl snippet, you can run the command directly in your terminal.
If the request is successful, the response includes many details about the flow run, including the session ID, inputs, outputs, components, durations, and more.
The following is an example of a response from running the **Simple Agent** template flow:
<details>
<summary>Result</summary>
```json
{
"session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"outputs": [
{
"inputs": {
"input_value": "hello world!"
},
"outputs": [
{
"results": {
"message": {
"text_key": "text",
"data": {
"timestamp": "2025-06-16 19:58:23 UTC",
"sender": "Machine",
"sender_name": "AI",
"session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"text": "Hello world! 🌍 How can I assist you today?",
"files": [],
"error": false,
"edit": false,
"properties": {
"text_color": "",
"background_color": "",
"edited": false,
"source": {
"id": "Agent-ZOknz",
"display_name": "Agent",
"source": "gpt-4o-mini"
},
"icon": "bot",
"allow_markdown": false,
"positive_feedback": null,
"state": "complete",
"targets": []
},
"category": "message",
"content_blocks": [
{
"title": "Agent Steps",
"contents": [
{
"type": "text",
"duration": 2,
"header": {
"title": "Input",
"icon": "MessageSquare"
},
"text": "**Input**: hello world!"
},
{
"type": "text",
"duration": 226,
"header": {
"title": "Output",
"icon": "MessageSquare"
},
"text": "Hello world! 🌍 How can I assist you today?"
}
],
"allow_markdown": true,
"media_url": null
}
],
"id": "f3d85d9a-261c-4325-b004-95a1bf5de7ca",
"flow_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"duration": null
},
"default_value": "",
"text": "Hello world! 🌍 How can I assist you today?",
"sender": "Machine",
"sender_name": "AI",
"files": [],
"session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"timestamp": "2025-06-16T19:58:23+00:00",
"flow_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"error": false,
"edit": false,
"properties": {
"text_color": "",
"background_color": "",
"edited": false,
"source": {
"id": "Agent-ZOknz",
"display_name": "Agent",
"source": "gpt-4o-mini"
},
"icon": "bot",
"allow_markdown": false,
"positive_feedback": null,
"state": "complete",
"targets": []
},
"category": "message",
"content_blocks": [
{
"title": "Agent Steps",
"contents": [
{
"type": "text",
"duration": 2,
"header": {
"title": "Input",
"icon": "MessageSquare"
},
"text": "**Input**: hello world!"
},
{
"type": "text",
"duration": 226,
"header": {
"title": "Output",
"icon": "MessageSquare"
},
"text": "Hello world! 🌍 How can I assist you today?"
}
],
"allow_markdown": true,
"media_url": null
}
],
"duration": null
}
},
"artifacts": {
"message": "Hello world! 🌍 How can I assist you today?",
"sender": "Machine",
"sender_name": "AI",
"files": [],
"type": "object"
},
"outputs": {
"message": {
"message": "Hello world! 🌍 How can I assist you today?",
"type": "text"
}
},
"logs": {
"message": []
},
"messages": [
{
"message": "Hello world! 🌍 How can I assist you today?",
"sender": "Machine",
"sender_name": "AI",
"session_id": "29deb764-af3f-4d7d-94a0-47491ed241d6",
"stream_url": null,
"component_id": "ChatOutput-aF5lw",
"files": [],
"type": "text"
}
],
"timedelta": null,
"duration": null,
"component_display_name": "Chat Output",
"component_id": "ChatOutput-aF5lw",
"used_frozen_result": false
}
]
}
]
}
// Interactive chat with session continuity and search
let previousResponseId = null;
const readline = require('readline');
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
const askQuestion = () => {
rl.question("Your question (or 'search <query>' to search): ", async (question) => {
if (question.toLowerCase() === 'quit' || question.toLowerCase() === 'exit' || question.toLowerCase() === 'q') {
console.log("Goodbye!");
rl.close();
return;
}
if (!question.trim()) {
askQuestion();
return;
}
if (question.toLowerCase().startsWith('search ')) {
const query = question.substring(7).trim();
console.log("Searching documents...");
const results = await searchDocuments(query);
console.log(`\nFound ${results.length} results:`);
results.forEach((result, i) => {
const filename = result.filename || 'Unknown';
const text = result.text?.substring(0, 100) || '';
console.log(`${i + 1}. ${filename}: ${text}...`);
});
console.log();
} else {
console.log("OpenRAG is thinking...");
const [result, responseId] = await chatWithOpenRAG(question, previousResponseId);
console.log(`\nOpenRAG: ${result}\n`);
previousResponseId = responseId;
}
askQuestion();
});
};
console.log("OpenRAG Chat Interface");
console.log("Ask questions about your documents. Type 'quit' to exit.");
console.log("Use 'search <query>' to search documents directly.\n");
askQuestion();
```
</TabItem>
</Tabs>
<details closed>
<summary>Example response</summary>
```
Your question (or 'search <query>' to search): search healthcare
Searching documents...
Found 5 results:
1. 2506.08231v1.pdf: variables with high performance metrics. These variables might also require fewer replication analys...
2. 2506.08231v1.pdf: on EHR data and may lack the clinical domain knowledge needed to perform well on the tasks where EHR...
3. 2506.08231v1.pdf: Abstract Large language models (LLMs) are increasingly used to extract clinical data from electronic...
4. 2506.08231v1.pdf: Acknowledgements Darren Johnson for support in publication planning and management. The authors used...
5. 2506.08231v1.pdf: Ensuring Reliability of Curated EHR-Derived Data: The Validation of Accuracy for LLM/ML-Extracted In...
Your question (or 'search <query>' to search): what's the weather today?
OpenRAG is thinking...
OpenRAG: I don't have access to real-time weather data. Could you please provide me with your location? Then I can help you find the weather information.
Your question (or 'search <query>' to search): newark nj
OpenRAG is thinking...
```
</details>
## Next steps
TBD
To further explore the API, see:
* The Langflow [Quickstart](https://docs.langflow.org/quickstart#extract-data-from-the-response) extends this example with extracting fields from the response.
* [Get started with the Langflow API](https://docs.langflow.org/api-reference-api-examples)

View file

@ -231,11 +231,8 @@ async def upload_and_ingest_user_file(
except Exception:
# Clean up temp file on error
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
from utils.file_utils import safe_unlink
safe_unlink(temp_path)
raise
except Exception as e:

View file

@ -164,12 +164,9 @@ async def langflow_upload_ingest_task(
except Exception:
# Clean up temp files on error
from utils.file_utils import safe_unlink
for temp_path in temp_file_paths:
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
safe_unlink(temp_path)
raise
except Exception as e:

View file

@ -26,7 +26,7 @@ async def cancel_task(request: Request, task_service, session_manager):
task_id = request.path_params.get("task_id")
user = request.state.user
success = task_service.cancel_task(user.user_id, task_id)
success = await task_service.cancel_task(user.user_id, task_id)
if not success:
return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400

View file

@ -514,6 +514,9 @@ class AppClients:
ssl_assert_fingerprint=None,
headers=headers,
http_compress=True,
timeout=30, # 30 second timeout
max_retries=3,
retry_on_timeout=True,
)

View file

@ -53,25 +53,27 @@ class LangflowConnectorService:
filename=document.filename,
)
from utils.file_utils import auto_cleanup_tempfile
suffix = self._get_file_extension(document.mimetype)
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=suffix
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
langflow_file_id = None # Initialize to track if upload succeeded
try:
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
@ -125,7 +127,7 @@ class LangflowConnectorService:
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
if "langflow_file_id" in locals():
if langflow_file_id is not None:
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
@ -140,10 +142,6 @@ class LangflowConnectorService:
)
raise
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {

View file

@ -54,52 +54,50 @@ class ConnectorService:
"""Process a document from a connector using existing processing pipeline"""
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=self._get_file_extension(document.mimetype)
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
from utils.file_utils import auto_cleanup_tempfile
try:
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
# Write document content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
doc_service = DocumentService(session_manager=self.session_manager)
# Use existing process_file_common function with connector document metadata
# We'll use the document service's process_file_common method
from services.document_service import DocumentService
logger.debug("Processing connector document", document_id=document.id)
doc_service = DocumentService(session_manager=self.session_manager)
# Process using the existing pipeline but with connector document metadata
result = await doc_service.process_file_common(
file_path=tmp_file.name,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
logger.debug("Processing connector document", document_id=document.id)
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=document.id, # Use connector document ID as hash
owner_user_id=owner_user_id,
original_filename=document.filename, # Pass the original Google Doc title
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=len(document.content) if document.content else 0,
connector_type=connector_type,
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
logger.debug("Document processing result", result=result)
# If successfully indexed or already exists, update the indexed documents with connector metadata
if result["status"] in ["indexed", "unchanged"]:
# Update all chunks with connector-specific metadata
await self._update_connector_metadata(
document, owner_user_id, connector_type, jwt_token
)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
return {
**result,
"filename": document.filename,
"source_url": document.source_url,
}
async def _update_connector_metadata(
self,

View file

@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@ -6,22 +5,160 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
class TaskProcessor(ABC):
"""Abstract base class for task processors"""
class TaskProcessor:
"""Base class for task processors with shared processing logic"""
def __init__(self, document_service=None):
self.document_service = document_service
async def check_document_exists(
self,
file_hash: str,
opensearch_client,
) -> bool:
"""
Check if a document with the given hash already exists in OpenSearch.
Consolidated hash checking for all processors.
"""
from config.settings import INDEX_NAME
import asyncio
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
return exists
except (asyncio.TimeoutError, Exception) as e:
if attempt == max_retries - 1:
logger.error(
"OpenSearch exists check failed after retries",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1
)
# On final failure, assume document doesn't exist (safer to reprocess than skip)
logger.warning(
"Assuming document doesn't exist due to connection issues",
file_hash=file_hash
)
return False
else:
logger.warning(
"OpenSearch exists check failed, retrying",
file_hash=file_hash,
error=str(e),
attempt=attempt + 1,
retry_in=retry_delay
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def process_document_standard(
self,
file_path: str,
file_hash: str,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Standard processing pipeline for non-Langflow processors:
docling conversion + embeddings + OpenSearch indexing.
"""
import datetime
from config.settings import INDEX_NAME, EMBED_MODEL, clients
from services.document_service import chunk_texts_for_embeddings
from utils.document_processing import extract_relevant
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.document_service.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
# Check if already exists
if await self.check_document_exists(file_hash, opensearch_client):
return {"status": "unchanged", "id": file_hash}
# Convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
@abstractmethod
async def process_item(
self, upload_task: UploadTask, item: Any, file_task: FileTask
) -> None:
"""
Process a single item in the task.
This is a base implementation that should be overridden by subclasses.
When TaskProcessor is used directly (not via subclass), this method
is not called - only the utility methods like process_document_standard
are used.
Args:
upload_task: The overall upload task
item: The item to process (could be file path, file info, etc.)
file_task: The specific file task to update
"""
pass
raise NotImplementedError(
"process_item should be overridden by subclasses when used in task processing"
)
class DocumentFileProcessor(TaskProcessor):
@ -35,7 +172,7 @@ class DocumentFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
self.document_service = document_service
super().__init__(document_service)
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
@ -44,16 +181,52 @@ class DocumentFileProcessor(TaskProcessor):
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a regular file path using DocumentService"""
# This calls the existing logic with user context
await self.document_service.process_single_file_task(
upload_task,
item,
owner_user_id=self.owner_user_id,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
"""Process a regular file path using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import time
import os
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Compute hash
file_hash = hash_id(item)
# Get file size
try:
file_size = os.path.getsize(item)
except Exception:
file_size = 0
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=item,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=os.path.basename(item),
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="local",
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
finally:
upload_task.processed_files += 1
upload_task.updated_at = time.time()
class ConnectorFileProcessor(TaskProcessor):
@ -69,6 +242,7 @@ class ConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.connector_service = connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@ -76,53 +250,79 @@ class ConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using ConnectorService"""
"""Process a connector file using consolidated methods"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
self.file_info_map.get(file_id)
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
try:
file_id = item # item is the connector file ID
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
connection = await self.connector_service.connection_manager.get_connection(
self.connection_id
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
# Get file content from connector
document = await connector.get_file_content(file_id)
# Process using existing pipeline
result = await self.connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
if not self.user_id:
raise ValueError("user_id not provided to ConnectorFileProcessor")
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
# Create temporary file from document content
from utils.file_utils import auto_cleanup_tempfile
suffix = self.connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash
file_hash = hash_id(tmp_path)
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.user_id,
original_filename=document.filename,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=len(document.content),
connector_type=connection.connector_type,
)
# Add connector-specific metadata
result.update({
"source_url": document.source_url,
"document_id": document.id,
})
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class LangflowConnectorFileProcessor(TaskProcessor):
@ -138,6 +338,7 @@ class LangflowConnectorFileProcessor(TaskProcessor):
owner_name: str = None,
owner_email: str = None,
):
super().__init__()
self.langflow_connector_service = langflow_connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
@ -145,57 +346,85 @@ class LangflowConnectorFileProcessor(TaskProcessor):
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using LangflowConnectorService"""
from models.tasks import TaskStatus
from utils.hash_utils import hash_id
import tempfile
import time
import os
file_id = item # item is the connector file ID
self.file_info_map.get(file_id)
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
try:
file_id = item # item is the connector file ID
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Get file content from connector
document = await connector.get_file_content(file_id)
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
# Create temporary file and compute hash to check for duplicates
from utils.file_utils import auto_cleanup_tempfile
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
suffix = self.langflow_connector_service._get_file_extension(document.mimetype)
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write content to temp file
with open(tmp_path, 'wb') as f:
f.write(document.content)
# Compute hash and check if already exists
file_hash = hash_id(tmp_path)
# Check if document already exists
opensearch_client = self.langflow_connector_service.session_manager.get_user_opensearch_client(
self.user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
class S3FileProcessor(TaskProcessor):
@ -213,7 +442,7 @@ class S3FileProcessor(TaskProcessor):
):
import boto3
self.document_service = document_service
super().__init__(document_service)
self.bucket = bucket
self.s3_client = s3_client or boto3.client("s3")
self.owner_user_id = owner_user_id
@ -238,34 +467,17 @@ class S3FileProcessor(TaskProcessor):
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
tmp = tempfile.NamedTemporaryFile(delete=False)
from utils.file_utils import auto_cleanup_tempfile
from utils.hash_utils import hash_id
try:
# Download object to temporary file
self.s3_client.download_fileobj(self.bucket, item, tmp)
tmp.flush()
with auto_cleanup_tempfile() as tmp_path:
# Download object to temporary file
with open(tmp_path, 'wb') as tmp_file:
self.s3_client.download_fileobj(self.bucket, item, tmp_file)
loop = asyncio.get_event_loop()
slim_doc = await loop.run_in_executor(
self.document_service.process_pool, process_document_sync, tmp.name
)
opensearch_client = (
self.document_service.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
texts = [c["text"] for c in slim_doc["chunks"]]
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Compute hash
file_hash = hash_id(tmp_path)
# Get object size
try:
@ -274,54 +486,29 @@ class S3FileProcessor(TaskProcessor):
except Exception:
file_size = 0
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": "s3", # S3 uploads
"indexed_time": datetime.datetime.now().isoformat(),
}
# Use consolidated standard processing
result = await self.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=self.owner_user_id,
original_filename=item, # Use S3 key as filename
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
file_size=file_size,
connector_type="s3",
)
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if self.owner_user_id is not None:
chunk_doc["owner"] = self.owner_user_id
if self.owner_name is not None:
chunk_doc["owner_name"] = self.owner_name
if self.owner_email is not None:
chunk_doc["owner_email"] = self.owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for S3 chunk",
chunk_id=chunk_id,
error=str(e),
chunk_doc=chunk_doc,
)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
result["path"] = f"s3://{self.bucket}/{item}"
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
file_task.status = TaskStatus.FAILED
file_task.error = str(e)
upload_task.failed_files += 1
finally:
tmp.close()
os.remove(tmp.name)
file_task.updated_at = time.time()
@ -341,6 +528,7 @@ class LangflowFileProcessor(TaskProcessor):
settings: dict = None,
delete_after_ingest: bool = True,
):
super().__init__()
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
@ -366,7 +554,22 @@ class LangflowFileProcessor(TaskProcessor):
file_task.updated_at = time.time()
try:
# Read file content
# Compute hash and check if already exists
from utils.hash_utils import hash_id
file_hash = hash_id(item)
# Check if document already exists
opensearch_client = self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
file_task.updated_at = time.time()
upload_task.successful_files += 1
return
# Read file content for processing
with open(item, 'rb') as f:
content = f.read()

View file

@ -112,98 +112,6 @@ class DocumentService:
return False
return False
async def process_file_common(
self,
file_path: str,
file_hash: str = None,
owner_user_id: str = None,
original_filename: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
file_size: int = None,
connector_type: str = "local",
):
"""
Common processing logic for both upload and upload_path.
1. Optionally compute SHA256 hash if not provided.
2. Convert with docling and extract relevant content.
3. Add embeddings.
4. Index into OpenSearch.
"""
if file_hash is None:
sha256 = hashlib.sha256()
async with aiofiles.open(file_path, "rb") as f:
while True:
chunk = await f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=file_hash)
if exists:
return {"status": "unchanged", "id": file_hash}
# convert and extract
result = clients.converter.convert(file_path)
full_doc = result.document.export_to_dict()
slim_doc = extract_relevant(full_doc)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Index each chunk as a separate document
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": file_hash,
"filename": original_filename
if original_filename
else slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{file_hash}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
return {"status": "indexed", "id": file_hash}
async def process_upload_file(
self,
@ -214,20 +122,22 @@ class DocumentService:
owner_email: str = None,
):
"""Process an uploaded file from form data"""
sha256 = hashlib.sha256()
tmp = tempfile.NamedTemporaryFile(delete=False)
file_size = 0
try:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
tmp.write(chunk)
file_size += len(chunk)
tmp.flush()
from utils.hash_utils import hash_id
from utils.file_utils import auto_cleanup_tempfile
import os
file_hash = sha256.hexdigest()
with auto_cleanup_tempfile() as tmp_path:
# Stream upload file to temporary file
file_size = 0
with open(tmp_path, 'wb') as tmp_file:
while True:
chunk = await upload_file.read(1 << 20)
if not chunk:
break
tmp_file.write(chunk)
file_size += len(chunk)
file_hash = hash_id(tmp_path)
# Get user's OpenSearch client with JWT for OIDC auth
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
@ -243,22 +153,22 @@ class DocumentService:
if exists:
return {"status": "unchanged", "id": file_hash}
result = await self.process_file_common(
tmp.name,
file_hash,
# Use consolidated standard processing
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=self)
result = await processor.process_document_standard(
file_path=tmp_path,
file_hash=file_hash,
owner_user_id=owner_user_id,
original_filename=upload_file.filename,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
file_size=file_size,
connector_type="local",
)
return result
finally:
tmp.close()
os.remove(tmp.name)
async def process_upload_context(self, upload_file, filename: str = None):
"""Process uploaded file and return content for context"""
import io
@ -294,145 +204,3 @@ class DocumentService:
"pages": len(slim_doc["chunks"]),
"content_length": len(full_content),
}
async def process_single_file_task(
self,
upload_task,
file_path: str,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
connector_type: str = "local",
):
"""Process a single file and update task tracking - used by task service"""
from models.tasks import TaskStatus
import time
import asyncio
file_task = upload_task.file_tasks[file_path]
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Handle regular file processing
loop = asyncio.get_event_loop()
# Run CPU-intensive docling processing in separate process
slim_doc = await loop.run_in_executor(
self.process_pool, process_document_sync, file_path
)
# Check if already indexed
opensearch_client = self.session_manager.get_user_opensearch_client(
owner_user_id, jwt_token
)
exists = await opensearch_client.exists(index=INDEX_NAME, id=slim_doc["id"])
if exists:
result = {"status": "unchanged", "id": slim_doc["id"]}
else:
# Generate embeddings and index (I/O bound, keep in main process)
texts = [c["text"] for c in slim_doc["chunks"]]
# Split into batches to avoid token limits (8191 limit, use 8000 with buffer)
text_batches = chunk_texts_for_embeddings(texts, max_tokens=8000)
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
model=EMBED_MODEL, input=batch
)
embeddings.extend([d.embedding for d in resp.data])
# Get file size
file_size = 0
try:
file_size = os.path.getsize(file_path)
except OSError:
pass # Keep file_size as 0 if can't get size
# Index each chunk
for i, (chunk, vect) in enumerate(zip(slim_doc["chunks"], embeddings)):
chunk_doc = {
"document_id": slim_doc["id"],
"filename": slim_doc["filename"],
"mimetype": slim_doc["mimetype"],
"page": chunk["page"],
"text": chunk["text"],
"chunk_embedding": vect,
"file_size": file_size,
"connector_type": connector_type,
"indexed_time": datetime.datetime.now().isoformat(),
}
# Only set owner fields if owner_user_id is provided (for no-auth mode support)
if owner_user_id is not None:
chunk_doc["owner"] = owner_user_id
if owner_name is not None:
chunk_doc["owner_name"] = owner_name
if owner_email is not None:
chunk_doc["owner_email"] = owner_email
chunk_id = f"{slim_doc['id']}_{i}"
try:
await opensearch_client.index(
index=INDEX_NAME, id=chunk_id, body=chunk_doc
)
except Exception as e:
logger.error(
"OpenSearch indexing failed for batch chunk",
chunk_id=chunk_id,
error=str(e),
)
logger.error("Chunk document details", chunk_doc=chunk_doc)
raise
result = {"status": "indexed", "id": slim_doc["id"]}
result["path"] = file_path
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
except Exception as e:
import traceback
from concurrent.futures import BrokenExecutor
if isinstance(e, BrokenExecutor):
logger.error(
"Process pool broken while processing file", file_path=file_path
)
logger.info("Worker process likely crashed")
logger.info(
"You should see detailed crash logs above from the worker process"
)
# Mark pool as broken for potential recreation
self._process_pool_broken = True
# Attempt to recreate the pool for future operations
if self._recreate_process_pool():
logger.info("Process pool successfully recreated")
else:
logger.warning(
"Failed to recreate process pool - future operations may fail"
)
file_task.error = f"Worker process crashed: {str(e)}"
else:
logger.error(
"Failed to process file", file_path=file_path, error=str(e)
)
file_task.error = str(e)
logger.error("Full traceback available")
traceback.print_exc()
file_task.status = TaskStatus.FAILED
upload_task.failed_files += 1
finally:
file_task.updated_at = time.time()
upload_task.processed_files += 1
upload_task.updated_at = time.time()
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED

View file

@ -130,10 +130,21 @@ class TaskService:
async def process_with_semaphore(file_path: str):
async with semaphore:
await self.document_service.process_single_file_task(
upload_task, file_path
from models.processors import DocumentFileProcessor
file_task = upload_task.file_tasks[file_path]
# Create processor with user context (all None for background processing)
processor = DocumentFileProcessor(
document_service=self.document_service,
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
# Process the file
await processor.process_item(upload_task, file_path, file_task)
tasks = [
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
@ -141,6 +152,11 @@ class TaskService:
await asyncio.gather(*tasks, return_exceptions=True)
# Check if task is complete
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED
upload_task.updated_at = time.time()
except Exception as e:
logger.error(
"Background upload processor failed", task_id=task_id, error=str(e)
@ -336,7 +352,7 @@ class TaskService:
tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool:
async def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed.
Supports cancellation of shared default tasks stored under the anonymous user.
@ -368,18 +384,28 @@ class TaskService:
and not upload_task.background_task.done()
):
upload_task.background_task.cancel()
# Wait for the background task to actually stop to avoid race conditions
try:
await upload_task.background_task
except asyncio.CancelledError:
pass # Expected when we cancel the task
except Exception:
pass # Ignore other errors during cancellation
# Mark task as failed (cancelled)
upload_task.status = TaskStatus.FAILED
upload_task.updated_at = time.time()
# Mark all pending file tasks as failed
# Mark all pending and running file tasks as failed
for file_task in upload_task.file_tasks.values():
if file_task.status == TaskStatus.PENDING:
if file_task.status in [TaskStatus.PENDING, TaskStatus.RUNNING]:
# Increment failed_files counter for both pending and running
# (running files haven't been counted yet in either counter)
upload_task.failed_files += 1
file_task.status = TaskStatus.FAILED
file_task.error = "Task cancelled by user"
file_task.updated_at = time.time()
upload_task.failed_files += 1
return True

View file

@ -229,15 +229,9 @@ def process_document_sync(file_path: str):
# Compute file hash
try:
from utils.hash_utils import hash_id
logger.info("Computing file hash", worker_pid=os.getpid())
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
while True:
chunk = f.read(1 << 20)
if not chunk:
break
sha256.update(chunk)
file_hash = sha256.hexdigest()
file_hash = hash_id(file_path)
logger.info(
"File hash computed",
worker_pid=os.getpid(),

60
src/utils/file_utils.py Normal file
View file

@ -0,0 +1,60 @@
"""File handling utilities for OpenRAG"""
import os
import tempfile
from contextlib import contextmanager
from typing import Optional
@contextmanager
def auto_cleanup_tempfile(suffix: Optional[str] = None, prefix: Optional[str] = None, dir: Optional[str] = None):
"""
Context manager for temporary files that automatically cleans up.
Unlike tempfile.NamedTemporaryFile with delete=True, this keeps the file
on disk for the duration of the context, making it safe for async operations.
Usage:
with auto_cleanup_tempfile(suffix=".pdf") as tmp_path:
# Write to the file
with open(tmp_path, 'wb') as f:
f.write(content)
# Use tmp_path for processing
result = await process_file(tmp_path)
# File is automatically deleted here
Args:
suffix: Optional file suffix/extension (e.g., ".pdf")
prefix: Optional file prefix
dir: Optional directory for temp file
Yields:
str: Path to the temporary file
"""
fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
try:
os.close(fd) # Close the file descriptor immediately
yield path
finally:
# Always clean up, even if an exception occurred
try:
if os.path.exists(path):
os.unlink(path)
except Exception:
# Silently ignore cleanup errors
pass
def safe_unlink(path: str) -> None:
"""
Safely delete a file, ignoring errors if it doesn't exist.
Args:
path: Path to the file to delete
"""
try:
if path and os.path.exists(path):
os.unlink(path)
except Exception:
# Silently ignore errors
pass

76
src/utils/hash_utils.py Normal file
View file

@ -0,0 +1,76 @@
import io
import os
import base64
import hashlib
from typing import BinaryIO, Optional, Union
def _b64url(data: bytes) -> str:
"""URL-safe base64 without padding"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
def stream_hash(
source: Union[str, os.PathLike, BinaryIO],
*,
algo: str = "sha256",
include_filename: Optional[str] = None,
chunk_size: int = 1024 * 1024, # 1 MiB
) -> bytes:
"""
Memory-safe, incremental hash of a file path or binary stream.
- source: path or file-like object with .read()
- algo: hashlib algorithm name ('sha256', 'blake2b', 'sha3_256', etc.)
- include_filename: if provided, the UTF-8 bytes of this string are prepended
- chunk_size: read size per iteration
Returns: raw digest bytes
"""
try:
h = hashlib.new(algo)
except ValueError as e:
raise ValueError(f"Unsupported hash algorithm: {algo}") from e
def _update_from_file(f: BinaryIO):
if include_filename:
h.update(include_filename.encode("utf-8"))
for chunk in iter(lambda: f.read(chunk_size), b""):
h.update(chunk)
if isinstance(source, (str, os.PathLike)):
with open(source, "rb", buffering=io.DEFAULT_BUFFER_SIZE) as f:
_update_from_file(f)
else:
f = source
# Preserve position if seekable
pos = None
try:
if f.seekable():
pos = f.tell()
f.seek(0)
except Exception:
pos = None
try:
_update_from_file(f)
finally:
if pos is not None:
try:
f.seek(pos)
except Exception:
pass
return h.digest()
def hash_id(
source: Union[str, os.PathLike, BinaryIO],
*,
algo: str = "sha256",
include_filename: Optional[str] = None,
length: int = 24, # characters of base64url (set 0 or None for full)
) -> str:
"""
Deterministic, URL-safe base64 digest (no prefix).
"""
b = stream_hash(source, algo=algo, include_filename=include_filename)
s = _b64url(b)
return s[:length] if length else s

4
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.13"
resolution-markers = [
"sys_platform == 'darwin'",
@ -2282,7 +2282,7 @@ wheels = [
[[package]]
name = "openrag"
version = "0.1.13"
version = "0.1.14.dev1"
source = { editable = "." }
dependencies = [
{ name = "agentd" },