diff --git a/cognee/__init__.py b/cognee/__init__.py
index be5a16b3b..8032b3a77 100644
--- a/cognee/__init__.py
+++ b/cognee/__init__.py
@@ -27,6 +27,7 @@ from .api.v1.visualize import visualize_graph, start_visualization_server
 from cognee.modules.visualization.cognee_network_visualization import (
     cognee_network_visualization,
 )
+from .api.v1.save.save import save
 
 # Pipelines
 from .modules import pipelines
diff --git a/cognee/api/client.py b/cognee/api/client.py
index 7d5f48672..05497d902 100644
--- a/cognee/api/client.py
+++ b/cognee/api/client.py
@@ -242,6 +242,9 @@ app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognif
 app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])
 app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
 
+from cognee.api.v1.save.routers.get_save_router import get_save_router
+
+app.include_router(get_save_router(), prefix="/api/v1/save", tags=["save"])
 
 app.include_router(
     get_permissions_router(),
diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index eeb867984..8e8d66b04 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -11,6 +11,8 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
 )
 from cognee.modules.engine.operations.setup import setup
 from cognee.tasks.ingestion import ingest_data, resolve_data_directories
+from cognee.modules.ingestion import discover_directory_datasets
+import os
 
 
 async def add(
@@ -141,6 +143,56 @@ async def add(
         - GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j"
     """
+
+    # Special handling: folder:// scheme maps nested folders to datasets
+    def _is_folder_uri(item: Union[str, BinaryIO]) -> bool:
+        return isinstance(item, str) and item.startswith("folder://")
+
+    items = data if isinstance(data, list) else [data]
+    if any(_is_folder_uri(item) for item in items):
+        await setup()
+        # Support multiple folder:// roots in one call
+        last_run_info = None
+        for item in items:
+            if not _is_folder_uri(item):
+                continue
+
+            root_path = item.replace("folder://", "", 1)
+            if not os.path.isabs(root_path):
+                root_path = os.path.abspath(root_path)
+
+            root_name = os.path.basename(os.path.normpath(root_path)) or "dataset"
+            dataset_map = discover_directory_datasets(root_path, parent_dir=root_name)
+
+            # Process each discovered dataset independently via the pipeline
+            for ds_name, file_list in dataset_map.items():
+                # Authorize/create dataset
+                user, ds = await resolve_authorized_user_dataset(None, ds_name, user)
+
+                await reset_dataset_pipeline_run_status(
+                    ds.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+                )
+
+                tasks = [
+                    # We already have resolved file list per dataset
+                    Task(ingest_data, ds_name, user, node_set, None, preferred_loaders),
+                ]
+
+                async for run_info in run_pipeline(
+                    tasks=tasks,
+                    datasets=[ds.id],
+                    data=file_list,
+                    user=user,
+                    pipeline_name="add_pipeline",
+                    vector_db_config=vector_db_config,
+                    graph_db_config=graph_db_config,
+                    incremental_loading=incremental_loading,
+                ):
+                    last_run_info = run_info
+
+        return last_run_info
+
+    # Default behavior: single dataset ingestion
 
     tasks = [
         Task(resolve_data_directories, include_subdirectories=True),
         Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
diff --git a/cognee/api/v1/save/__init__.py b/cognee/api/v1/save/__init__.py
new file mode 100644
index 000000000..cc3767183
--- /dev/null
+++ b/cognee/api/v1/save/__init__.py
@@ -0,0 +1,2 @@
+# Re-export save function for local imports
+from .save import save  # noqa: F401
diff --git a/cognee/api/v1/save/routers/get_save_router.py b/cognee/api/v1/save/routers/get_save_router.py
new file mode 100644
index 000000000..e60c45344
--- /dev/null
+++ b/cognee/api/v1/save/routers/get_save_router.py
@@ -0,0 +1,54 @@
+from typing import Optional, List
+from uuid import UUID
+
+from fastapi import APIRouter, Depends, HTTPException, status
+
+from cognee.api.v1.models import InDTO
+from cognee.api.v1.save.save import save as save_fn
+from cognee.modules.users.models import User
+from cognee.api.v1.auth.dependencies.get_authenticated_user import get_authenticated_user
+from cognee.shared.telemetry import send_telemetry
+
+
+class SavePayloadDTO(InDTO):
+    datasets: Optional[List[str]] = None
+    dataset_ids: Optional[List[UUID]] = None
+    export_root_directory: Optional[str] = None
+    # alias support
+    path: Optional[str] = None
+
+
+def get_save_router() -> APIRouter:
+    router = APIRouter()
+
+    @router.post("", response_model=dict)
+    async def save(payload: SavePayloadDTO, user: User = Depends(get_authenticated_user)):
+        """
+        Save dataset exports to markdown files.
+
+        For each accessible dataset, produces a folder with one markdown file per data item
+        containing a summary, an ASCII path tree, question ideas, and search results.
+        """
+        send_telemetry(
+            "Save API Endpoint Invoked",
+            user.id,
+            additional_properties={
+                "endpoint": "POST /v1/save",
+            },
+        )
+
+        try:
+            datasets = payload.datasets if payload.datasets else payload.dataset_ids
+            result = await save_fn(
+                datasets=datasets,
+                export_root_directory=payload.export_root_directory or payload.path,
+                user=user,
+            )
+            return result
+        except Exception as error:
+            raise HTTPException(
+                status_code=status.HTTP_409_CONFLICT,
+                detail=f"Error during save operation: {str(error)}",
+            ) from error
+
+    return router
diff --git a/cognee/api/v1/save/save.py b/cognee/api/v1/save/save.py
new file mode 100644
index 000000000..688ded7bf
--- /dev/null
+++ b/cognee/api/v1/save/save.py
@@ -0,0 +1,335 @@
+import os
+import asyncio
+import json
+from typing import Optional, Union, List, Dict
+from uuid import UUID
+
+from pydantic import BaseModel
+
+from cognee.base_config import get_base_config
+from cognee.modules.users.models import User
+from cognee.modules.users.methods import get_default_user
+from cognee.modules.data.methods import get_authorized_existing_datasets, get_dataset_data
+from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path
+from cognee.infrastructure.llm.LLMGateway import LLMGateway
+from cognee.shared.logging_utils import get_logger
+from cognee.api.v1.search import search
+from cognee.modules.search.types import SearchType
+
+
+logger = get_logger("save")
+
+
+class QuestionsModel(BaseModel):
+    questions: List[str]
+
+
+def _sanitize_filename(name: str) -> str:
+    safe = "".join(c if c.isalnum() or c in ("-", "_", ".", " ") else "_" for c in name)
+    return safe.strip().replace(" ", "_")
+
+
+def _dataset_dir_name(dataset) -> str:
+    # Prefer a readable dataset name when available; fall back to the dataset id
+    if getattr(dataset, "name", None):
+        return _sanitize_filename(str(dataset.name))
+    return str(dataset.id)
+
+
+def _file_markdown_name(data_item, used_names: set[str]) -> str:
+    # Use the data item's name when present, else a generic fallback
+    name = getattr(data_item, "name", None) or "file"
+    base = _sanitize_filename(str(name))
+    filename = f"{base}.md"
+    if filename in used_names:
+        short_id = str(getattr(data_item, "id", ""))[:8]
+        filename = f"{base}__{short_id}.md"
+    used_names.add(filename)
+    return filename
+
+
+def _ascii_path_tree(path_str: str) -> str:
+    if not path_str:
+        return "(no path)"
+
+    # Normalize special schemes but keep segments readable
+    try:
+        normalized = get_data_file_path(path_str)
+    except Exception:
+        normalized = path_str
+
+    # Keep the path compact – show last 5 segments
+    parts = [p for p in normalized.replace("\\", "/").split("/") if p]
+    if len(parts) > 6:
+        display = ["…"] + parts[-5:]
+    else:
+        display = parts
+
+    # Render a single-branch tree
+    lines = []
+    for idx, seg in enumerate(display):
+        prefix = "└── " if idx == 0 else ("    " * idx + "└── ")
+        lines.append(f"{prefix}{seg}")
+    return "\n".join(lines)
+
+
+async def _get_summary_via_summaries(query_text: str, dataset_id: UUID, top_k: int) -> str:
+    try:
+        results = await search(
+            query_text=query_text,
+            query_type=SearchType.SUMMARIES,
+            dataset_ids=[dataset_id],
+            top_k=top_k,
+        )
+        if not results:
+            return ""
+        texts: List[str] = []
+        for r in results[:top_k]:
+            texts.append(str(r))
+        return "\n\n".join(texts)
+    except Exception as e:
+        logger.error(
+            "SUMMARIES search failed for '%s' in dataset %s: %s",
+            query_text,
+            str(dataset_id),
+            str(e),
+        )
+        return ""
+
+
+async def _generate_questions(file_name: str, summary_text: str) -> List[str]:
+    prompt = (
+        "You are an expert analyst. Given a file and its summary, propose 10 diverse, high-signal "
+        "questions to further explore the file's content, implications, relationships, and gaps. "
+        "Avoid duplicates; vary depth and angle (overview, details, cross-references, temporal, quality).\n\n"
+        f"File: {file_name}\n\nSummary:\n{summary_text[:4000]}"
+    )
+
+    model = await LLMGateway.acreate_structured_output(
+        text_input=prompt,
+        system_prompt="Return strictly a JSON with key 'questions' and value as an array of 10 concise strings.",
+        response_model=QuestionsModel,
+    )
+
+    # model can be either pydantic model or dict-like, normalize
+    try:
+        questions = list(getattr(model, "questions", []))
+    except Exception:
+        questions = []
+
+    # Fallback if the tool returned a dict-like
+    if not questions and isinstance(model, dict):
+        questions = list(model.get("questions", []) or [])
+
+    # Enforce 10 max
+    return questions[:10]
+
+
+async def _run_searches_for_question(
+    question: str, dataset_id: UUID, search_types: List[SearchType], top_k: int
+) -> Dict[str, Union[str, List[dict], List[str]]]:
+    async def run_one(st: SearchType):
+        try:
+            result = await search(
+                query_text=question,
+                query_type=st,
+                dataset_ids=[dataset_id],
+                top_k=top_k,
+            )
+            return st.value, result
+        except Exception as e:
+            logger.error("Search failed for type %s: %s", st.value, str(e))
+            return st.value, [f"Error: {str(e)}"]
+
+    pairs = await asyncio.gather(*[run_one(st) for st in search_types])
+    return {k: v for k, v in pairs}
+
+
+def _format_results_md(results: Dict[str, Union[str, List[dict], List[str]]]) -> str:
+    lines: List[str] = []
+    for st, payload in results.items():
+        lines.append(f"#### {st}")
+        if isinstance(payload, list):
+            # Printed as bullet items; stringify dicts
+            for item in payload[:5]:
+                if isinstance(item, dict):
+                    # compact representation
+                    snippet = json.dumps(item, ensure_ascii=False)[:800]
+                    lines.append(f"- {snippet}")
+                else:
+                    text = str(item)
+                    lines.append(f"- {text[:800]}")
+        else:
+            lines.append(str(payload))
+        lines.append("")
+    return "\n".join(lines)
+
+
+async def save(
+    datasets: Optional[Union[List[str], List[UUID]]] = None,
+    export_root_directory: Optional[str] = None,
+    user: Optional[User] = None,
+    # Configurable knobs
+    max_questions: int = 10,
+    search_types: Optional[List[Union[str, SearchType]]] = None,
+    top_k: int = 5,
+    include_summary: bool = True,
+    include_ascii_tree: bool = True,
+    concurrency: int = 4,
+    timeout: Optional[float] = None,
+) -> Dict[str, str]:
+    """
+    Export per-dataset markdown summaries and search insights for each ingested file.
+
+    For every dataset the user can read:
+    - Create a folder under export_root_directory (default: data_root_directory/memory_export)
+    - For each data item (file), create a .md containing:
+      - Summary of the file (via a SUMMARIES search scoped to the dataset)
+      - A small ASCII path tree showing its folder position
+      - Up to N LLM-generated question ideas (configurable)
+      - Results of configured Cognee searches per question
+    Also creates an index.md per dataset with links to files and an optional dataset summary.
+
+    Returns a mapping of dataset_id -> export_directory path.
+    """
+    base_config = get_base_config()
+    export_root = export_root_directory or os.path.join(
+        base_config.data_root_directory, "memory_export"
+    )
+    os.makedirs(export_root, exist_ok=True)
+
+    if user is None:
+        user = await get_default_user()
+
+    datasets_list = await get_authorized_existing_datasets(datasets, "read", user)
+    results: Dict[str, str] = {}
+
+    for dataset in datasets_list:
+        ds_dir = os.path.join(export_root, _dataset_dir_name(dataset))
+        os.makedirs(ds_dir, exist_ok=True)
+        results[str(dataset.id)] = ds_dir
+
+        data_items = await get_dataset_data(dataset.id)
+
+        # Normalize search types
+        if not search_types:
+            effective_search_types = [
+                SearchType.GRAPH_COMPLETION,
+                SearchType.INSIGHTS,
+                SearchType.CHUNKS,
+            ]
+        else:
+            effective_search_types = []
+            for st in search_types:
+                if isinstance(st, SearchType):
+                    effective_search_types.append(st)
+                else:
+                    try:
+                        effective_search_types.append(SearchType[str(st)])
+                    except Exception:
+                        logger.warning("Unknown search type '%s', skipping", str(st))
+
+        sem = asyncio.Semaphore(max(1, int(concurrency)))
+        used_names: set[str] = set()
+        index_entries: List[tuple[str, str]] = []
+
+        async def process_one(data_item):
+            async with sem:
+                file_label = getattr(data_item, "name", str(data_item.id))
+                original_path = getattr(data_item, "original_data_location", None)
+
+                ascii_tree = (
+                    _ascii_path_tree(original_path or file_label) if include_ascii_tree else ""
+                )
+
+                summary_text = ""
+                if include_summary:
+                    # Use SUMMARIES search scoped to dataset to derive file summary
+                    file_query = getattr(data_item, "name", str(data_item.id)) or "file"
+                    summary_text = await _get_summary_via_summaries(file_query, dataset.id, top_k)
+                    if not summary_text:
+                        summary_text = "Summary not available."
+
+                if max_questions == 0:
+                    questions = []
+                else:
+                    questions = await _generate_questions(file_label, summary_text)
+                    if max_questions is not None and max_questions >= 0:
+                        questions = questions[:max_questions]
+
+                async def searches_for_question(q: str):
+                    return await _run_searches_for_question(
+                        q, dataset.id, effective_search_types, top_k
+                    )
+
+                # Run per-question searches concurrently
+                per_q_results = await asyncio.gather(*[searches_for_question(q) for q in questions])
+
+                # Build markdown content
+                md_lines = [f"# {file_label}", ""]
+                if include_ascii_tree:
+                    md_lines.extend(["## Location", "", "```", ascii_tree, "```", ""])
+                if include_summary:
+                    md_lines.extend(["## Summary", "", summary_text, ""])
+
+                md_lines.append("## Question ideas")
+                for idx, q in enumerate(questions, start=1):
+                    md_lines.append(f"- {idx}. {q}")
{q}") + md_lines.append("") + + md_lines.append("## Searches") + md_lines.append("") + for q, per_type in zip(questions, per_q_results): + md_lines.append(f"### Q: {q}") + md_lines.append(_format_results_md(per_type)) + md_lines.append("") + + # Write to file (collision-safe) + md_filename = _file_markdown_name(data_item, used_names) + export_path = os.path.join(ds_dir, md_filename) + tmp_path = export_path + ".tmp" + with open(tmp_path, "w", encoding="utf-8") as f: + f.write("\n".join(md_lines)) + os.replace(tmp_path, export_path) + + index_entries.append((file_label, md_filename)) + + tasks = [asyncio.create_task(process_one(item)) for item in data_items] + + if timeout and timeout > 0: + try: + await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout) + except asyncio.TimeoutError: + logger.error("Save timed out for dataset %s", str(dataset.id)) + else: + await asyncio.gather(*tasks, return_exceptions=True) + + # Build dataset index.md with TOC and optional dataset summary via SUMMARIES + try: + index_lines = [f"# Dataset: {_dataset_dir_name(dataset)}", "", "## Files", ""] + for display, fname in sorted(index_entries, key=lambda x: x[0].lower()): + index_lines.append(f"- [{display}]({fname})") + + # Dataset summary section + try: + summaries = await search( + query_text="dataset overview", + query_type=SearchType.SUMMARIES, + dataset_ids=[dataset.id], + top_k=top_k, + ) + except Exception as e: + logger.error("Dataset summary search failed: %s", str(e)) + summaries = [] + + if summaries: + index_lines.extend(["", "## Dataset summary (top summaries)", ""]) + for s in summaries[:top_k]: + index_lines.append(f"- {str(s)[:800]}") + + with open(os.path.join(ds_dir, "index.md"), "w", encoding="utf-8") as f: + f.write("\n".join(index_lines)) + except Exception as e: + logger.error("Failed to write dataset index for %s: %s", str(dataset.id), str(e)) + + return results diff --git a/cognee/cli/_cognee.py b/cognee/cli/_cognee.py index f8ade67dd..9934aae4c 100644 --- a/cognee/cli/_cognee.py +++ b/cognee/cli/_cognee.py @@ -65,6 +65,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]: ("cognee.cli.commands.cognify_command", "CognifyCommand"), ("cognee.cli.commands.delete_command", "DeleteCommand"), ("cognee.cli.commands.config_command", "ConfigCommand"), + ("cognee.cli.commands.save_command", "SaveCommand"), ] for module_path, class_name in command_modules: diff --git a/cognee/cli/commands/add_command.py b/cognee/cli/commands/add_command.py index 4260b52bc..55f5a8fc3 100644 --- a/cognee/cli/commands/add_command.py +++ b/cognee/cli/commands/add_command.py @@ -24,7 +24,8 @@ Supported Input Types: - **File paths**: Local file paths (absolute paths starting with "/") - **File URLs**: "file:///absolute/path" or "file://relative/path" - **S3 paths**: "s3://bucket-name/path/to/file" -- **Lists**: Multiple files or text strings in a single call +- **Folder URIs**: "folder:///abs/path" groups files by subfolders into datasets (prefix with parent) +- **Lists**: Multiple items in a single call Supported File Formats: - Text files (.txt, .md, .csv) @@ -41,13 +42,13 @@ After adding data, use `cognee cognify` to process it into knowledge graphs. 
         parser.add_argument(
             "data",
             nargs="+",
-            help="Data to add: text content, file paths (/path/to/file), file URLs (file://path), S3 paths (s3://bucket/file), or mix of these",
+            help="Data to add: text, files (/path), file://, s3://, folder:///abs/path (datasets by subfolders)",
         )
         parser.add_argument(
             "--dataset-name",
             "-d",
             default="main_dataset",
-            help="Dataset name to organize your data (default: main_dataset)",
+            help="Dataset name (ignored for folder://; subfolders become datasets)",
         )
 
     def execute(self, args: argparse.Namespace) -> None:
@@ -55,20 +56,27 @@ After adding data, use `cognee cognify` to process it into knowledge graphs.
         # Import cognee here to avoid circular imports
         import cognee
 
-        fmt.echo(f"Adding {len(args.data)} item(s) to dataset '{args.dataset_name}'...")
+        def contains_folder_uri(items):
+            return any(isinstance(i, str) and i.startswith("folder://") for i in items)
+
+        inputs = args.data
+        dataset_label = args.dataset_name
+
+        if contains_folder_uri(inputs):
+            fmt.echo("Detected folder:// input. Subfolders will be added as separate datasets.")
+        else:
+            fmt.echo(f"Adding {len(inputs)} item(s) to dataset '{dataset_label}'...")
 
         # Run the async add function
         async def run_add():
             try:
-                # Pass all data items as a list to cognee.add if multiple items
-                if len(args.data) == 1:
-                    data_to_add = args.data[0]
-                else:
-                    data_to_add = args.data
-
+                data_to_add = inputs if len(inputs) > 1 else inputs[0]
                 fmt.echo("Processing data...")
-                await cognee.add(data=data_to_add, dataset_name=args.dataset_name)
-                fmt.success(f"Successfully added data to dataset '{args.dataset_name}'")
+                await cognee.add(data=data_to_add, dataset_name=dataset_label)
+                if contains_folder_uri(inputs):
+                    fmt.success("Successfully added folder datasets")
+                else:
+                    fmt.success(f"Successfully added data to dataset '{dataset_label}'")
             except Exception as e:
                 raise CliCommandInnerException(f"Failed to add data: {str(e)}")
 
diff --git a/cognee/cli/commands/save_command.py b/cognee/cli/commands/save_command.py
new file mode 100644
index 000000000..512c8fed0
--- /dev/null
+++ b/cognee/cli/commands/save_command.py
@@ -0,0 +1,116 @@
+import argparse
+import asyncio
+from typing import Optional
+
+from cognee.cli.reference import SupportsCliCommand
+from cognee.cli import DEFAULT_DOCS_URL
+import cognee.cli.echo as fmt
+from cognee.cli.exceptions import CliCommandException, CliCommandInnerException
+from cognee.cli.config import SEARCH_TYPE_CHOICES
+
+
+class SaveCommand(SupportsCliCommand):
+    command_string = "save"
+    help_string = "Export dataset summaries and search insights to markdown files"
+    docs_url = DEFAULT_DOCS_URL
+    description = """
+Export per-dataset, per-file markdown reports with summaries, ASCII path, question ideas,
+and search results. Creates an index.md per dataset.
+ """ + + def configure_parser(self, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--datasets", + nargs="*", + help="Dataset names to process (default: all accessible)", + ) + parser.add_argument( + "--export-root", + default=None, + help="Export root directory (default: /memory_export)", + ) + parser.add_argument( + "--path", + default=None, + help="Alias for --export-root", + ) + parser.add_argument( + "--max-questions", + type=int, + default=10, + help="Maximum number of question ideas per file (default: 10)", + ) + parser.add_argument( + "--search-types", + nargs="*", + choices=SEARCH_TYPE_CHOICES, + default=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"], + help="Search types to run per question", + ) + parser.add_argument( + "--top-k", + type=int, + default=5, + help="Top-k results to retrieve for each search (default: 5)", + ) + parser.add_argument( + "--no-summary", + action="store_true", + help="Exclude file summary section", + ) + parser.add_argument( + "--no-path", + action="store_true", + help="Exclude ASCII path section", + ) + parser.add_argument( + "--concurrency", + type=int, + default=4, + help="Max concurrent files processed per dataset (default: 4)", + ) + parser.add_argument( + "--timeout", + type=float, + default=None, + help="Optional per-dataset timeout in seconds", + ) + + def execute(self, args: argparse.Namespace) -> None: + try: + import cognee + + fmt.echo("Starting save export...") + + async def run_save(): + try: + result = await cognee.save( + datasets=args.datasets, + export_root_directory=args.export_root or args.path, + max_questions=args.max_questions, + search_types=args.search_types, + top_k=args.top_k, + include_summary=(not args.no_summary), + include_ascii_tree=(not args.no_path), + concurrency=args.concurrency, + timeout=args.timeout, + ) + return result + except Exception as e: + raise CliCommandInnerException(f"Failed to save: {str(e)}") + + results = asyncio.run(run_save()) + + if results: + fmt.success("Export complete:") + for ds_id, path in results.items(): + fmt.echo(f"- {ds_id}: {path}") + else: + fmt.note("No datasets to export or no outputs generated.") + + except CliCommandInnerException as e: + fmt.error(str(e)) + raise CliCommandException(self.docs_url) + except Exception as e: + fmt.error(f"Unexpected error: {str(e)}") + raise CliCommandException(self.docs_url) diff --git a/cognee/tests/test_save_export_path.py b/cognee/tests/test_save_export_path.py new file mode 100644 index 000000000..d1d26b9e4 --- /dev/null +++ b/cognee/tests/test_save_export_path.py @@ -0,0 +1,116 @@ +import os +import asyncio +from uuid import uuid4 + +import pytest + + +@pytest.mark.asyncio +async def test_save_uses_custom_export_path(tmp_path, monkeypatch): + # Import target after tmp fixtures are ready + from cognee.api.v1.save import save as save_mod + + # Prepare two mock datasets + class Dataset: + def __init__(self, id_, name): + self.id = id_ + self.name = name + + ds1 = Dataset(uuid4(), "dataset_alpha") + ds2 = Dataset(uuid4(), "dataset_beta") + + # Mock dataset discovery + async def mock_get_authorized_existing_datasets(datasets, permission_type, user): + return [ds1, ds2] + + monkeypatch.setattr( + save_mod, "get_authorized_existing_datasets", mock_get_authorized_existing_datasets + ) + + # Mock data items (with filename collision in ds1) + class DataItem: + def __init__(self, id_, name, original_path=None): + self.id = id_ + self.name = name + self.original_data_location = original_path + + ds1_items = [ + DataItem(uuid4(), 
"report.txt", "/root/a/report.txt"), + DataItem(uuid4(), "report.txt", "/root/b/report.txt"), # collision + ] + ds2_items = [ + DataItem(uuid4(), "notes.md", "/root/x/notes.md"), + ] + + async def mock_get_dataset_data(dataset_id): + if dataset_id == ds1.id: + return ds1_items + if dataset_id == ds2.id: + return ds2_items + return [] + + monkeypatch.setattr(save_mod, "get_dataset_data", mock_get_dataset_data) + + # Mock summary retrieval + async def mock_get_document_summaries_text(data_id: str) -> str: + return "This is a summary." + + monkeypatch.setattr(save_mod, "_get_document_summaries_text", mock_get_document_summaries_text) + + # Mock questions + async def mock_generate_questions(file_name: str, summary_text: str): + return ["Q1?", "Q2?", "Q3?"] + + monkeypatch.setattr(save_mod, "_generate_questions", mock_generate_questions) + + # Mock searches per question + async def mock_run_searches_for_question(question, dataset_id, search_types, top_k): + return {st.value: [f"{question} -> ok"] for st in search_types} + + monkeypatch.setattr(save_mod, "_run_searches_for_question", mock_run_searches_for_question) + + # Use custom export path + export_dir = tmp_path / "my_exports" + export_dir_str = str(export_dir) + + # Run + result = await save_mod.save( + datasets=None, + export_root_directory=export_dir_str, + max_questions=3, + search_types=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"], + top_k=2, + include_summary=True, + include_ascii_tree=True, + concurrency=2, + timeout=None, + ) + + # Verify returned mapping points to our custom path + assert str(ds1.id) in result and str(ds2.id) in result + assert result[str(ds1.id)].startswith(export_dir_str) + assert result[str(ds2.id)].startswith(export_dir_str) + + # Verify directories and files exist + ds1_dir = result[str(ds1.id)] + ds2_dir = result[str(ds2.id)] + + assert os.path.isdir(ds1_dir) + assert os.path.isdir(ds2_dir) + + # index.md present + assert os.path.isfile(os.path.join(ds1_dir, "index.md")) + assert os.path.isfile(os.path.join(ds2_dir, "index.md")) + + # File markdowns exist; collision handling: two files with similar base + ds1_files = [f for f in os.listdir(ds1_dir) if f.endswith(".md") and f != "index.md"] + assert len(ds1_files) == 2 + assert any(f == "report.txt.md" for f in ds1_files) + assert any(f.startswith("report.txt__") and f.endswith(".md") for f in ds1_files) + + # Content sanity: ensure question headers exist in one file + sample_md_path = os.path.join(ds1_dir, ds1_files[0]) + with open(sample_md_path, "r", encoding="utf-8") as fh: + content = fh.read() + assert "## Question ideas" in content + assert "## Searches" in content