Compare commits: main...new_md_bra (2 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 5f21f6a520 | |
| | 109c4ae569 | |
10 changed files with 702 additions and 12 deletions
@@ -27,7 +27,10 @@ from .api.v1.visualize import visualize_graph, start_visualization_server
```python
from cognee.modules.visualization.cognee_network_visualization import (
    cognee_network_visualization,
)

from .api.v1.save.save import save
from .api.v1.ui import start_ui


# Pipelines
from .modules import pipelines
```
@@ -242,6 +242,9 @@ app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognify"])
```python
app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])

app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
from cognee.api.v1.save.routers.get_save_router import get_save_router

app.include_router(get_save_router(), prefix="/api/v1/save", tags=["save"])

app.include_router(
    get_permissions_router(),
```
@@ -11,6 +11,8 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
```python
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.modules.ingestion import discover_directory_datasets
import os


async def add(
```
@@ -141,6 +143,56 @@ async def add(
```python
    - GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j"

    """

    # Special handling: folder:// scheme maps nested folders to datasets
    def _is_folder_uri(item: Union[str, BinaryIO]) -> bool:
        return isinstance(item, str) and item.startswith("folder://")

    items = data if isinstance(data, list) else [data]
    if any(_is_folder_uri(item) for item in items):
        await setup()
        # Support multiple folder:// roots in one call
        last_run_info = None
        for item in items:
            if not _is_folder_uri(item):
                continue

            root_path = item.replace("folder://", "", 1)
            if not os.path.isabs(root_path):
                root_path = os.path.abspath(root_path)

            root_name = os.path.basename(os.path.normpath(root_path)) or "dataset"
            dataset_map = discover_directory_datasets(root_path, parent_dir=root_name)

            # Process each discovered dataset independently via the pipeline
            for ds_name, file_list in dataset_map.items():
                # Authorize/create dataset
                user, ds = await resolve_authorized_user_dataset(None, ds_name, user)

                await reset_dataset_pipeline_run_status(
                    ds.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
                )

                tasks = [
                    # We already have resolved file list per dataset
                    Task(ingest_data, ds_name, user, node_set, None, preferred_loaders),
                ]

                async for run_info in run_pipeline(
                    tasks=tasks,
                    datasets=[ds.id],
                    data=file_list,
                    user=user,
                    pipeline_name="add_pipeline",
                    vector_db_config=vector_db_config,
                    graph_db_config=graph_db_config,
                    incremental_loading=incremental_loading,
                ):
                    last_run_info = run_info

        return last_run_info

    # Default behavior: single dataset ingestion
    tasks = [
        Task(resolve_data_directories, include_subdirectories=True),
        Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),
```
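The branch above turns each subfolder discovered under a `folder://` root into its own dataset and runs the add pipeline once per dataset. A minimal usage sketch, assuming a default local configuration; the directory path is a placeholder:

```python
import asyncio

import cognee


async def main():
    # "folder://" points at a local root directory; nested folders become
    # separate datasets, prefixed with the root folder's name.
    # dataset_name is ignored for folder:// input (subfolders become datasets).
    run_info = await cognee.add(
        data="folder:///absolute/path/to/projects",  # placeholder path
        dataset_name="main_dataset",
    )
    print(run_info)  # pipeline run info of the last processed dataset


if __name__ == "__main__":
    asyncio.run(main())
```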
cognee/api/v1/save/__init__.py (new file, 2 lines)
```python
# Re-export save function for local imports
from .save import save  # noqa: F401
```
cognee/api/v1/save/routers/get_save_router.py (new file, 54 lines)
```python
from typing import Optional, List
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status

from cognee.api.v1.models import InDTO
from cognee.api.v1.save.save import save as save_fn
from cognee.modules.users.models import User
from cognee.api.v1.auth.dependencies.get_authenticated_user import get_authenticated_user
from cognee.shared.telemetry import send_telemetry


class SavePayloadDTO(InDTO):
    datasets: Optional[List[str]] = None
    dataset_ids: Optional[List[UUID]] = None
    export_root_directory: Optional[str] = None
    # alias support
    path: Optional[str] = None


def get_save_router() -> APIRouter:
    router = APIRouter()

    @router.post("", response_model=dict)
    async def save(payload: SavePayloadDTO, user: User = Depends(get_authenticated_user)):
        """
        Save dataset exports to markdown files.

        For each accessible dataset, produces a folder with one markdown file per data item,
        containing a summary, an ASCII path tree, question ideas, and search results.
        """
        send_telemetry(
            "Save API Endpoint Invoked",
            user.id,
            additional_properties={
                "endpoint": "POST /v1/save",
            },
        )

        try:
            datasets = payload.datasets if payload.datasets else payload.dataset_ids
            result = await save_fn(
                datasets=datasets,
                export_root_directory=payload.export_root_directory or payload.path,
                user=user,
            )
            return result
        except Exception as error:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Error during save operation: {str(error)}",
            ) from error

    return router
```
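A minimal client sketch for the endpoint above, assuming the API is served locally on port 8000 and that a bearer token is used for authentication (host, port, and auth handling are assumptions, not part of this diff):

```python
import requests  # assumed available in the client environment

API_URL = "http://localhost:8000/api/v1/save"  # assumed local deployment
TOKEN = "<access-token>"  # placeholder; obtain via the normal auth flow

payload = {
    "datasets": ["dataset_alpha"],         # or "dataset_ids" with UUID strings
    "export_root_directory": "./exports",  # "path" works as an alias
}

response = requests.post(
    API_URL, json=payload, headers={"Authorization": f"Bearer {TOKEN}"}
)
response.raise_for_status()

# The router returns whatever save() returns: a dataset_id -> export directory mapping.
print(response.json())
```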
cognee/api/v1/save/save.py (new file, 335 lines)
```python
import os
import asyncio
import json
from typing import Optional, Union, List, Dict
from uuid import UUID

from pydantic import BaseModel

from cognee.base_config import get_base_config
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.methods import get_authorized_existing_datasets, get_dataset_data
from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.shared.logging_utils import get_logger
from cognee.api.v1.search import search
from cognee.modules.search.types import SearchType


logger = get_logger("save")


class QuestionsModel(BaseModel):
    questions: List[str]


def _sanitize_filename(name: str) -> str:
    safe = "".join(c if c.isalnum() or c in ("-", "_", ".", " ") else "_" for c in name)
    return safe.strip().replace(" ", "_")


def _dataset_dir_name(dataset) -> str:
    # Prefer readable dataset name when available, fallback to id
    if getattr(dataset, "name", None):
        return _sanitize_filename(str(dataset.name))
    return str(dataset.id)


def _file_markdown_name(data_item, used_names: set[str]) -> str:
    # Use original file name if present, else data.name
    name = getattr(data_item, "name", None) or "file"
    base = _sanitize_filename(str(name))
    filename = f"{base}.md"
    if filename in used_names:
        short_id = str(getattr(data_item, "id", ""))[:8]
        filename = f"{base}__{short_id}.md"
    used_names.add(filename)
    return filename


def _ascii_path_tree(path_str: str) -> str:
    if not path_str:
        return "(no path)"

    # Normalize special schemes but keep segments readable
    try:
        normalized = get_data_file_path(path_str)
    except Exception:
        normalized = path_str

    # Keep the path compact – show last 5 segments
    parts = [p for p in normalized.replace("\\", "/").split("/") if p]
    if len(parts) > 6:
        display = ["…"] + parts[-5:]
    else:
        display = parts

    # Render a single-branch tree
    lines = []
    for idx, seg in enumerate(display):
        prefix = "└── " if idx == 0 else (" " * idx + "└── ")
        lines.append(f"{prefix}{seg}")
    return "\n".join(lines)
```
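A quick illustration of the tree helper above; the call is real, but the printed output in the comments is a hand-worked approximation (it assumes get_data_file_path returns a plain local path unchanged):

```python
from cognee.api.v1.save.save import _ascii_path_tree

print(_ascii_path_tree("/root/a/b/c/report.txt"))
# Approximate shape of the result:
# └── root
#  └── a
#   └── b
#    └── c
#     └── report.txt
```

cognee/api/v1/save/save.py continues below.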
```python
async def _get_summary_via_summaries(query_text: str, dataset_id: UUID, top_k: int) -> str:
    try:
        results = await search(
            query_text=query_text,
            query_type=SearchType.SUMMARIES,
            dataset_ids=[dataset_id],
            top_k=top_k,
        )
        if not results:
            return ""
        texts: List[str] = []
        for r in results[:top_k]:
            texts.append(str(r))
        return "\n\n".join(texts)
    except Exception as e:
        logger.error(
            "SUMMARIES search failed for '%s' in dataset %s: %s",
            query_text,
            str(dataset_id),
            str(e),
        )
        return ""


async def _generate_questions(file_name: str, summary_text: str) -> List[str]:
    prompt = (
        "You are an expert analyst. Given a file and its summary, propose 10 diverse, high-signal "
        "questions to further explore the file's content, implications, relationships, and gaps. "
        "Avoid duplicates; vary depth and angle (overview, details, cross-references, temporal, quality).\n\n"
        f"File: {file_name}\n\nSummary:\n{summary_text[:4000]}"
    )

    model = await LLMGateway.acreate_structured_output(
        text_input=prompt,
        system_prompt="Return strictly a JSON with key 'questions' and value as an array of 10 concise strings.",
        response_model=QuestionsModel,
    )

    # model can be either pydantic model or dict-like, normalize
    try:
        questions = list(getattr(model, "questions", []))
    except Exception:
        questions = []

    # Fallback if the tool returned a dict-like
    if not questions and isinstance(model, dict):
        questions = list(model.get("questions", []) or [])

    # Enforce 10 max
    return questions[:10]


async def _run_searches_for_question(
    question: str, dataset_id: UUID, search_types: List[SearchType], top_k: int
) -> Dict[str, Union[str, List[dict], List[str]]]:
    async def run_one(st: SearchType):
        try:
            result = await search(
                query_text=question,
                query_type=st,
                dataset_ids=[dataset_id],
                top_k=top_k,
            )
            return st.value, result
        except Exception as e:
            logger.error("Search failed for type %s: %s", st.value, str(e))
            return st.value, [f"Error: {str(e)}"]

    pairs = await asyncio.gather(*[run_one(st) for st in search_types])
    return {k: v for k, v in pairs}


def _format_results_md(results: Dict[str, Union[str, List[dict], List[str]]]) -> str:
    lines: List[str] = []
    for st, payload in results.items():
        lines.append(f"#### {st}")
        if isinstance(payload, list):
            # Printed as bullet items; stringify dicts
            for item in payload[:5]:
                if isinstance(item, dict):
                    # compact representation
                    snippet = json.dumps(item, ensure_ascii=False)[:800]
                    lines.append(f"- {snippet}")
                else:
                    text = str(item)
                    lines.append(f"- {text[:800]}")
        else:
            lines.append(str(payload))
        lines.append("")
    return "\n".join(lines)
```
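Since _format_results_md is a pure function, its output shape is easy to preview. A small illustrative call; the result dictionary is made up for demonstration:

```python
from cognee.api.v1.save.save import _format_results_md

# Hypothetical search payloads keyed by search-type value.
demo = {
    "CHUNKS": ["First matching chunk of text.", {"id": 1, "score": 0.87}],
    "GRAPH_COMPLETION": "A single string answer is printed verbatim.",
}

print(_format_results_md(demo))
# #### CHUNKS
# - First matching chunk of text.
# - {"id": 1, "score": 0.87}
#
# #### GRAPH_COMPLETION
# A single string answer is printed verbatim.
```

The save() entry point completes cognee/api/v1/save/save.py: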
````python
async def save(
    datasets: Optional[Union[List[str], List[UUID]]] = None,
    export_root_directory: Optional[str] = None,
    user: Optional[User] = None,
    # Configurable knobs
    max_questions: int = 10,
    search_types: Optional[List[Union[str, SearchType]]] = None,
    top_k: int = 5,
    include_summary: bool = True,
    include_ascii_tree: bool = True,
    concurrency: int = 4,
    timeout: Optional[float] = None,
) -> Dict[str, str]:
    """
    Export per-dataset markdown summaries and search insights for each ingested file.

    For every dataset the user can read:
    - Create a folder under export_root_directory (or data_root_directory/memory_export)
    - For each data item (file), create a .md containing:
      - Summary of the file (from existing TextSummary nodes)
      - A small ASCII path tree showing its folder position
      - Up to N LLM-generated question ideas (configurable)
      - Results of configured Cognee searches per question
    Also creates an index.md per dataset with links to files and an optional dataset summary.

    Returns a mapping of dataset_id -> export_directory path.
    """
    base_config = get_base_config()
    export_root = export_root_directory or os.path.join(
        base_config.data_root_directory, "memory_export"
    )
    os.makedirs(export_root, exist_ok=True)

    if user is None:
        user = await get_default_user()

    datasets_list = await get_authorized_existing_datasets(datasets, "read", user)
    results: Dict[str, str] = {}

    for dataset in datasets_list:
        ds_dir = os.path.join(export_root, _dataset_dir_name(dataset))
        os.makedirs(ds_dir, exist_ok=True)
        results[str(dataset.id)] = ds_dir

        data_items = await get_dataset_data(dataset.id)

        # Normalize search types
        if not search_types:
            effective_search_types = [
                SearchType.GRAPH_COMPLETION,
                SearchType.INSIGHTS,
                SearchType.CHUNKS,
            ]
        else:
            effective_search_types = []
            for st in search_types:
                if isinstance(st, SearchType):
                    effective_search_types.append(st)
                else:
                    try:
                        effective_search_types.append(SearchType[str(st)])
                    except Exception:
                        logger.warning("Unknown search type '%s', skipping", str(st))

        sem = asyncio.Semaphore(max(1, int(concurrency)))
        used_names: set[str] = set()
        index_entries: List[tuple[str, str]] = []

        async def process_one(data_item):
            async with sem:
                file_label = getattr(data_item, "name", str(data_item.id))
                original_path = getattr(data_item, "original_data_location", None)

                ascii_tree = (
                    _ascii_path_tree(original_path or file_label) if include_ascii_tree else ""
                )

                summary_text = ""
                if include_summary:
                    # Use SUMMARIES search scoped to dataset to derive file summary
                    file_query = getattr(data_item, "name", str(data_item.id)) or "file"
                    summary_text = await _get_summary_via_summaries(file_query, dataset.id, top_k)
                    if not summary_text:
                        summary_text = "Summary not available."

                if max_questions == 0:
                    questions = []
                else:
                    questions = await _generate_questions(file_label, summary_text)
                    if max_questions is not None and max_questions >= 0:
                        questions = questions[:max_questions]

                async def searches_for_question(q: str):
                    return await _run_searches_for_question(
                        q, dataset.id, effective_search_types, top_k
                    )

                # Run per-question searches concurrently
                per_q_results = await asyncio.gather(*[searches_for_question(q) for q in questions])

                # Build markdown content
                md_lines = [f"# {file_label}", ""]
                if include_ascii_tree:
                    md_lines.extend(["## Location", "", "```", ascii_tree, "```", ""])
                if include_summary:
                    md_lines.extend(["## Summary", "", summary_text, ""])

                md_lines.append("## Question ideas")
                for idx, q in enumerate(questions, start=1):
                    md_lines.append(f"- {idx}. {q}")
                md_lines.append("")

                md_lines.append("## Searches")
                md_lines.append("")
                for q, per_type in zip(questions, per_q_results):
                    md_lines.append(f"### Q: {q}")
                    md_lines.append(_format_results_md(per_type))
                    md_lines.append("")

                # Write to file (collision-safe)
                md_filename = _file_markdown_name(data_item, used_names)
                export_path = os.path.join(ds_dir, md_filename)
                tmp_path = export_path + ".tmp"
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write("\n".join(md_lines))
                os.replace(tmp_path, export_path)

                index_entries.append((file_label, md_filename))

        tasks = [asyncio.create_task(process_one(item)) for item in data_items]

        if timeout and timeout > 0:
            try:
                await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout)
            except asyncio.TimeoutError:
                logger.error("Save timed out for dataset %s", str(dataset.id))
        else:
            await asyncio.gather(*tasks, return_exceptions=True)

        # Build dataset index.md with TOC and optional dataset summary via SUMMARIES
        try:
            index_lines = [f"# Dataset: {_dataset_dir_name(dataset)}", "", "## Files", ""]
            for display, fname in sorted(index_entries, key=lambda x: x[0].lower()):
                index_lines.append(f"- [{display}]({fname})")

            # Dataset summary section
            try:
                summaries = await search(
                    query_text="dataset overview",
                    query_type=SearchType.SUMMARIES,
                    dataset_ids=[dataset.id],
                    top_k=top_k,
                )
            except Exception as e:
                logger.error("Dataset summary search failed: %s", str(e))
                summaries = []

            if summaries:
                index_lines.extend(["", "## Dataset summary (top summaries)", ""])
                for s in summaries[:top_k]:
                    index_lines.append(f"- {str(s)[:800]}")

            with open(os.path.join(ds_dir, "index.md"), "w", encoding="utf-8") as f:
                f.write("\n".join(index_lines))
        except Exception as e:
            logger.error("Failed to write dataset index for %s: %s", str(dataset.id), str(e))

    return results
````
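A minimal usage sketch for the new save() API, assuming data has already been added and cognified; the dataset name and export directory are placeholders:

```python
import asyncio

import cognee


async def main():
    # Export markdown reports for one dataset into ./exports;
    # returns a {dataset_id: export_directory} mapping.
    results = await cognee.save(
        datasets=["dataset_alpha"],
        export_root_directory="./exports",
        max_questions=5,
        search_types=["GRAPH_COMPLETION", "CHUNKS"],
        top_k=3,
    )
    for dataset_id, export_dir in results.items():
        print(dataset_id, "->", export_dir)


if __name__ == "__main__":
    asyncio.run(main())
```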
@@ -92,6 +92,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]:
```python
        ("cognee.cli.commands.cognify_command", "CognifyCommand"),
        ("cognee.cli.commands.delete_command", "DeleteCommand"),
        ("cognee.cli.commands.config_command", "ConfigCommand"),
        ("cognee.cli.commands.save_command", "SaveCommand"),
    ]

    for module_path, class_name in command_modules:
```
```diff
@@ -24,7 +24,8 @@ Supported Input Types:
 - **File paths**: Local file paths (absolute paths starting with "/")
 - **File URLs**: "file:///absolute/path" or "file://relative/path"
 - **S3 paths**: "s3://bucket-name/path/to/file"
-- **Lists**: Multiple files or text strings in a single call
+- **Folder URIs**: "folder:///abs/path" groups files by subfolders into datasets (prefix with parent)
+- **Lists**: Multiple items in a single call

 Supported File Formats:
 - Text files (.txt, .md, .csv)
```
```diff
@@ -41,13 +42,13 @@ After adding data, use `cognee cognify` to process it into knowledge graphs.
         parser.add_argument(
             "data",
             nargs="+",
-            help="Data to add: text content, file paths (/path/to/file), file URLs (file://path), S3 paths (s3://bucket/file), or mix of these",
+            help="Data to add: text, files (/path), file://, s3://, folder:///abs/path (datasets by subfolders)",
         )
         parser.add_argument(
             "--dataset-name",
             "-d",
             default="main_dataset",
-            help="Dataset name to organize your data (default: main_dataset)",
+            help="Dataset name (ignored for folder://; subfolders become datasets)",
         )

     def execute(self, args: argparse.Namespace) -> None:
```
```diff
@@ -55,20 +56,27 @@ After adding data, use `cognee cognify` to process it into knowledge graphs.
             # Import cognee here to avoid circular imports
             import cognee

-            fmt.echo(f"Adding {len(args.data)} item(s) to dataset '{args.dataset_name}'...")
+            def contains_folder_uri(items):
+                return any(isinstance(i, str) and i.startswith("folder://") for i in items)
+
+            inputs = args.data
+            dataset_label = args.dataset_name
+
+            if contains_folder_uri(inputs):
+                fmt.echo("Detected folder:// input. Subfolders will be added as separate datasets.")
+            else:
+                fmt.echo(f"Adding {len(inputs)} item(s) to dataset '{dataset_label}'...")

             # Run the async add function
             async def run_add():
                 try:
-                    # Pass all data items as a list to cognee.add if multiple items
-                    if len(args.data) == 1:
-                        data_to_add = args.data[0]
-                    else:
-                        data_to_add = args.data
-
+                    data_to_add = inputs if len(inputs) > 1 else inputs[0]
                     fmt.echo("Processing data...")
-                    await cognee.add(data=data_to_add, dataset_name=args.dataset_name)
-                    fmt.success(f"Successfully added data to dataset '{args.dataset_name}'")
+                    await cognee.add(data=data_to_add, dataset_name=dataset_label)
+                    if contains_folder_uri(inputs):
+                        fmt.success("Successfully added folder datasets")
+                    else:
+                        fmt.success(f"Successfully added data to dataset '{dataset_label}'")
                 except Exception as e:
                     raise CliCommandInnerException(f"Failed to add data: {str(e)}")
```
cognee/cli/commands/save_command.py (new file, 116 lines)
```python
import argparse
import asyncio
from typing import Optional

from cognee.cli.reference import SupportsCliCommand
from cognee.cli import DEFAULT_DOCS_URL
import cognee.cli.echo as fmt
from cognee.cli.exceptions import CliCommandException, CliCommandInnerException
from cognee.cli.config import SEARCH_TYPE_CHOICES


class SaveCommand(SupportsCliCommand):
    command_string = "save"
    help_string = "Export dataset summaries and search insights to markdown files"
    docs_url = DEFAULT_DOCS_URL
    description = """
    Export per-dataset, per-file markdown reports with summaries, ASCII path, question ideas,
    and search results. Creates an index.md per dataset.
    """

    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--datasets",
            nargs="*",
            help="Dataset names to process (default: all accessible)",
        )
        parser.add_argument(
            "--export-root",
            default=None,
            help="Export root directory (default: <data_root_directory>/memory_export)",
        )
        parser.add_argument(
            "--path",
            default=None,
            help="Alias for --export-root",
        )
        parser.add_argument(
            "--max-questions",
            type=int,
            default=10,
            help="Maximum number of question ideas per file (default: 10)",
        )
        parser.add_argument(
            "--search-types",
            nargs="*",
            choices=SEARCH_TYPE_CHOICES,
            default=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"],
            help="Search types to run per question",
        )
        parser.add_argument(
            "--top-k",
            type=int,
            default=5,
            help="Top-k results to retrieve for each search (default: 5)",
        )
        parser.add_argument(
            "--no-summary",
            action="store_true",
            help="Exclude file summary section",
        )
        parser.add_argument(
            "--no-path",
            action="store_true",
            help="Exclude ASCII path section",
        )
        parser.add_argument(
            "--concurrency",
            type=int,
            default=4,
            help="Max concurrent files processed per dataset (default: 4)",
        )
        parser.add_argument(
            "--timeout",
            type=float,
            default=None,
            help="Optional per-dataset timeout in seconds",
        )

    def execute(self, args: argparse.Namespace) -> None:
        try:
            import cognee

            fmt.echo("Starting save export...")

            async def run_save():
                try:
                    result = await cognee.save(
                        datasets=args.datasets,
                        export_root_directory=args.export_root or args.path,
                        max_questions=args.max_questions,
                        search_types=args.search_types,
                        top_k=args.top_k,
                        include_summary=(not args.no_summary),
                        include_ascii_tree=(not args.no_path),
                        concurrency=args.concurrency,
                        timeout=args.timeout,
                    )
                    return result
                except Exception as e:
                    raise CliCommandInnerException(f"Failed to save: {str(e)}")

            results = asyncio.run(run_save())

            if results:
                fmt.success("Export complete:")
                for ds_id, path in results.items():
                    fmt.echo(f"- {ds_id}: {path}")
            else:
                fmt.note("No datasets to export or no outputs generated.")

        except CliCommandInnerException as e:
            fmt.error(str(e))
            raise CliCommandException(self.docs_url)
        except Exception as e:
            fmt.error(f"Unexpected error: {str(e)}")
            raise CliCommandException(self.docs_url)
```
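A small sketch of driving the command's parser programmatically, useful for checking which defaults apply; the argument values are placeholders, and actually executing the command requires a configured cognee environment:

```python
import argparse

from cognee.cli.commands.save_command import SaveCommand

command = SaveCommand()
parser = argparse.ArgumentParser(prog=f"cognee {command.command_string}")
command.configure_parser(parser)

# Placeholder values; roughly equivalent to:
#   cognee save --datasets dataset_alpha --export-root ./exports --max-questions 5
args = parser.parse_args(
    ["--datasets", "dataset_alpha", "--export-root", "./exports", "--max-questions", "5"]
)
print(vars(args))  # shows defaults applied for flags that were not passed
# command.execute(args)  # would run the export against the configured backend
```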
cognee/tests/test_save_export_path.py (new file, 116 lines)
```python
import os
import asyncio
from uuid import uuid4

import pytest


@pytest.mark.asyncio
async def test_save_uses_custom_export_path(tmp_path, monkeypatch):
    # Import target after tmp fixtures are ready
    import cognee.api.v1.save.save as save_mod

    # Prepare two mock datasets
    class Dataset:
        def __init__(self, id_, name):
            self.id = id_
            self.name = name

    ds1 = Dataset(uuid4(), "dataset_alpha")
    ds2 = Dataset(uuid4(), "dataset_beta")

    # Mock dataset discovery
    async def mock_get_authorized_existing_datasets(datasets, permission_type, user):
        return [ds1, ds2]

    monkeypatch.setattr(
        save_mod, "get_authorized_existing_datasets", mock_get_authorized_existing_datasets
    )

    # Mock data items (with filename collision in ds1)
    class DataItem:
        def __init__(self, id_, name, original_path=None):
            self.id = id_
            self.name = name
            self.original_data_location = original_path

    ds1_items = [
        DataItem(uuid4(), "report.txt", "/root/a/report.txt"),
        DataItem(uuid4(), "report.txt", "/root/b/report.txt"),  # collision
    ]
    ds2_items = [
        DataItem(uuid4(), "notes.md", "/root/x/notes.md"),
    ]

    async def mock_get_dataset_data(dataset_id):
        if dataset_id == ds1.id:
            return ds1_items
        if dataset_id == ds2.id:
            return ds2_items
        return []

    monkeypatch.setattr(save_mod, "get_dataset_data", mock_get_dataset_data)

    # Mock summary retrieval
    async def mock_get_summary_via_summaries(query_text, dataset_id, top_k) -> str:
        return "This is a summary."

    monkeypatch.setattr(save_mod, "_get_summary_via_summaries", mock_get_summary_via_summaries)

    # Mock questions
    async def mock_generate_questions(file_name: str, summary_text: str):
        return ["Q1?", "Q2?", "Q3?"]

    monkeypatch.setattr(save_mod, "_generate_questions", mock_generate_questions)

    # Mock searches per question
    async def mock_run_searches_for_question(question, dataset_id, search_types, top_k):
        return {st.value: [f"{question} -> ok"] for st in search_types}

    monkeypatch.setattr(save_mod, "_run_searches_for_question", mock_run_searches_for_question)

    # Use custom export path
    export_dir = tmp_path / "my_exports"
    export_dir_str = str(export_dir)

    # Run
    result = await save_mod.save(
        datasets=None,
        export_root_directory=export_dir_str,
        max_questions=3,
        search_types=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"],
        top_k=2,
        include_summary=True,
        include_ascii_tree=True,
        concurrency=2,
        timeout=None,
    )

    # Verify returned mapping points to our custom path
    assert str(ds1.id) in result and str(ds2.id) in result
    assert result[str(ds1.id)].startswith(export_dir_str)
    assert result[str(ds2.id)].startswith(export_dir_str)

    # Verify directories and files exist
    ds1_dir = result[str(ds1.id)]
    ds2_dir = result[str(ds2.id)]

    assert os.path.isdir(ds1_dir)
    assert os.path.isdir(ds2_dir)

    # index.md present
    assert os.path.isfile(os.path.join(ds1_dir, "index.md"))
    assert os.path.isfile(os.path.join(ds2_dir, "index.md"))

    # File markdowns exist; collision handling: two files with similar base
    ds1_files = [f for f in os.listdir(ds1_dir) if f.endswith(".md") and f != "index.md"]
    assert len(ds1_files) == 2
    assert any(f == "report.txt.md" for f in ds1_files)
    assert any(f.startswith("report.txt__") and f.endswith(".md") for f in ds1_files)

    # Content sanity: ensure question headers exist in one file
    sample_md_path = os.path.join(ds1_dir, ds1_files[0])
    with open(sample_md_path, "r", encoding="utf-8") as fh:
        content = fh.read()
    assert "## Question ideas" in content
    assert "## Searches" in content
```