other flows

This commit is contained in:
phact 2025-10-10 22:46:36 -04:00
parent 12ae6d3fb1
commit af741f3784
6 changed files with 320 additions and 8 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
Extract embedded component code from a Langflow JSON flow.
Example:
python scripts/extract_flow_component.py \\
--flow-file flows/ingestion_flow.json \\
--display-name "OpenSearch (Multi-Model)" \\
--output flows/components/opensearch_multimodel.py
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Optional
def should_select_component(
node: dict,
*,
display_name: Optional[str],
metadata_module: Optional[str],
) -> bool:
"""Return True if the node matches the requested component filters."""
node_data = node.get("data", {})
component = node_data.get("node", {})
if display_name and component.get("display_name") != display_name:
return False
if metadata_module:
metadata = component.get("metadata", {})
if metadata.get("module") != metadata_module:
return False
template = component.get("template", {})
code_entry = template.get("code")
return isinstance(code_entry, dict) and "value" in code_entry
def extract_code_from_flow(
flow_path: Path,
*,
display_name: Optional[str],
metadata_module: Optional[str],
match_index: int,
) -> str:
"""Fetch the embedded code string from the matching component node."""
try:
flow_data = json.loads(flow_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise SystemExit(f"[error] failed to parse {flow_path}: {exc}") from exc
matches = []
for node in flow_data.get("data", {}).get("nodes", []):
if should_select_component(
node,
display_name=display_name,
metadata_module=metadata_module,
):
matches.append(node)
if not matches:
raise SystemExit(
"[error] no component found matching the supplied filters "
f"in {flow_path}"
)
if match_index < 0 or match_index >= len(matches):
raise SystemExit(
f"[error] match index {match_index} out of range "
f"(found {len(matches)} matches)"
)
target = matches[match_index]
return target["data"]["node"]["template"]["code"]["value"]
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Extract component code from a Langflow JSON flow."
)
parser.add_argument(
"--flow-file",
required=True,
type=Path,
help="Path to the flow JSON file.",
)
parser.add_argument(
"--display-name",
help="Component display_name to match (e.g. 'OpenSearch (Multi-Model)').",
)
parser.add_argument(
"--metadata-module",
help="Component metadata.module value to match.",
)
parser.add_argument(
"--match-index",
type=int,
default=0,
help="Index of the matched component when multiple exist (default: 0).",
)
parser.add_argument(
"--output",
type=Path,
help="Destination file for the extracted code (stdout if omitted).",
)
args = parser.parse_args()
if not args.display_name and not args.metadata_module:
# Offer an interactive selection of component display names
if not args.flow_file.exists():
parser.error(f"Flow file not found: {args.flow_file}")
try:
flow_data = json.loads(args.flow_file.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
raise SystemExit(f"[error] failed to parse {args.flow_file}: {exc}") from exc
nodes = flow_data.get("data", {}).get("nodes", [])
display_names = sorted(
{
node.get("data", {})
.get("node", {})
.get("display_name", "<unknown>")
for node in nodes
}
)
if not display_names:
parser.error(
"Unable to locate any components in the flow; supply --metadata-module instead."
)
print("Select a component display name:")
for idx, name in enumerate(display_names):
print(f" [{idx}] {name}")
while True:
choice = input(f"Enter choice (0-{len(display_names)-1}): ").strip() or "0"
if choice.isdigit():
index = int(choice)
if 0 <= index < len(display_names):
args.display_name = display_names[index]
break
print("Invalid selection, please try again.")
return args
def main() -> None:
args = parse_args()
if not args.flow_file.exists():
raise SystemExit(f"[error] flow file not found: {args.flow_file}")
code = extract_code_from_flow(
args.flow_file,
display_name=args.display_name,
metadata_module=args.metadata_module,
match_index=args.match_index,
)
if args.output:
args.output.write_text(code, encoding="utf-8")
else:
print(code)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Utility to sync embedded component code inside Langflow JSON files.
Given a Python source file (e.g. the OpenSearch component implementation) and
a target selector, this script updates every flow definition in ``./flows`` so
that the component's ``template.code.value`` matches the supplied file.
Example:
python scripts/update_flow_components.py \\
--code-file flows/components/opensearch_multimodel.py \\
--display-name \"OpenSearch (Multi-Model)\"
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Iterable
def load_code(source_path: Path) -> str:
try:
return source_path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise SystemExit(f"[error] code file not found: {source_path}") from exc
def should_update_component(node: dict, *, display_name: str | None, metadata_module: str | None) -> bool:
node_data = node.get("data", {})
component = node_data.get("node", {})
if display_name and component.get("display_name") != display_name:
return False
if metadata_module:
metadata = component.get("metadata", {})
module_name = metadata.get("module")
if module_name != metadata_module:
return False
template = component.get("template", {})
code_entry = template.get("code")
return isinstance(code_entry, dict) and "value" in code_entry
def update_flow(flow_path: Path, code: str, *, display_name: str | None, metadata_module: str | None, dry_run: bool) -> bool:
with flow_path.open(encoding="utf-8") as fh:
try:
data = json.load(fh)
except json.JSONDecodeError as exc:
raise SystemExit(f"[error] failed to parse {flow_path}: {exc}") from exc
changed = False
for node in data.get("data", {}).get("nodes", []):
if not should_update_component(node, display_name=display_name, metadata_module=metadata_module):
continue
template = node["data"]["node"]["template"]
if template["code"]["value"] != code:
if dry_run:
changed = True
else:
template["code"]["value"] = code
changed = True
if changed and not dry_run:
flow_path.write_text(
json.dumps(data, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
return changed
def iter_flow_files(flows_dir: Path) -> Iterable[Path]:
for path in sorted(flows_dir.glob("*.json")):
if path.is_file():
yield path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Update embedded component code in Langflow JSON files.")
parser.add_argument("--code-file", required=True, type=Path, help="Path to the Python file containing the component code.")
parser.add_argument("--flows-dir", type=Path, default=Path("flows"), help="Directory containing Langflow JSON files.")
parser.add_argument("--display-name", help="Component display_name to match (e.g. 'OpenSearch (Multi-Model)').")
parser.add_argument("--metadata-module", help="Component metadata.module value to match.")
parser.add_argument("--dry-run", action="store_true", help="Report which files would change without modifying them.")
args = parser.parse_args()
if not args.display_name and not args.metadata_module:
parser.error("At least one of --display-name or --metadata-module must be provided.")
return args
def main() -> None:
args = parse_args()
flows_dir: Path = args.flows_dir
if not flows_dir.exists():
raise SystemExit(f"[error] flows directory not found: {flows_dir}")
code = load_code(args.code_file)
updated_files = []
for flow_path in iter_flow_files(flows_dir):
changed = update_flow(
flow_path,
code,
display_name=args.display_name,
metadata_module=args.metadata_module,
dry_run=args.dry_run,
)
if changed:
updated_files.append(flow_path)
if args.dry_run:
if updated_files:
print("[dry-run] files that would be updated:")
for path in updated_files:
print(f" - {path}")
else:
print("[dry-run] no files would change.")
else:
if updated_files:
print("Updated component code in:")
for path in updated_files:
print(f" - {path}")
else:
print("No updates were necessary.")
if __name__ == "__main__":
main()