Compare commits


2 commits

Author    SHA1        Message                                     Date
Vasilije  5f21f6a520  Merge branch 'dev' into new_md_branch       2025-09-21 21:50:36 +02:00
vasilije  109c4ae569  added new functionality to save datasets    2025-09-10 20:59:54 -07:00
10 changed files with 702 additions and 12 deletions

View file

@@ -27,7 +27,10 @@ from .api.v1.visualize import visualize_graph, start_visualization_server
from cognee.modules.visualization.cognee_network_visualization import (
cognee_network_visualization,
)
from .api.v1.save.save import save
from .api.v1.ui import start_ui
# Pipelines
from .modules import pipelines
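
Since the new import in `cognee/__init__.py` re-exports `save` at the package root, the function can be called straight from the top-level namespace. A minimal sketch of that usage, assuming data has already been added and the default export root (`<data_root_directory>/memory_export`) is acceptable:

```python
import asyncio

import cognee


async def main():
    # Exports markdown reports for every dataset the default user can read
    # and returns a mapping of dataset_id -> export directory.
    results = await cognee.save()
    for dataset_id, export_dir in results.items():
        print(dataset_id, "->", export_dir)


asyncio.run(main())
```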

View file

@@ -242,6 +242,9 @@ app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognif
app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])
app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
from cognee.api.v1.save.routers.get_save_router import get_save_router
app.include_router(get_save_router(), prefix="/api/v1/save", tags=["save"])
app.include_router(
get_permissions_router(),

View file

@@ -11,6 +11,8 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.ingestion import ingest_data, resolve_data_directories
from cognee.modules.ingestion import discover_directory_datasets
import os
async def add(
@@ -141,6 +143,56 @@ async def add(
- GRAPH_DATABASE_PROVIDER: "kuzu" (default), "neo4j"
"""
# Special handling: folder:// scheme maps nested folders to datasets
def _is_folder_uri(item: Union[str, BinaryIO]) -> bool:
return isinstance(item, str) and item.startswith("folder://")
items = data if isinstance(data, list) else [data]
if any(_is_folder_uri(item) for item in items):
await setup()
# Support multiple folder:// roots in one call
last_run_info = None
for item in items:
if not _is_folder_uri(item):
continue
root_path = item.replace("folder://", "", 1)
if not os.path.isabs(root_path):
root_path = os.path.abspath(root_path)
root_name = os.path.basename(os.path.normpath(root_path)) or "dataset"
dataset_map = discover_directory_datasets(root_path, parent_dir=root_name)
# Process each discovered dataset independently via the pipeline
for ds_name, file_list in dataset_map.items():
# Authorize/create dataset
user, ds = await resolve_authorized_user_dataset(None, ds_name, user)
await reset_dataset_pipeline_run_status(
ds.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
)
tasks = [
# We already have resolved file list per dataset
Task(ingest_data, ds_name, user, node_set, None, preferred_loaders),
]
async for run_info in run_pipeline(
tasks=tasks,
datasets=[ds.id],
data=file_list,
user=user,
pipeline_name="add_pipeline",
vector_db_config=vector_db_config,
graph_db_config=graph_db_config,
incremental_loading=incremental_loading,
):
last_run_info = run_info
return last_run_info
# Default behavior: single dataset ingestion
tasks = [
Task(resolve_data_directories, include_subdirectories=True),
Task(ingest_data, dataset_name, user, node_set, dataset_id, preferred_loaders),

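To make the `folder://` branch above concrete, here is a hedged sketch of how a nested directory might be ingested. The exact dataset names come from `discover_directory_datasets` (not shown in this diff); the mapping in the comments is an assumption based on the `parent_dir=root_name` call:

```python
import asyncio

import cognee

# Hypothetical layout on disk:
#   /data/projects/
#     finance/q1_report.txt
#     finance/q2_report.txt
#     research/notes.md
#
# Assumed outcome: one dataset per subfolder, prefixed with the root folder name,
# e.g. something like "projects_finance" and "projects_research" (the exact
# naming/separator is decided by discover_directory_datasets).
# dataset_name is ignored for folder:// inputs; subfolders drive dataset creation.


async def main():
    await cognee.add("folder:///data/projects")


asyncio.run(main())
```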
View file

@@ -0,0 +1,2 @@
# Re-export save function for local imports
from .save import save # noqa: F401

View file

@@ -0,0 +1,54 @@
from typing import Optional, List
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from cognee.api.v1.models import InDTO
from cognee.api.v1.save.save import save as save_fn
from cognee.modules.users.models import User
from cognee.api.v1.auth.dependencies.get_authenticated_user import get_authenticated_user
from cognee.shared.telemetry import send_telemetry
class SavePayloadDTO(InDTO):
datasets: Optional[List[str]] = None
dataset_ids: Optional[List[UUID]] = None
export_root_directory: Optional[str] = None
# alias support
path: Optional[str] = None
def get_save_router() -> APIRouter:
router = APIRouter()
@router.post("", response_model=dict)
async def save(payload: SavePayloadDTO, user: User = Depends(get_authenticated_user)):
"""
Save dataset exports to markdown files.
For each accessible dataset, produces a folder with one markdown file per data item,
containing a summary, an ASCII path tree, question ideas, and search results.
"""
send_telemetry(
"Save API Endpoint Invoked",
user.id,
additional_properties={
"endpoint": "POST /v1/save",
},
)
try:
datasets = payload.datasets if payload.datasets else payload.dataset_ids
result = await save_fn(
datasets=datasets,
export_root_directory=payload.export_root_directory or payload.path,
user=user,
)
return result
except Exception as error:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Error during save operation: {str(error)}",
) from error
return router
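
Once the router is mounted (see the `app.include_router(get_save_router(), prefix="/api/v1/save", ...)` change earlier in this diff), the endpoint can be exercised with any HTTP client. A hedged sketch with `httpx`; the host, port, and authentication are assumptions, and field casing follows the DTO above (adjust if `InDTO` applies a camelCase alias):

```python
import httpx

# Assumed local deployment; the auth header/cookie depends on how
# get_authenticated_user is configured and is not shown in this diff.
payload = {
    "datasets": ["dataset_alpha"],              # or "dataset_ids": ["<uuid>", ...]
    "export_root_directory": "/tmp/memory_export",
}

response = httpx.post(
    "http://localhost:8000/api/v1/save",
    json=payload,
    # headers={"Authorization": "Bearer <token>"},  # assumption; match your auth setup
)
response.raise_for_status()
print(response.json())  # {"<dataset_id>": "<export_directory>", ...}
```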

cognee/api/v1/save/save.py Normal file (335 lines)
View file

@@ -0,0 +1,335 @@
import os
import asyncio
import json
from typing import Optional, Union, List, Dict
from uuid import UUID
from pydantic import BaseModel
from cognee.base_config import get_base_config
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.methods import get_authorized_existing_datasets, get_dataset_data
from cognee.infrastructure.files.utils.get_data_file_path import get_data_file_path
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.shared.logging_utils import get_logger
from cognee.api.v1.search import search
from cognee.modules.search.types import SearchType
logger = get_logger("save")
class QuestionsModel(BaseModel):
questions: List[str]
def _sanitize_filename(name: str) -> str:
safe = "".join(c if c.isalnum() or c in ("-", "_", ".", " ") else "_" for c in name)
return safe.strip().replace(" ", "_")
def _dataset_dir_name(dataset) -> str:
# Prefer readable dataset name when available, fallback to id
if getattr(dataset, "name", None):
return _sanitize_filename(str(dataset.name))
return str(dataset.id)
def _file_markdown_name(data_item, used_names: set[str]) -> str:
# Use the data item's name when present, else a generic "file" fallback
name = getattr(data_item, "name", None) or "file"
base = _sanitize_filename(str(name))
filename = f"{base}.md"
if filename in used_names:
short_id = str(getattr(data_item, "id", ""))[:8]
filename = f"{base}__{short_id}.md"
used_names.add(filename)
return filename
def _ascii_path_tree(path_str: str) -> str:
if not path_str:
return "(no path)"
# Normalize special schemes but keep segments readable
try:
normalized = get_data_file_path(path_str)
except Exception:
normalized = path_str
# Keep the path compact: show only the last 5 segments
parts = [p for p in normalized.replace("\\", "/").split("/") if p]
if len(parts) > 6:
display = ["..."] + parts[-5:]
else:
display = parts
# Render a single-branch tree
lines = []
for idx, seg in enumerate(display):
prefix = "└── " if idx == 0 else (" " * idx + "└── ")
lines.append(f"{prefix}{seg}")
return "\n".join(lines)
async def _get_summary_via_summaries(query_text: str, dataset_id: UUID, top_k: int) -> str:
try:
results = await search(
query_text=query_text,
query_type=SearchType.SUMMARIES,
dataset_ids=[dataset_id],
top_k=top_k,
)
if not results:
return ""
texts: List[str] = []
for r in results[:top_k]:
texts.append(str(r))
return "\n\n".join(texts)
except Exception as e:
logger.error(
"SUMMARIES search failed for '%s' in dataset %s: %s",
query_text,
str(dataset_id),
str(e),
)
return ""
async def _generate_questions(file_name: str, summary_text: str) -> List[str]:
prompt = (
"You are an expert analyst. Given a file and its summary, propose 10 diverse, high-signal "
"questions to further explore the file's content, implications, relationships, and gaps. "
"Avoid duplicates; vary depth and angle (overview, details, cross-references, temporal, quality).\n\n"
f"File: {file_name}\n\nSummary:\n{summary_text[:4000]}"
)
model = await LLMGateway.acreate_structured_output(
text_input=prompt,
system_prompt="Return strictly a JSON with key 'questions' and value as an array of 10 concise strings.",
response_model=QuestionsModel,
)
# model can be either pydantic model or dict-like, normalize
try:
questions = list(getattr(model, "questions", []))
except Exception:
questions = []
# Fallback if the tool returned a dict-like
if not questions and isinstance(model, dict):
questions = list(model.get("questions", []) or [])
# Enforce 10 max
return questions[:10]
async def _run_searches_for_question(
question: str, dataset_id: UUID, search_types: List[SearchType], top_k: int
) -> Dict[str, Union[str, List[dict], List[str]]]:
async def run_one(st: SearchType):
try:
result = await search(
query_text=question,
query_type=st,
dataset_ids=[dataset_id],
top_k=top_k,
)
return st.value, result
except Exception as e:
logger.error("Search failed for type %s: %s", st.value, str(e))
return st.value, [f"Error: {str(e)}"]
pairs = await asyncio.gather(*[run_one(st) for st in search_types])
return {k: v for k, v in pairs}
def _format_results_md(results: Dict[str, Union[str, List[dict], List[str]]]) -> str:
lines: List[str] = []
for st, payload in results.items():
lines.append(f"#### {st}")
if isinstance(payload, list):
# Printed as bullet items; stringify dicts
for item in payload[:5]:
if isinstance(item, dict):
# compact representation
snippet = json.dumps(item, ensure_ascii=False)[:800]
lines.append(f"- {snippet}")
else:
text = str(item)
lines.append(f"- {text[:800]}")
else:
lines.append(str(payload))
lines.append("")
return "\n".join(lines)
async def save(
datasets: Optional[Union[List[str], List[UUID]]] = None,
export_root_directory: Optional[str] = None,
user: Optional[User] = None,
# Configurable knobs
max_questions: int = 10,
search_types: Optional[List[Union[str, SearchType]]] = None,
top_k: int = 5,
include_summary: bool = True,
include_ascii_tree: bool = True,
concurrency: int = 4,
timeout: Optional[float] = None,
) -> Dict[str, str]:
"""
Export per-dataset markdown summaries and search insights for each ingested file.
For every dataset the user can read:
- Create a folder under export_root_directory (or data_root_directory/exports)
- For each data item (file), create a .md containing:
- Summary of the file (from existing TextSummary nodes)
- A small ASCII path tree showing its folder position
- Up to N LLM-generated question ideas (configurable)
- Results of configured Cognee searches per question
Also creates an index.md per dataset with links to files and an optional dataset summary.
Returns a mapping of dataset_id -> export_directory path.
"""
base_config = get_base_config()
export_root = export_root_directory or os.path.join(
base_config.data_root_directory, "memory_export"
)
os.makedirs(export_root, exist_ok=True)
if user is None:
user = await get_default_user()
datasets_list = await get_authorized_existing_datasets(datasets, "read", user)
results: Dict[str, str] = {}
for dataset in datasets_list:
ds_dir = os.path.join(export_root, _dataset_dir_name(dataset))
os.makedirs(ds_dir, exist_ok=True)
results[str(dataset.id)] = ds_dir
data_items = await get_dataset_data(dataset.id)
# Normalize search types
if not search_types:
effective_search_types = [
SearchType.GRAPH_COMPLETION,
SearchType.INSIGHTS,
SearchType.CHUNKS,
]
else:
effective_search_types = []
for st in search_types:
if isinstance(st, SearchType):
effective_search_types.append(st)
else:
try:
effective_search_types.append(SearchType[str(st)])
except Exception:
logger.warning("Unknown search type '%s', skipping", str(st))
sem = asyncio.Semaphore(max(1, int(concurrency)))
used_names: set[str] = set()
index_entries: List[tuple[str, str]] = []
async def process_one(data_item):
async with sem:
file_label = getattr(data_item, "name", str(data_item.id))
original_path = getattr(data_item, "original_data_location", None)
ascii_tree = (
_ascii_path_tree(original_path or file_label) if include_ascii_tree else ""
)
summary_text = ""
if include_summary:
# Use SUMMARIES search scoped to dataset to derive file summary
file_query = getattr(data_item, "name", str(data_item.id)) or "file"
summary_text = await _get_summary_via_summaries(file_query, dataset.id, top_k)
if not summary_text:
summary_text = "Summary not available."
if max_questions == 0:
questions = []
else:
questions = await _generate_questions(file_label, summary_text)
if max_questions is not None and max_questions >= 0:
questions = questions[:max_questions]
async def searches_for_question(q: str):
return await _run_searches_for_question(
q, dataset.id, effective_search_types, top_k
)
# Run per-question searches concurrently
per_q_results = await asyncio.gather(*[searches_for_question(q) for q in questions])
# Build markdown content
md_lines = [f"# {file_label}", ""]
if include_ascii_tree:
md_lines.extend(["## Location", "", "```", ascii_tree, "```", ""])
if include_summary:
md_lines.extend(["## Summary", "", summary_text, ""])
md_lines.append("## Question ideas")
for idx, q in enumerate(questions, start=1):
md_lines.append(f"- {idx}. {q}")
md_lines.append("")
md_lines.append("## Searches")
md_lines.append("")
for q, per_type in zip(questions, per_q_results):
md_lines.append(f"### Q: {q}")
md_lines.append(_format_results_md(per_type))
md_lines.append("")
# Write to file (collision-safe)
md_filename = _file_markdown_name(data_item, used_names)
export_path = os.path.join(ds_dir, md_filename)
tmp_path = export_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
f.write("\n".join(md_lines))
os.replace(tmp_path, export_path)
index_entries.append((file_label, md_filename))
tasks = [asyncio.create_task(process_one(item)) for item in data_items]
if timeout and timeout > 0:
try:
await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout)
except asyncio.TimeoutError:
logger.error("Save timed out for dataset %s", str(dataset.id))
else:
await asyncio.gather(*tasks, return_exceptions=True)
# Build dataset index.md with TOC and optional dataset summary via SUMMARIES
try:
index_lines = [f"# Dataset: {_dataset_dir_name(dataset)}", "", "## Files", ""]
for display, fname in sorted(index_entries, key=lambda x: x[0].lower()):
index_lines.append(f"- [{display}]({fname})")
# Dataset summary section
try:
summaries = await search(
query_text="dataset overview",
query_type=SearchType.SUMMARIES,
dataset_ids=[dataset.id],
top_k=top_k,
)
except Exception as e:
logger.error("Dataset summary search failed: %s", str(e))
summaries = []
if summaries:
index_lines.extend(["", "## Dataset summary (top summaries)", ""])
for s in summaries[:top_k]:
index_lines.append(f"- {str(s)[:800]}")
with open(os.path.join(ds_dir, "index.md"), "w", encoding="utf-8") as f:
f.write("\n".join(index_lines))
except Exception as e:
logger.error("Failed to write dataset index for %s: %s", str(dataset.id), str(e))
return results
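
For reference, a minimal sketch of calling `save` directly with its configurable knobs (values are illustrative, not recommendations), assuming the datasets named below already exist and have been cognified:

```python
import asyncio

from cognee.api.v1.save.save import save
from cognee.modules.search.types import SearchType


async def export_reports():
    # Export markdown reports for two named datasets into a custom directory.
    results = await save(
        datasets=["dataset_alpha", "dataset_beta"],
        export_root_directory="/tmp/cognee_exports",
        max_questions=5,
        search_types=[SearchType.GRAPH_COMPLETION, SearchType.CHUNKS],
        top_k=3,
        include_summary=True,
        include_ascii_tree=True,
        concurrency=2,
    )
    for dataset_id, export_dir in results.items():
        print(f"{dataset_id} -> {export_dir}")


asyncio.run(export_reports())
```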

View file

@@ -92,6 +92,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]:
("cognee.cli.commands.cognify_command", "CognifyCommand"),
("cognee.cli.commands.delete_command", "DeleteCommand"),
("cognee.cli.commands.config_command", "ConfigCommand"),
("cognee.cli.commands.save_command", "SaveCommand"),
]
for module_path, class_name in command_modules:

View file

@@ -24,7 +24,8 @@ Supported Input Types:
- **File paths**: Local file paths (absolute paths starting with "/")
- **File URLs**: "file:///absolute/path" or "file://relative/path"
- **S3 paths**: "s3://bucket-name/path/to/file"
- **Lists**: Multiple files or text strings in a single call
- **Folder URIs**: "folder:///abs/path" groups files by subfolder into datasets (dataset names are prefixed with the parent folder name)
- **Lists**: Multiple items in a single call
Supported File Formats:
- Text files (.txt, .md, .csv)
@@ -41,13 +42,13 @@ After adding data, use `cognee cognify` to process it into knowledge graphs.
parser.add_argument(
"data",
nargs="+",
help="Data to add: text content, file paths (/path/to/file), file URLs (file://path), S3 paths (s3://bucket/file), or mix of these",
help="Data to add: text, files (/path), file://, s3://, folder:///abs/path (datasets by subfolders)",
)
parser.add_argument(
"--dataset-name",
"-d",
default="main_dataset",
help="Dataset name to organize your data (default: main_dataset)",
help="Dataset name (ignored for folder://; subfolders become datasets)",
)
def execute(self, args: argparse.Namespace) -> None:
@@ -55,20 +56,27 @@ After adding data, use `cognee cognify` to process it into knowledge graphs.
# Import cognee here to avoid circular imports
import cognee
fmt.echo(f"Adding {len(args.data)} item(s) to dataset '{args.dataset_name}'...")
def contains_folder_uri(items):
return any(isinstance(i, str) and i.startswith("folder://") for i in items)
inputs = args.data
dataset_label = args.dataset_name
if contains_folder_uri(inputs):
fmt.echo("Detected folder:// input. Subfolders will be added as separate datasets.")
else:
fmt.echo(f"Adding {len(inputs)} item(s) to dataset '{dataset_label}'...")
# Run the async add function
async def run_add():
try:
# Pass all data items as a list to cognee.add if multiple items
if len(args.data) == 1:
data_to_add = args.data[0]
else:
data_to_add = args.data
data_to_add = inputs if len(inputs) > 1 else inputs[0]
fmt.echo("Processing data...")
await cognee.add(data=data_to_add, dataset_name=args.dataset_name)
fmt.success(f"Successfully added data to dataset '{args.dataset_name}'")
await cognee.add(data=data_to_add, dataset_name=dataset_label)
if contains_folder_uri(inputs):
fmt.success("Successfully added folder datasets")
else:
fmt.success(f"Successfully added data to dataset '{dataset_label}'")
except Exception as e:
raise CliCommandInnerException(f"Failed to add data: {str(e)}")

View file

@@ -0,0 +1,116 @@
import argparse
import asyncio
from typing import Optional
from cognee.cli.reference import SupportsCliCommand
from cognee.cli import DEFAULT_DOCS_URL
import cognee.cli.echo as fmt
from cognee.cli.exceptions import CliCommandException, CliCommandInnerException
from cognee.cli.config import SEARCH_TYPE_CHOICES
class SaveCommand(SupportsCliCommand):
command_string = "save"
help_string = "Export dataset summaries and search insights to markdown files"
docs_url = DEFAULT_DOCS_URL
description = """
Export per-dataset, per-file markdown reports with a summary, an ASCII path tree, question ideas,
and search results. Creates an index.md per dataset.
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--datasets",
nargs="*",
help="Dataset names to process (default: all accessible)",
)
parser.add_argument(
"--export-root",
default=None,
help="Export root directory (default: <data_root_directory>/memory_export)",
)
parser.add_argument(
"--path",
default=None,
help="Alias for --export-root",
)
parser.add_argument(
"--max-questions",
type=int,
default=10,
help="Maximum number of question ideas per file (default: 10)",
)
parser.add_argument(
"--search-types",
nargs="*",
choices=SEARCH_TYPE_CHOICES,
default=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"],
help="Search types to run per question",
)
parser.add_argument(
"--top-k",
type=int,
default=5,
help="Top-k results to retrieve for each search (default: 5)",
)
parser.add_argument(
"--no-summary",
action="store_true",
help="Exclude file summary section",
)
parser.add_argument(
"--no-path",
action="store_true",
help="Exclude ASCII path section",
)
parser.add_argument(
"--concurrency",
type=int,
default=4,
help="Max concurrent files processed per dataset (default: 4)",
)
parser.add_argument(
"--timeout",
type=float,
default=None,
help="Optional per-dataset timeout in seconds",
)
def execute(self, args: argparse.Namespace) -> None:
try:
import cognee
fmt.echo("Starting save export...")
async def run_save():
try:
result = await cognee.save(
datasets=args.datasets,
export_root_directory=args.export_root or args.path,
max_questions=args.max_questions,
search_types=args.search_types,
top_k=args.top_k,
include_summary=(not args.no_summary),
include_ascii_tree=(not args.no_path),
concurrency=args.concurrency,
timeout=args.timeout,
)
return result
except Exception as e:
raise CliCommandInnerException(f"Failed to save: {str(e)}")
results = asyncio.run(run_save())
if results:
fmt.success("Export complete:")
for ds_id, path in results.items():
fmt.echo(f"- {ds_id}: {path}")
else:
fmt.note("No datasets to export or no outputs generated.")
except CliCommandInnerException as e:
fmt.error(str(e))
raise CliCommandException(self.docs_url)
except Exception as e:
fmt.error(f"Unexpected error: {str(e)}")
raise CliCommandException(self.docs_url)

View file

@@ -0,0 +1,116 @@
import os
import asyncio
from uuid import uuid4
import pytest
@pytest.mark.asyncio
async def test_save_uses_custom_export_path(tmp_path, monkeypatch):
# Import the implementation module (not the re-exported function) so its module-level helpers can be monkeypatched
import cognee.api.v1.save.save as save_mod
# Prepare two mock datasets
class Dataset:
def __init__(self, id_, name):
self.id = id_
self.name = name
ds1 = Dataset(uuid4(), "dataset_alpha")
ds2 = Dataset(uuid4(), "dataset_beta")
# Mock dataset discovery
async def mock_get_authorized_existing_datasets(datasets, permission_type, user):
return [ds1, ds2]
monkeypatch.setattr(
save_mod, "get_authorized_existing_datasets", mock_get_authorized_existing_datasets
)
# Mock data items (with filename collision in ds1)
class DataItem:
def __init__(self, id_, name, original_path=None):
self.id = id_
self.name = name
self.original_data_location = original_path
ds1_items = [
DataItem(uuid4(), "report.txt", "/root/a/report.txt"),
DataItem(uuid4(), "report.txt", "/root/b/report.txt"), # collision
]
ds2_items = [
DataItem(uuid4(), "notes.md", "/root/x/notes.md"),
]
async def mock_get_dataset_data(dataset_id):
if dataset_id == ds1.id:
return ds1_items
if dataset_id == ds2.id:
return ds2_items
return []
monkeypatch.setattr(save_mod, "get_dataset_data", mock_get_dataset_data)
# Mock summary retrieval (the helper behind include_summary)
async def mock_get_summary_via_summaries(query_text: str, dataset_id, top_k: int) -> str:
return "This is a summary."
monkeypatch.setattr(save_mod, "_get_summary_via_summaries", mock_get_summary_via_summaries)
# Mock questions
async def mock_generate_questions(file_name: str, summary_text: str):
return ["Q1?", "Q2?", "Q3?"]
monkeypatch.setattr(save_mod, "_generate_questions", mock_generate_questions)
# Mock searches per question
async def mock_run_searches_for_question(question, dataset_id, search_types, top_k):
return {st.value: [f"{question} -> ok"] for st in search_types}
monkeypatch.setattr(save_mod, "_run_searches_for_question", mock_run_searches_for_question)
# Use custom export path
export_dir = tmp_path / "my_exports"
export_dir_str = str(export_dir)
# Run
result = await save_mod.save(
datasets=None,
export_root_directory=export_dir_str,
max_questions=3,
search_types=["GRAPH_COMPLETION", "INSIGHTS", "CHUNKS"],
top_k=2,
include_summary=True,
include_ascii_tree=True,
concurrency=2,
timeout=None,
)
# Verify returned mapping points to our custom path
assert str(ds1.id) in result and str(ds2.id) in result
assert result[str(ds1.id)].startswith(export_dir_str)
assert result[str(ds2.id)].startswith(export_dir_str)
# Verify directories and files exist
ds1_dir = result[str(ds1.id)]
ds2_dir = result[str(ds2.id)]
assert os.path.isdir(ds1_dir)
assert os.path.isdir(ds2_dir)
# index.md present
assert os.path.isfile(os.path.join(ds1_dir, "index.md"))
assert os.path.isfile(os.path.join(ds2_dir, "index.md"))
# File markdowns exist; collision handling: two files with similar base
ds1_files = [f for f in os.listdir(ds1_dir) if f.endswith(".md") and f != "index.md"]
assert len(ds1_files) == 2
assert any(f == "report.txt.md" for f in ds1_files)
assert any(f.startswith("report.txt__") and f.endswith(".md") for f in ds1_files)
# Content sanity: ensure question headers exist in one file
sample_md_path = os.path.join(ds1_dir, ds1_files[0])
with open(sample_md_path, "r", encoding="utf-8") as fh:
content = fh.read()
assert "## Question ideas" in content
assert "## Searches" in content