Enhance OpenSearch ingestion and metadata handling

Updated the OpenSearchVectorStoreComponent to improve document metadata ingestion, including support for Data objects in docs_metadata. Added new edges and nodes to ingestion_flow.json for dynamic metadata input. Changed Dockerfile.langflow to use the fix-file-component branch.
This commit is contained in:
Edwin Jose 2025-09-26 02:10:51 -04:00
parent 372c7e2445
commit 843fc92b76
6 changed files with 970 additions and 78 deletions

View file

@@ -7,7 +7,7 @@ ENV RUSTFLAGS="--cfg reqwest_unstable"
# Accept build arguments for git repository and branch # Accept build arguments for git repository and branch
ARG GIT_REPO=https://github.com/langflow-ai/langflow.git ARG GIT_REPO=https://github.com/langflow-ai/langflow.git
ARG GIT_BRANCH=main ARG GIT_BRANCH=fix-file-component
WORKDIR /app WORKDIR /app

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@@ -422,7 +422,12 @@ class LangflowFileProcessor(TaskProcessor):
tweaks=final_tweaks, tweaks=final_tweaks,
settings=self.settings, settings=self.settings,
jwt_token=effective_jwt, jwt_token=effective_jwt,
delete_after_ingest=self.delete_after_ingest delete_after_ingest=self.delete_after_ingest,
owner=self.owner_user_id,
owner_name=self.owner_name,
owner_email=self.owner_email,
connector_type="local",
) )
# Update task with success # Update task with success

View file

@@ -118,7 +118,7 @@ class ChatService:
extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps( extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps(
filter_expression filter_expression
) )
logger.info(f"[LF] Extra headers {extra_headers}")
# Ensure the Langflow client exists; try lazy init if needed # Ensure the Langflow client exists; try lazy init if needed
langflow_client = await clients.ensure_langflow_client() langflow_client = await clients.ensure_langflow_client()
if not langflow_client: if not langflow_client:

View file

@@ -107,16 +107,17 @@ class LangflowFileService:
if connector_type: if connector_type:
metadata_tweaks.append({"key": "connector_type", "value": connector_type}) metadata_tweaks.append({"key": "connector_type", "value": connector_type})
if metadata_tweaks: # if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present # # Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in tweaks: # if "OpenSearchHybrid-Ve6bS" not in tweaks:
tweaks["OpenSearchHybrid-Ve6bS"] = {} # tweaks["OpenSearchHybrid-Ve6bS"] = {}
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks # tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
logger.debug( # logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks) # "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
) # )
# if tweaks: if tweaks:
# payload["tweaks"] = tweaks payload["tweaks"] = tweaks
logger.debug(f"[LF] Tweaks {tweaks}")
if session_id: if session_id:
payload["session_id"] = session_id payload["session_id"] = session_id
@@ -132,12 +133,13 @@ class LangflowFileService:
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT) # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
headers={ headers={
"X-Langflow-Global-Var-JWT": str(jwt_token), "X-Langflow-Global-Var-JWT": str(jwt_token),
"X-Langflow-Global-Var-Owner": str(owner), "X-Langflow-Global-Var-OWNER": str(owner),
"X-Langflow-Global-Var-Owner-Name": str(owner_name), "X-Langflow-Global-Var-OWNER_NAME": str(owner_name),
"X-Langflow-Global-Var-Owner-Email": str(owner_email), "X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email),
"X-Langflow-Global-Var-Connector-Type": str(connector_type), "X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type),
} }
logger.info(f"[LF] Headers {headers}") logger.info(f"[LF] Headers {headers}")
logger.info(f"[LF] Payload {payload}")
resp = await clients.langflow_request( resp = await clients.langflow_request(
"POST", "POST",
f"/api/v1/run/{self.flow_id_ingest}", f"/api/v1/run/{self.flow_id_ingest}",
@@ -163,6 +165,7 @@ class LangflowFileService:
body=resp.text[:1000], body=resp.text[:1000],
error=str(e), error=str(e),
) )
raise raise
return resp_json return resp_json
@@ -174,6 +177,10 @@ class LangflowFileService:
settings: Optional[Dict[str, Any]] = None, settings: Optional[Dict[str, Any]] = None,
jwt_token: Optional[str] = None, jwt_token: Optional[str] = None,
delete_after_ingest: bool = True, delete_after_ingest: bool = True,
owner: Optional[str] = None,
owner_name: Optional[str] = None,
owner_email: Optional[str] = None,
connector_type: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Combined upload, ingest, and delete operation. Combined upload, ingest, and delete operation.
@@ -260,6 +267,10 @@ class LangflowFileService:
session_id=session_id, session_id=session_id,
tweaks=final_tweaks, tweaks=final_tweaks,
jwt_token=jwt_token, jwt_token=jwt_token,
owner=owner,
owner_name=owner_name,
owner_email=owner_email,
connector_type=connector_type,
) )
logger.debug("[LF] Ingestion completed successfully") logger.debug("[LF] Ingestion completed successfully")
except Exception as e: except Exception as e: