diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index 989368d4..2240b3d0 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -667,7 +667,7 @@ ], "frozen": false, "icon": "braces", - "last_updated": "2025-12-03T21:41:00.148Z", + "last_updated": "2025-12-12T20:12:18.129Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": {}, @@ -717,7 +717,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "code": { @@ -1399,7 +1399,7 @@ "description": "Uses Docling to process input documents connecting to your instance of Docling Serve.", "display_name": "Docling Serve", "documentation": "https://docling-project.github.io/docling/", - "edited": false, + "edited": true, "field_order": [ "path", "file_path", @@ -1417,9 +1417,8 @@ "frozen": false, "icon": "Docling", "legacy": false, - "lf_version": "1.7.0.dev21", "metadata": { - "code_hash": "26eeb513dded", + "code_hash": "5723576d00e5", "dependencies": { "dependencies": [ { @@ -1428,20 +1427,20 @@ }, { "name": "docling_core", - "version": "2.48.4" + "version": "2.49.0" }, { "name": "pydantic", - "version": "2.10.6" + "version": "2.11.10" }, { "name": "lfx", - "version": "0.1.12.dev31" + "version": "0.2.0.dev21" } ], "total_dependencies": 4 }, - "module": "lfx.components.docling.docling_remote.DoclingRemoteComponent" + "module": "custom_components.docling_serve" }, "minimized": false, "output_types": [], @@ -1451,8 +1450,12 @@ "cache": true, "display_name": "Files", "group_outputs": false, + "hidden": null, + "loop_types": null, "method": "load_files", "name": "dataframe", + "options": null, + "required_inputs": null, "selected": "DataFrame", "tool_mode": true, "types": [ @@ -1473,6 +1476,7 @@ "list": false, "list_add_label": "Add More", "name": "api_headers", + "override_skip": false, "placeholder": "", "required": false, "show": true, @@ -1480,6 +1484,7 @@ "tool_mode": false, "trace_as_input": true, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "NestedDict", "value": {} }, @@ -1493,12 +1498,14 @@ "list_add_label": "Add More", "load_from_db": false, "name": "api_url", + "override_skip": false, "placeholder": "", "required": true, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "str", "value": "http://localhost:5001" }, @@ -1518,7 +1525,7 @@ "show": true, "title_case": false, "type": "code", - "value": "import base64\nimport time\nfrom concurrent.futures import Future, ThreadPoolExecutor\nfrom pathlib import Path\nfrom typing import Any\n\nimport httpx\nfrom docling_core.types.doc import DoclingDocument\nfrom pydantic import ValidationError\n\nfrom lfx.base.data import BaseFileComponent\nfrom lfx.inputs import IntInput, NestedDictInput, StrInput\nfrom lfx.inputs.inputs import FloatInput\nfrom lfx.schema import Data\nfrom lfx.utils.util import transform_localhost_url\n\n\nclass DoclingRemoteComponent(BaseFileComponent):\n display_name = \"Docling Serve\"\n description = \"Uses Docling to process input documents connecting to your instance of Docling Serve.\"\n documentation = \"https://docling-project.github.io/docling/\"\n trace_type = \"tool\"\n icon = \"Docling\"\n name = \"DoclingRemote\"\n\n MAX_500_RETRIES = 5\n\n # https://docling-project.github.io/docling/usage/supported_formats/\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n ]\n\n inputs = [\n *BaseFileComponent.get_base_inputs(),\n StrInput(\n name=\"api_url\",\n display_name=\"Server address\",\n info=\"URL of the Docling Serve instance.\",\n required=True,\n ),\n IntInput(\n name=\"max_concurrency\",\n display_name=\"Concurrency\",\n info=\"Maximum number of concurrent requests for the server.\",\n advanced=True,\n value=2,\n ),\n FloatInput(\n name=\"max_poll_timeout\",\n display_name=\"Maximum poll time\",\n info=\"Maximum waiting time for the document conversion to complete.\",\n advanced=True,\n value=3600,\n ),\n NestedDictInput(\n name=\"api_headers\",\n display_name=\"HTTP headers\",\n advanced=True,\n required=False,\n info=(\"Optional dictionary of additional headers required for connecting to Docling Serve.\"),\n ),\n NestedDictInput(\n name=\"docling_serve_opts\",\n display_name=\"Docling options\",\n advanced=True,\n required=False,\n info=(\n \"Optional dictionary of additional options. \"\n \"See https://github.com/docling-project/docling-serve/blob/main/docs/usage.md for more information.\"\n ),\n ),\n ]\n\n outputs = [\n *BaseFileComponent.get_base_outputs(),\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n # Transform localhost URLs to container-accessible hosts when running in a container\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n def _convert_document(client: httpx.Client, file_path: Path, options: dict[str, Any]) -> Data | None:\n encoded_doc = base64.b64encode(file_path.read_bytes()).decode()\n payload = {\n \"options\": options,\n \"sources\": [{\"kind\": \"file\", \"base64_string\": encoded_doc, \"filename\": file_path.name}],\n }\n\n response = client.post(f\"{base_url}/convert/source/async\", json=payload)\n response.raise_for_status()\n task = response.json()\n\n http_failures = 0\n retry_status_start = 500\n retry_status_end = 600\n start_wait_time = time.monotonic()\n while task[\"task_status\"] not in (\"success\", \"failure\"):\n # Check if processing exceeds the maximum poll timeout\n processing_time = time.monotonic() - start_wait_time\n if processing_time >= self.max_poll_timeout:\n msg = (\n f\"Processing time {processing_time=} exceeds the maximum poll timeout {self.max_poll_timeout=}.\"\n \"Please increase the max_poll_timeout parameter or review why the processing \"\n \"takes long on the server.\"\n )\n self.log(msg)\n raise RuntimeError(msg)\n\n # Call for a new status update\n time.sleep(2)\n response = client.get(f\"{base_url}/status/poll/{task['task_id']}\")\n\n # Check if the status call gets into 5xx errors and retry\n if retry_status_start <= response.status_code < retry_status_end:\n http_failures += 1\n if http_failures > self.MAX_500_RETRIES:\n self.log(f\"The status requests got a http response {response.status_code} too many times.\")\n return None\n continue\n\n # Update task status\n task = response.json()\n\n result_resp = client.get(f\"{base_url}/result/{task['task_id']}\")\n result_resp.raise_for_status()\n result = result_resp.json()\n\n if \"json_content\" not in result[\"document\"] or result[\"document\"][\"json_content\"] is None:\n self.log(\"No JSON DoclingDocument found in the result.\")\n return None\n\n try:\n doc = DoclingDocument.model_validate(result[\"document\"][\"json_content\"])\n return Data(data={\"doc\": doc, \"file_path\": str(file_path)})\n except ValidationError as e:\n self.log(f\"Error validating the document. {e}\")\n return None\n\n docling_options = {\n \"to_formats\": [\"json\"],\n \"image_export_mode\": \"placeholder\",\n **(self.docling_serve_opts or {}),\n }\n\n processed_data: list[Data | None] = []\n with (\n httpx.Client(headers=self.api_headers) as client,\n ThreadPoolExecutor(max_workers=self.max_concurrency) as executor,\n ):\n futures: list[tuple[int, Future]] = []\n for i, file in enumerate(file_list):\n if file.path is None:\n processed_data.append(None)\n continue\n\n futures.append((i, executor.submit(_convert_document, client, file.path, docling_options)))\n\n for _index, future in futures:\n try:\n result_data = future.result()\n processed_data.append(result_data)\n except (httpx.HTTPStatusError, httpx.RequestError, KeyError, ValueError) as exc:\n self.log(f\"Docling remote processing failed: {exc}\")\n raise\n\n return self.rollup_data(file_list, processed_data)\n" + "value": "import base64\nimport time\nfrom concurrent.futures import Future, ThreadPoolExecutor\nfrom pathlib import Path\nfrom typing import Any\n\nimport httpx\nfrom docling_core.types.doc import DoclingDocument\nfrom pydantic import ValidationError\n\nfrom lfx.base.data import BaseFileComponent\nfrom lfx.inputs import IntInput, NestedDictInput, StrInput\nfrom lfx.inputs.inputs import FloatInput\nfrom lfx.schema import Data\nfrom lfx.utils.util import transform_localhost_url\n\n\nclass DoclingRemoteComponent(BaseFileComponent):\n display_name = \"Docling Serve\"\n description = \"Uses Docling to process input documents connecting to your instance of Docling Serve.\"\n documentation = \"https://docling-project.github.io/docling/\"\n trace_type = \"tool\"\n icon = \"Docling\"\n name = \"DoclingRemote\"\n\n MAX_500_RETRIES = 5\n\n # https://docling-project.github.io/docling/usage/supported_formats/\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"jpg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n ]\n\n inputs = [\n *BaseFileComponent.get_base_inputs(),\n StrInput(\n name=\"api_url\",\n display_name=\"Server address\",\n info=\"URL of the Docling Serve instance.\",\n required=True,\n ),\n IntInput(\n name=\"max_concurrency\",\n display_name=\"Concurrency\",\n info=\"Maximum number of concurrent requests for the server.\",\n advanced=True,\n value=2,\n ),\n FloatInput(\n name=\"max_poll_timeout\",\n display_name=\"Maximum poll time\",\n info=\"Maximum waiting time for the document conversion to complete.\",\n advanced=True,\n value=3600,\n ),\n NestedDictInput(\n name=\"api_headers\",\n display_name=\"HTTP headers\",\n advanced=True,\n required=False,\n info=(\"Optional dictionary of additional headers required for connecting to Docling Serve.\"),\n ),\n NestedDictInput(\n name=\"docling_serve_opts\",\n display_name=\"Docling options\",\n advanced=True,\n required=False,\n info=(\n \"Optional dictionary of additional options. \"\n \"See https://github.com/docling-project/docling-serve/blob/main/docs/usage.md for more information.\"\n ),\n ),\n ]\n\n outputs = [\n *BaseFileComponent.get_base_outputs(),\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n # Transform localhost URLs to container-accessible hosts when running in a container\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n def _convert_document(client: httpx.Client, file_path: Path, options: dict[str, Any]) -> Data | None:\n encoded_doc = base64.b64encode(file_path.read_bytes()).decode()\n payload = {\n \"options\": options,\n \"sources\": [{\"kind\": \"file\", \"base64_string\": encoded_doc, \"filename\": file_path.name}],\n }\n\n response = client.post(f\"{base_url}/convert/source/async\", json=payload)\n response.raise_for_status()\n task = response.json()\n\n http_failures = 0\n retry_status_start = 500\n retry_status_end = 600\n start_wait_time = time.monotonic()\n while task[\"task_status\"] not in (\"success\", \"failure\"):\n # Check if processing exceeds the maximum poll timeout\n processing_time = time.monotonic() - start_wait_time\n if processing_time >= self.max_poll_timeout:\n msg = (\n f\"Processing time {processing_time=} exceeds the maximum poll timeout {self.max_poll_timeout=}.\"\n \"Please increase the max_poll_timeout parameter or review why the processing \"\n \"takes long on the server.\"\n )\n self.log(msg)\n raise RuntimeError(msg)\n\n # Call for a new status update\n time.sleep(2)\n response = client.get(f\"{base_url}/status/poll/{task['task_id']}\")\n\n # Check if the status call gets into 5xx errors and retry\n if retry_status_start <= response.status_code < retry_status_end:\n http_failures += 1\n if http_failures > self.MAX_500_RETRIES:\n self.log(f\"The status requests got a http response {response.status_code} too many times.\")\n return None\n continue\n\n # Update task status\n task = response.json()\n\n result_resp = client.get(f\"{base_url}/result/{task['task_id']}\")\n result_resp.raise_for_status()\n result = result_resp.json()\n\n if \"json_content\" not in result[\"document\"] or result[\"document\"][\"json_content\"] is None:\n self.log(\"No JSON DoclingDocument found in the result.\")\n return None\n\n try:\n doc = DoclingDocument.model_validate(result[\"document\"][\"json_content\"])\n return Data(data={\"doc\": doc, \"file_path\": str(file_path)})\n except ValidationError as e:\n self.log(f\"Error validating the document. {e}\")\n return None\n\n docling_options = {\n \"to_formats\": [\"json\"],\n \"image_export_mode\": \"placeholder\",\n **(self.docling_serve_opts or {}),\n }\n\n processed_data: list[Data | None] = []\n with (\n httpx.Client(headers=self.api_headers) as client,\n ThreadPoolExecutor(max_workers=self.max_concurrency) as executor,\n ):\n futures: list[tuple[int, Future]] = []\n for i, file in enumerate(file_list):\n if file.path is None:\n processed_data.append(None)\n continue\n\n futures.append((i, executor.submit(_convert_document, client, file.path, docling_options)))\n\n for _index, future in futures:\n try:\n result_data = future.result()\n processed_data.append(result_data)\n except (httpx.HTTPStatusError, httpx.RequestError, KeyError, ValueError) as exc:\n self.log(f\"Docling remote processing failed: {exc}\")\n raise\n\n return self.rollup_data(file_list, processed_data)\n" }, "delete_server_file_after_processing": { "_input_type": "BoolInput", @@ -1529,24 +1536,28 @@ "list": false, "list_add_label": "Add More", "name": "delete_server_file_after_processing", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "bool", "value": true }, "docling_serve_opts": { "_input_type": "NestedDictInput", - "advanced": false, + "advanced": true, "display_name": "Docling options", "dynamic": false, "info": "Optional dictionary of additional options. See https://github.com/docling-project/docling-serve/blob/main/docs/usage.md for more information.", "list": false, "list_add_label": "Add More", + "load_from_db": false, "name": "docling_serve_opts", + "override_skip": false, "placeholder": "", "required": false, "show": true, @@ -1554,6 +1565,7 @@ "tool_mode": false, "trace_as_input": true, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "NestedDict", "value": { "do_ocr": false, @@ -1580,11 +1592,13 @@ "list": true, "list_add_label": "Add More", "name": "file_path", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "other", "value": "" }, @@ -1597,12 +1611,14 @@ "list": false, "list_add_label": "Add More", "name": "ignore_unspecified_files", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "bool", "value": false }, @@ -1615,30 +1631,34 @@ "list": false, "list_add_label": "Add More", "name": "ignore_unsupported_extensions", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "bool", "value": true }, "max_concurrency": { "_input_type": "IntInput", - "advanced": false, + "advanced": true, "display_name": "Concurrency", "dynamic": false, "info": "Maximum number of concurrent requests for the server.", "list": false, "list_add_label": "Add More", "name": "max_concurrency", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "int", "value": 2 }, @@ -1651,12 +1671,14 @@ "list": false, "list_add_label": "Add More", "name": "max_poll_timeout", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "float", "value": 3600 }, @@ -1678,6 +1700,7 @@ "htm", "html", "jpeg", + "jpg", "json", "md", "pdf", @@ -1702,16 +1725,19 @@ "gz" ], "file_path": [], - "info": "Supported file extensions: adoc, asciidoc, asc, bmp, csv, dotx, dotm, docm, docx, htm, html, jpeg, json, md, pdf, png, potx, ppsx, pptm, potm, ppsm, pptx, tiff, txt, xls, xlsx, xhtml, xml, webp; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", + "info": "Supported file extensions: adoc, asciidoc, asc, bmp, csv, dotx, dotm, docm, docx, htm, html, jpeg, jpg, json, md, pdf, png, potx, ppsx, pptm, potm, ppsm, pptx, tiff, txt, xls, xlsx, xhtml, xml, webp; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": true, "list_add_label": "Add More", "name": "path", + "override_skip": false, "placeholder": "", "required": false, "show": true, "temp_file": false, "title_case": false, + "tool_mode": true, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "file", "value": "" }, @@ -1725,12 +1751,14 @@ "list_add_label": "Add More", "load_from_db": false, "name": "separator", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": false, "type": "str", "value": "\n\n" }, @@ -1743,12 +1771,14 @@ "list": false, "list_add_label": "Add More", "name": "silent_errors", + "override_skip": false, "placeholder": "", "required": false, "show": true, "title_case": false, "tool_mode": false, "trace_as_metadata": true, + "track_in_telemetry": true, "type": "bool", "value": false } @@ -1761,7 +1791,7 @@ "dragging": false, "id": "DoclingRemote-Dp3PX", "measured": { - "height": 475, + "height": 312, "width": 320 }, "position": { @@ -2060,7 +2090,7 @@ ], "frozen": false, "icon": "table", - "last_updated": "2025-12-03T21:41:00.319Z", + "last_updated": "2025-12-12T20:12:18.208Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -2107,7 +2137,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "ascending": { @@ -2511,7 +2541,7 @@ ], "frozen": false, "icon": "table", - "last_updated": "2025-12-03T21:41:00.320Z", + "last_updated": "2025-12-12T20:12:18.209Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -2558,7 +2588,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "ascending": { @@ -2962,7 +2992,7 @@ ], "frozen": false, "icon": "table", - "last_updated": "2025-12-03T21:41:00.320Z", + "last_updated": "2025-12-12T20:12:18.209Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -3009,7 +3039,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "ascending": { @@ -4126,7 +4156,7 @@ "x": 2261.865622928042, "y": 1349.2821108833643 }, - "selected": true, + "selected": false, "type": "genericNode" }, { @@ -4163,7 +4193,7 @@ ], "frozen": false, "icon": "binary", - "last_updated": "2025-12-03T21:41:00.158Z", + "last_updated": "2025-12-12T20:12:18.131Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -4231,7 +4261,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "api_base": { @@ -4688,7 +4718,7 @@ ], "frozen": false, "icon": "binary", - "last_updated": "2025-12-03T21:41:00.159Z", + "last_updated": "2025-12-12T20:12:18.132Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -4756,7 +4786,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "api_base": { @@ -4969,8 +4999,7 @@ "load_from_db": false, "name": "model", "options": [ - "embeddinggemma:latest", - "mxbai-embed-large:latest", + "all-minilm:latest", "nomic-embed-text:latest" ], "options_metadata": [], @@ -4986,7 +5015,7 @@ "trace_as_metadata": true, "track_in_telemetry": true, "type": "str", - "value": "embeddinggemma:latest" + "value": "all-minilm:latest" }, "model_kwargs": { "_input_type": "DictInput", @@ -5215,7 +5244,7 @@ ], "frozen": false, "icon": "binary", - "last_updated": "2025-12-03T21:41:00.159Z", + "last_updated": "2025-12-12T20:12:18.133Z", "legacy": false, "lf_version": "1.7.0.dev21", "metadata": { @@ -5283,7 +5312,7 @@ "value": "5488df7c-b93f-4f87-a446-b67028bc0813" }, "_frontend_node_folder_id": { - "value": "79455c62-cdb1-4f14-bf44-8e76acc020a6" + "value": "75fd27c1-8f4b-46a1-88bb-a8a8e72719e3" }, "_type": "Component", "api_base": { @@ -5708,15 +5737,16 @@ } ], "viewport": { - "x": -848.3573799283768, - "y": -648.7033245837173, - "zoom": 0.6472397864500404 + "x": 249.3666737262397, + "y": -156.8776378758762, + "zoom": 0.38977017930844676 } }, "description": "Load your data for chat context with Retrieval Augmented Generation.", "endpoint_name": null, "id": "5488df7c-b93f-4f87-a446-b67028bc0813", "is_component": false, + "locked": true, "last_tested_version": "1.7.0.dev21", "name": "OpenSearch Ingestion Flow", "tags": [ @@ -5725,4 +5755,4 @@ "rag", "q-a" ] -} +} \ No newline at end of file diff --git a/frontend/hooks/useChatStreaming.ts b/frontend/hooks/useChatStreaming.ts index c67a0ca6..89d0d810 100644 --- a/frontend/hooks/useChatStreaming.ts +++ b/frontend/hooks/useChatStreaming.ts @@ -162,6 +162,19 @@ export function useChatStreaming({ if (line.trim()) { try { const chunk = JSON.parse(line); + + // Investigation logging for Granite 3.3 8b tool call detection + const chunkKeys = Object.keys(chunk); + const toolRelatedKeys = chunkKeys.filter(key => + key.toLowerCase().includes('tool') || + key.toLowerCase().includes('call') || + key.toLowerCase().includes('retrieval') || + key.toLowerCase().includes('function') || + key.toLowerCase().includes('result') + ); + if (toolRelatedKeys.length > 0) { + console.log('[Tool Detection] Found tool-related keys:', toolRelatedKeys, chunk); + } // Extract response ID if present if (chunk.id) { @@ -449,6 +462,42 @@ export function useChatStreaming({ } } } + + // Heuristic detection for implicit tool calls (Granite 3.3 8b workaround) + // Check if chunk contains retrieval results without explicit tool call markers + const hasImplicitToolCall = ( + // Check for various result indicators in the chunk + (chunk.results && Array.isArray(chunk.results) && chunk.results.length > 0) || + (chunk.outputs && Array.isArray(chunk.outputs) && chunk.outputs.length > 0) || + // Check for retrieval-related fields + chunk.retrieved_documents || + chunk.retrieval_results || + // Check for nested data structures that might contain results + (chunk.data && typeof chunk.data === 'object' && ( + chunk.data.results || + chunk.data.retrieved_documents || + chunk.data.retrieval_results + )) + ); + + if (hasImplicitToolCall && currentFunctionCalls.length === 0) { + console.log('[Heuristic Detection] Detected implicit tool call:', chunk); + + // Create a synthetic function call for the UI + const results = chunk.results || chunk.outputs || chunk.retrieved_documents || + chunk.retrieval_results || chunk.data?.results || + chunk.data?.retrieved_documents || []; + + const syntheticFunctionCall: FunctionCall = { + name: "Retrieval", + arguments: { implicit: true, detected_heuristically: true }, + status: "completed", + type: "retrieval_call", + result: results, + }; + currentFunctionCalls.push(syntheticFunctionCall); + console.log('[Heuristic Detection] Created synthetic function call'); + } // Update streaming message in real-time if ( @@ -486,6 +535,29 @@ export function useChatStreaming({ "No response received from the server. Please try again.", ); } + + // Post-processing: Heuristic detection based on final content + // If no explicit tool calls detected but content shows RAG indicators + if (currentFunctionCalls.length === 0 && currentContent) { + // Check for citation patterns that indicate RAG usage + const hasCitations = /\(Source:|\[Source:|\bSource:|filename:|document:/i.test(currentContent); + // Check for common RAG response patterns + const hasRAGPattern = /based on.*(?:document|file|information|data)|according to.*(?:document|file)/i.test(currentContent); + + if (hasCitations || hasRAGPattern) { + console.log('[Post-Processing] Detected RAG usage from content patterns'); + const syntheticFunctionCall: FunctionCall = { + name: "Retrieval", + arguments: { + implicit: true, + detected_from: hasCitations ? "citations" : "content_patterns" + }, + status: "completed", + type: "retrieval_call", + }; + currentFunctionCalls.push(syntheticFunctionCall); + } + } // Finalize the message const finalMessage: Message = { diff --git a/src/agent.py b/src/agent.py index db332dc1..08ca99d4 100644 --- a/src/agent.py +++ b/src/agent.py @@ -135,6 +135,7 @@ async def async_response_stream( full_response = "" chunk_count = 0 + detected_tool_call = False # Track if we've detected a tool call async for chunk in response: chunk_count += 1 logger.debug( @@ -158,6 +159,17 @@ async def async_response_stream( else: delta_text = str(chunk.delta) full_response += delta_text + + # Enhanced logging for tool call detection (Granite 3.3 8b investigation) + chunk_attrs = dir(chunk) if hasattr(chunk, '__dict__') else [] + tool_related_attrs = [attr for attr in chunk_attrs if 'tool' in attr.lower() or 'call' in attr.lower() or 'retrieval' in attr.lower()] + if tool_related_attrs: + logger.info( + "Tool-related attributes found in chunk", + chunk_count=chunk_count, + attributes=tool_related_attrs, + chunk_type=type(chunk).__name__ + ) # Send the raw event as JSON followed by newline for easy parsing try: @@ -169,7 +181,57 @@ async def async_response_stream( chunk_data = chunk.__dict__ else: chunk_data = str(chunk) + + # Log detailed chunk structure for investigation (especially for Granite 3.3 8b) + if isinstance(chunk_data, dict): + # Check for any fields that might indicate tool usage + potential_tool_fields = { + k: v for k, v in chunk_data.items() + if any(keyword in str(k).lower() for keyword in ['tool', 'call', 'retrieval', 'function', 'result', 'output']) + } + if potential_tool_fields: + logger.info( + "Potential tool-related fields in chunk", + chunk_count=chunk_count, + fields=list(potential_tool_fields.keys()), + sample_data=str(potential_tool_fields)[:500] + ) + # Middleware: Detect implicit tool calls and inject standardized events + # This helps Granite 3.3 8b and other models that don't emit standard markers + if isinstance(chunk_data, dict) and not detected_tool_call: + # Check if this chunk contains retrieval results + has_results = any([ + 'results' in chunk_data and isinstance(chunk_data.get('results'), list), + 'outputs' in chunk_data and isinstance(chunk_data.get('outputs'), list), + 'retrieved_documents' in chunk_data, + 'retrieval_results' in chunk_data, + ]) + + if has_results: + logger.info( + "Detected implicit tool call in backend, injecting synthetic event", + chunk_fields=list(chunk_data.keys()) + ) + # Inject a synthetic tool call event before this chunk + synthetic_event = { + "type": "response.output_item.done", + "item": { + "type": "retrieval_call", + "id": f"synthetic_{chunk_count}", + "name": "Retrieval", + "tool_name": "Retrieval", + "status": "completed", + "inputs": {"implicit": True, "backend_detected": True}, + "results": chunk_data.get('results') or chunk_data.get('outputs') or + chunk_data.get('retrieved_documents') or + chunk_data.get('retrieval_results') or [] + } + } + # Send the synthetic event first + yield (json.dumps(synthetic_event, default=str) + "\n").encode("utf-8") + detected_tool_call = True # Mark that we've injected a tool call + yield (json.dumps(chunk_data, default=str) + "\n").encode("utf-8") except Exception as e: # Fallback to string representation diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 5e7204cc..ca5fec7c 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -180,6 +180,22 @@ class LangflowFileService: body=resp.text[:1000], ) resp.raise_for_status() + + # Check if response is actually JSON before parsing + content_type = resp.headers.get("content-type", "") + if "application/json" not in content_type: + logger.error( + "[LF] Unexpected response content type from Langflow", + content_type=content_type, + status_code=resp.status_code, + body=resp.text[:1000], + ) + raise ValueError( + f"Langflow returned {content_type} instead of JSON. " + f"This may indicate the ingestion flow failed or the endpoint is incorrect. " + f"Response preview: {resp.text[:500]}" + ) + try: resp_json = resp.json() except Exception as e: diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py index ee3366c1..c9a77cfa 100644 --- a/src/services/langflow_history_service.py +++ b/src/services/langflow_history_service.py @@ -88,6 +88,7 @@ class LangflowHistoryService: } # Extract function calls from content_blocks if present + # Convert to match streaming format: chunk.item.type === "tool_call" content_blocks = msg.get("content_blocks", []) if content_blocks: chunks = [] @@ -95,23 +96,23 @@ class LangflowHistoryService: if block.get("title") == "Agent Steps" and block.get("contents"): for content in block["contents"]: if content.get("type") == "tool_use": - # Convert Langflow tool_use format to OpenRAG chunks format + # Convert Langflow tool_use format to match streaming chunks format + # Frontend expects: chunk.item.type === "tool_call" with tool_name, inputs, results chunk = { - "type": "function", - "function": { - "name": content.get("name", ""), - "arguments": content.get("tool_input", {}), - "response": content.get("output", {}) - }, - "function_call_result": content.get("output", {}), - "duration": content.get("duration"), - "error": content.get("error") + "type": "response.output_item.added", + "item": { + "type": "tool_call", + "tool_name": content.get("name", ""), + "inputs": content.get("tool_input", {}), + "results": content.get("output", {}), + "id": content.get("id") or content.get("run_id", ""), + "status": "completed" if not content.get("error") else "error" + } } chunks.append(chunk) if chunks: converted_msg["chunks"] = chunks - converted_msg["response_data"] = {"tool_calls": chunks} converted_messages.append(converted_msg) diff --git a/src/tui/managers/container_manager.py b/src/tui/managers/container_manager.py index 666d66bb..91f022be 100644 --- a/src/tui/managers/container_manager.py +++ b/src/tui/managers/container_manager.py @@ -2,7 +2,8 @@ import asyncio import json -import subprocess +import os +import re import time from dataclasses import dataclass, field from enum import Enum @@ -121,6 +122,36 @@ class ContainerManager: self._compose_search_log += f"\n 3. Falling back to: {cwd_path.absolute()}" return Path(filename) + def _get_env_from_file(self) -> Dict[str, str]: + """Read environment variables from .env file, prioritizing file values over os.environ. + + Uses python-dotenv's load_dotenv() for standard .env file parsing, which handles: + - Quoted values (single and double quotes) + - Variable expansion (${VAR}) + - Multiline values + - Escaped characters + - Comments + + This ensures Docker Compose commands use the latest values from .env file, + even if os.environ has stale values. + """ + from dotenv import load_dotenv + + env = dict(os.environ) # Start with current environment + env_file = Path(".env") + + if env_file.exists(): + try: + # Load .env file with override=True to ensure file values take precedence + # This loads into os.environ, then we copy to our dict + load_dotenv(dotenv_path=env_file, override=True) + # Update our dict with all environment variables (including those from .env) + env.update(os.environ) + except Exception as e: + logger.debug(f"Error reading .env file for Docker Compose: {e}") + + return env + def is_available(self) -> bool: """Check if container runtime with compose is available.""" return (self.runtime_info.runtime_type != RuntimeType.NONE and @@ -153,7 +184,6 @@ class ContainerManager: continue try: - import re content = compose_file.read_text() current_service = None in_ports_section = False @@ -245,11 +275,15 @@ class ContainerManager: cmd.extend(args) try: + # Get environment variables from .env file to ensure latest values + env = self._get_env_from_file() + process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=Path.cwd(), + env=env, ) stdout, stderr = await process.communicate() @@ -287,11 +321,15 @@ class ContainerManager: cmd.extend(args) try: + # Get environment variables from .env file to ensure latest values + env = self._get_env_from_file() + process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), + env=env, ) if process.stdout: @@ -356,11 +394,15 @@ class ContainerManager: cmd.extend(args) try: + # Get environment variables from .env file to ensure latest values + env = self._get_env_from_file() + process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), + env=env, ) except Exception as e: success_flag["value"] = False @@ -926,7 +968,6 @@ class ContainerManager: async for message, replace_last in self._stream_compose_command(["up", "-d"], up_success, cpu_mode): # Detect error patterns in the output - import re lower_msg = message.lower() # Check for common error patterns @@ -1110,11 +1151,15 @@ class ContainerManager: cmd.extend(["logs", "-f", service_name]) try: + # Get environment variables from .env file to ensure latest values + env = self._get_env_from_file() + process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd(), + env=env, ) if process.stdout: diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index 51e2a11f..4f5ee461 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -1,5 +1,6 @@ """Environment configuration manager for OpenRAG TUI.""" +import os import secrets import string from dataclasses import dataclass, field @@ -7,12 +8,10 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional +from dotenv import load_dotenv from utils.logging_config import get_logger -logger = get_logger(__name__) - from ..utils.validation import ( - sanitize_env_value, validate_documents_paths, validate_google_oauth_client_id, validate_non_empty, @@ -20,6 +19,8 @@ from ..utils.validation import ( validate_url, ) +logger = get_logger(__name__) + @dataclass class EnvConfig: @@ -119,9 +120,15 @@ class EnvManager: return f"'{escaped_value}'" def load_existing_env(self) -> bool: - """Load existing .env file if it exists, or fall back to environment variables.""" - import os + """Load existing .env file if it exists, or fall back to environment variables. + Uses python-dotenv's load_dotenv() for standard .env file parsing, which handles: + - Quoted values (single and double quotes) + - Variable expansion (${VAR}) + - Multiline values + - Escaped characters + - Comments + """ # Map env vars to config attributes # These are environment variable names, not actual secrets attr_map = { # pragma: allowlist secret @@ -158,36 +165,23 @@ class EnvManager: loaded_from_file = False - # Try to load from .env file first + # Load .env file using python-dotenv for standard parsing + # override=True ensures .env file values take precedence over existing environment variables if self.env_file.exists(): try: - with open(self.env_file, "r") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - - if "=" in line: - key, value = line.split("=", 1) - key = key.strip() - value = sanitize_env_value(value) - - if key in attr_map: - setattr(self.config, attr_map[key], value) - + # Load .env file with override=True to ensure file values take precedence + load_dotenv(dotenv_path=self.env_file, override=True) loaded_from_file = True - + logger.debug(f"Loaded .env file from {self.env_file}") except Exception as e: logger.error("Error loading .env file", error=str(e)) - # Fall back to environment variables if .env file doesn't exist or failed to load - if not loaded_from_file: - logger.info("No .env file found, loading from environment variables") - for env_key, attr_name in attr_map.items(): - value = os.environ.get(env_key, "") - if value: - setattr(self.config, attr_name, value) - return True + # Map environment variables to config attributes + # This works whether values came from .env file or existing environment variables + for env_key, attr_name in attr_map.items(): + value = os.environ.get(env_key, "") + if value: + setattr(self.config, attr_name, value) return loaded_from_file @@ -546,23 +540,19 @@ class EnvManager: """Ensure OPENRAG_VERSION is set in .env file to match TUI version.""" try: from ..utils.version_check import get_current_version + import os current_version = get_current_version() if current_version == "unknown": return # Check if OPENRAG_VERSION is already set in .env if self.env_file.exists(): - env_content = self.env_file.read_text() - if "OPENRAG_VERSION" in env_content: - # Already set, check if it needs updating - for line in env_content.splitlines(): - if line.strip().startswith("OPENRAG_VERSION"): - existing_value = line.split("=", 1)[1].strip() - existing_value = sanitize_env_value(existing_value) - if existing_value == current_version: - # Already correct, no update needed - return - break + # Load .env file using load_dotenv + load_dotenv(dotenv_path=self.env_file, override=False) + existing_value = os.environ.get("OPENRAG_VERSION", "") + if existing_value and existing_value == current_version: + # Already correct, no update needed + return # Set or update OPENRAG_VERSION self.config.openrag_version = current_version diff --git a/src/tui/screens/welcome.py b/src/tui/screens/welcome.py index 629614c0..146b437f 100644 --- a/src/tui/screens/welcome.py +++ b/src/tui/screens/welcome.py @@ -41,7 +41,8 @@ class WelcomeScreen(Screen): self.has_env_file = self.env_manager.env_file.exists() # Load .env file if it exists - load_dotenv() + # override=True ensures .env file values take precedence over existing environment variables + load_dotenv(override=True) # Check OAuth config immediately self.has_oauth_config = bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or bool( diff --git a/src/tui/utils/validation.py b/src/tui/utils/validation.py index c91c4f00..cdbeb810 100644 --- a/src/tui/utils/validation.py +++ b/src/tui/utils/validation.py @@ -96,21 +96,6 @@ def validate_non_empty(value: str) -> bool: return bool(value and value.strip()) -def sanitize_env_value(value: str) -> str: - """Sanitize environment variable value.""" - # Remove leading/trailing whitespace - value = value.strip() - - # Remove quotes if they wrap the entire value - if len(value) >= 2: - if (value.startswith('"') and value.endswith('"')) or ( - value.startswith("'") and value.endswith("'") - ): - value = value[1:-1] - - return value - - def validate_documents_paths(paths_str: str) -> tuple[bool, str, list[str]]: """ Validate comma-separated documents paths for volume mounting.