Compare commits
10 commits
main
...
feature/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30fd84e20d | ||
|
|
6a087cbdfc | ||
|
|
d263d281a0 | ||
|
|
ceaa015c91 | ||
|
|
65c552a35f | ||
|
|
e86bce7280 | ||
|
|
34d02a84c7 | ||
|
|
04c6d8f608 | ||
|
|
253dce89fe | ||
|
|
2db9d1ee03 |
13 changed files with 5063 additions and 4139 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -197,3 +197,7 @@ SWE-bench_testsample/
|
|||
|
||||
# ChromaDB Data
|
||||
.chromadb_data/
|
||||
# TUI development artifacts
|
||||
*.backup
|
||||
create_tui_files.sh
|
||||
add_tui_command.py
|
||||
|
|
|
|||
|
|
@ -92,6 +92,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]:
|
|||
("cognee.cli.commands.cognify_command", "CognifyCommand"),
|
||||
("cognee.cli.commands.delete_command", "DeleteCommand"),
|
||||
("cognee.cli.commands.config_command", "ConfigCommand"),
|
||||
("cognee.cli.commands.tui_command", "TuiCommand"),
|
||||
]
|
||||
|
||||
for module_path, class_name in command_modules:
|
||||
|
|
|
|||
52
cognee/cli/commands/tui_command.py
Normal file
52
cognee/cli/commands/tui_command.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import argparse
|
||||
from cognee.cli.reference import SupportsCliCommand
|
||||
from cognee.cli import DEFAULT_DOCS_URL
|
||||
import cognee.cli.echo as fmt
|
||||
from cognee.cli.exceptions import CliCommandException
|
||||
|
||||
|
||||
class TuiCommand(SupportsCliCommand):
|
||||
command_string = "tui"
|
||||
help_string = "Launch interactive Terminal User Interface"
|
||||
docs_url = DEFAULT_DOCS_URL
|
||||
|
||||
description = """
|
||||
Launch the Cognee Terminal User Interface (TUI).
|
||||
|
||||
The TUI provides an interactive, text-based interface for managing your
|
||||
knowledge graphs with features like:
|
||||
|
||||
- **Context Management**: Add and manage data sources
|
||||
- **Search & Query**: Interactive knowledge graph querying
|
||||
- **Settings**: Configure API keys and models
|
||||
- **Live Updates**: Real-time status and progress indicators
|
||||
|
||||
The TUI is keyboard-driven and supports:
|
||||
- Arrow key navigation
|
||||
- Keyboard shortcuts (h=Home, c=Context, s=Search, etc.)
|
||||
- Tab completion for inputs
|
||||
|
||||
Perfect for managing Cognee from the terminal or SSH sessions!
|
||||
"""
|
||||
|
||||
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--no-mouse",
|
||||
action="store_true",
|
||||
help="Disable mouse support (keyboard only mode)",
|
||||
)
|
||||
|
||||
def execute(self, args: argparse.Namespace) -> None:
|
||||
try:
|
||||
fmt.echo("Starting Cognee TUI...")
|
||||
fmt.note("Press 'q' to quit, '?' for help")
|
||||
|
||||
# Import and run TUI
|
||||
from cognee.cli.tui import run_tui
|
||||
|
||||
run_tui(mouse=not args.no_mouse)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
fmt.note("\nTUI closed by user")
|
||||
except Exception as e:
|
||||
raise CliCommandException(f"Failed to start TUI: {str(e)}", error_code=1) from e
|
||||
5
cognee/cli/tui/__init__.py
Normal file
5
cognee/cli/tui/__init__.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
"""Cognee TUI - Terminal User Interface"""
|
||||
|
||||
from cognee.cli.tui.app import run_tui, CogneeTUI
|
||||
|
||||
__all__ = ["run_tui", "CogneeTUI"]
|
||||
139
cognee/cli/tui/app.py
Normal file
139
cognee/cli/tui/app.py
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
"""
|
||||
Cognee TUI - Main Application
|
||||
Text-based User Interface for managing Cognee knowledge graphs
|
||||
"""
|
||||
|
||||
from typing import ClassVar
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.widgets import Header, Footer, Button, Markdown
|
||||
from textual.containers import VerticalScroll
|
||||
from textual.screen import Screen
|
||||
from textual.events import Key
|
||||
from cognee.cli.tui.screens.home import HomeScreen
|
||||
|
||||
|
||||
class CogneeTUI(App):
|
||||
"""Cognee Terminal User Interface Application"""
|
||||
|
||||
CSS = """
|
||||
Screen {
|
||||
background: $surface;
|
||||
}
|
||||
|
||||
.box {
|
||||
border: solid $primary;
|
||||
background: $panel;
|
||||
padding: 1 2;
|
||||
margin: 1;
|
||||
}
|
||||
|
||||
Button {
|
||||
margin: 1 2;
|
||||
min-width: 30;
|
||||
}
|
||||
|
||||
Button:hover {
|
||||
background: $primary;
|
||||
}
|
||||
|
||||
#menu-container {
|
||||
width: 60;
|
||||
height: auto;
|
||||
border: heavy $primary;
|
||||
background: $panel;
|
||||
padding: 2;
|
||||
}
|
||||
|
||||
.title {
|
||||
text-align: center;
|
||||
text-style: bold;
|
||||
color: $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
.center {
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
/* Brand color for Manage Context button only */
|
||||
#context {
|
||||
background: #5C10F4;
|
||||
color: #F4F4F4;
|
||||
border: tall #5C10F4;
|
||||
}
|
||||
#context:hover {
|
||||
background: #A550FF;
|
||||
border: tall #A550FF;
|
||||
}
|
||||
"""
|
||||
|
||||
TITLE = "Cognee TUI - AI Memory and Context Manager"
|
||||
SUB_TITLE = "Navigate with arrow keys • Press ? for help"
|
||||
|
||||
BINDINGS: ClassVar[list[Binding]] = [
|
||||
Binding("q", "quit", "Quit", priority=True),
|
||||
Binding("?", "help", "Help"),
|
||||
Binding("d", "toggle_dark", "Toggle Dark Mode"),
|
||||
]
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""Initialize the app with the home screen"""
|
||||
self.push_screen(HomeScreen())
|
||||
|
||||
def action_help(self) -> None:
|
||||
"""Show help information"""
|
||||
help_text = """
|
||||
# Cognee TUI Help
|
||||
|
||||
## Navigation
|
||||
- Arrow Keys: Navigate between UI elements
|
||||
- Enter: Select/activate items
|
||||
- Tab: Move to next field
|
||||
- Esc: Go back
|
||||
|
||||
## Keyboard Shortcuts
|
||||
- q: Quit application
|
||||
- d: Toggle dark/light mode
|
||||
- ?: Show this help
|
||||
|
||||
## Workflow
|
||||
1. Add Context: Add data sources
|
||||
2. Cognify: Process data
|
||||
3. Search: Query knowledge graph
|
||||
4. Settings: Configure API keys
|
||||
"""
|
||||
self.push_screen(HelpScreen(help_text))
|
||||
|
||||
|
||||
class HelpScreen(Screen):
|
||||
"""Help screen"""
|
||||
|
||||
def __init__(self, help_text: str):
|
||||
super().__init__()
|
||||
self.help_text = help_text
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
with VerticalScroll():
|
||||
yield Markdown(self.help_text)
|
||||
yield Button("Close (Esc)", id="close", variant="primary")
|
||||
yield Footer()
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
if event.button.id == "close":
|
||||
self.app.pop_screen()
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
if event.key == "escape":
|
||||
self.app.pop_screen()
|
||||
|
||||
|
||||
def run_tui(mouse: bool = True) -> None:
|
||||
"""Entry point to run the TUI application.
|
||||
|
||||
Args:
|
||||
mouse: Enable mouse support (default: True)
|
||||
"""
|
||||
app = CogneeTUI()
|
||||
app.run(mouse=mouse)
|
||||
8
cognee/cli/tui/screens/__init__.py
Normal file
8
cognee/cli/tui/screens/__init__.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
"""Screens for Cognee TUI"""
|
||||
|
||||
from cognee.cli.tui.screens.home import HomeScreen
|
||||
from cognee.cli.tui.screens.context import ContextScreen
|
||||
from cognee.cli.tui.screens.query import QueryScreen
|
||||
from cognee.cli.tui.screens.settings import SettingsScreen
|
||||
|
||||
__all__ = ["HomeScreen", "ContextScreen", "QueryScreen", "SettingsScreen"]
|
||||
480
cognee/cli/tui/screens/context.py
Normal file
480
cognee/cli/tui/screens/context.py
Normal file
|
|
@ -0,0 +1,480 @@
|
|||
"""Context Management Screen"""
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import os
|
||||
from pathlib import Path
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
from textual.screen import Screen
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import Header, Footer, Button, Static, Input, DataTable, Checkbox
|
||||
from textual.containers import Container, Vertical
|
||||
from textual.binding import Binding
|
||||
from textual.events import Key
|
||||
import cognee
|
||||
from cognee.api.v1.datasets.datasets import datasets as ds_api
|
||||
|
||||
|
||||
class ContextScreen(Screen):
|
||||
"""Context management screen"""
|
||||
|
||||
BINDINGS = [
|
||||
Binding("escape", "back", "Back"),
|
||||
Binding("up", "arrow_up", show=False),
|
||||
Binding("down", "arrow_down", show=False),
|
||||
Binding("left", "arrow_left", show=False),
|
||||
Binding("right", "arrow_right", show=False),
|
||||
]
|
||||
DEFAULT_DATASET = "main_dataset"
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._datasets: list[dict] = []
|
||||
self._dataset_id_to_name: dict[str, str] = {}
|
||||
self._data_items_by_id: dict[str, dict] = {}
|
||||
self._dataset_row_to_id: dict[DataTable.RowKey, str] = {}
|
||||
self._file_row_to_id: dict[DataTable.RowKey, str] = {}
|
||||
self._dataset_row_keys: list[DataTable.RowKey] = []
|
||||
self._file_row_keys: list[DataTable.RowKey] = []
|
||||
self._selected_dataset_id: str | None = None
|
||||
self._selected_data_id: str | None = None
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
with Container():
|
||||
yield Static("[bold]📁 Context Management[/bold]\n", classes="title")
|
||||
with Vertical():
|
||||
yield Static("[b]Datasets[/b] and [b]Files[/b] in dataset", classes="center")
|
||||
# Central tables
|
||||
yield DataTable(id="datasets_table")
|
||||
yield DataTable(id="files_table")
|
||||
# Tables are the primary UI (no dropdowns)
|
||||
yield Input(placeholder="comma-separated node sets (optional)", id="nodeset_input")
|
||||
yield Static(
|
||||
"\nEnter text or a file path to add to the selected dataset:", classes="center"
|
||||
)
|
||||
yield Input(placeholder="Text or /absolute/path/to/file.pdf", id="data_input")
|
||||
yield Button("Add to Context", id="add_btn", variant="primary")
|
||||
yield Button("Cognify (process data)", id="cognify_btn", variant="success")
|
||||
yield Static("\nSearch (runs against selected dataset context):", classes="center")
|
||||
yield Input(placeholder="e.g., What are the main topics?", id="search_input")
|
||||
yield Button("Search", id="search_btn", variant="default")
|
||||
yield Checkbox(
|
||||
"Save search output to searched_context.md",
|
||||
id="save_search_checkbox",
|
||||
value=False,
|
||||
)
|
||||
yield Static(
|
||||
"\nExport context to Markdown (runs one or more queries):", classes="center"
|
||||
)
|
||||
yield Input(placeholder="Queries to export (comma-separated)", id="export_queries")
|
||||
yield Button("Export Context to MD", id="export_btn", variant="default")
|
||||
yield Static("", id="status")
|
||||
yield Button("← Back", id="back_btn")
|
||||
yield Footer()
|
||||
|
||||
async def _set_status(self, message: str) -> None:
|
||||
try:
|
||||
status = self.query_one("#status", Static)
|
||||
status.update(message)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
# Load datasets and populate select
|
||||
try:
|
||||
await self._set_status("Loading datasets...")
|
||||
datasets = await ds_api.list_datasets()
|
||||
# datasets may be model objects; normalize to dicts with id, name
|
||||
normalized = []
|
||||
for d in datasets:
|
||||
# Support both object and dict
|
||||
d_id = str(getattr(d, "id", None) or d.get("id"))
|
||||
d_name = str(getattr(d, "name", None) or d.get("name"))
|
||||
normalized.append({"id": d_id, "name": d_name})
|
||||
self._datasets = normalized
|
||||
self._dataset_id_to_name = {d["id"]: d["name"] for d in normalized if d.get("id")}
|
||||
# Init datasets table
|
||||
ds_table = self.query_one("#datasets_table", DataTable)
|
||||
if not ds_table.columns:
|
||||
ds_table.add_columns("Name", "ID", "Created At")
|
||||
ds_table.clear()
|
||||
self._dataset_row_to_id.clear()
|
||||
self._dataset_row_keys = []
|
||||
for d in normalized:
|
||||
row_key = ds_table.add_row(
|
||||
d.get("name", ""), d.get("id", ""), d.get("created_at", "") or ""
|
||||
)
|
||||
self._dataset_row_to_id[row_key] = d["id"]
|
||||
self._dataset_row_keys.append(row_key)
|
||||
# Focus datasets table and preselect first row if available
|
||||
ds_table.focus()
|
||||
if self._dataset_row_to_id:
|
||||
first_row_key = next(iter(self._dataset_row_to_id.keys()))
|
||||
self._selected_dataset_id = self._dataset_row_to_id[first_row_key]
|
||||
# Try to position the cursor on the first cell to ensure arrows work
|
||||
try:
|
||||
ds_table.cursor_coordinate = (0, 0)
|
||||
except Exception:
|
||||
pass
|
||||
await self._load_dataset_files(self._selected_dataset_id)
|
||||
await self._set_status("Datasets loaded. Select a dataset to view files.")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Failed to load datasets:[/red] {ex}")
|
||||
|
||||
async def _load_dataset_files(self, dataset_id: str) -> None:
|
||||
try:
|
||||
await self._set_status("Loading dataset files...")
|
||||
data_items = await ds_api.list_data(dataset_id)
|
||||
normalized = []
|
||||
self._data_items_by_id = {}
|
||||
for item in data_items:
|
||||
i_id = str(getattr(item, "id", None) or item.get("id"))
|
||||
i_name = str(getattr(item, "name", None) or item.get("name") or "Unnamed")
|
||||
raw_loc = (
|
||||
getattr(item, "raw_data_location", None)
|
||||
or item.get("raw_data_location")
|
||||
or item.get("rawDataLocation")
|
||||
)
|
||||
orig_loc = (
|
||||
getattr(item, "original_data_location", None)
|
||||
or item.get("original_data_location")
|
||||
or item.get("originalDataLocation")
|
||||
)
|
||||
normalized.append(
|
||||
{
|
||||
"id": i_id,
|
||||
"name": i_name,
|
||||
"raw_data_location": raw_loc,
|
||||
"original_data_location": orig_loc,
|
||||
"raw": raw_loc,
|
||||
"orig": orig_loc,
|
||||
}
|
||||
)
|
||||
self._data_items_by_id[i_id] = normalized[-1]
|
||||
# Populate files table
|
||||
files_table = self.query_one("#files_table", DataTable)
|
||||
if not files_table.columns:
|
||||
files_table.add_columns("Name", "ID", "Path")
|
||||
files_table.clear()
|
||||
self._file_row_to_id.clear()
|
||||
self._file_row_keys = []
|
||||
for i in normalized:
|
||||
row_key = files_table.add_row(
|
||||
i.get("name", ""),
|
||||
i.get("id", ""),
|
||||
(
|
||||
i.get("original_data_location")
|
||||
or i.get("raw_data_location")
|
||||
or i.get("orig")
|
||||
or i.get("raw")
|
||||
or ""
|
||||
),
|
||||
)
|
||||
self._file_row_to_id[row_key] = i["id"]
|
||||
self._file_row_keys.append(row_key)
|
||||
self._selected_data_id = None
|
||||
await self._set_status(f"Loaded {len(normalized)} file(s) for the dataset.")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Failed to load dataset files:[/red] {ex}")
|
||||
|
||||
async def _handle_add(self) -> None:
|
||||
data_input = self.query_one("#data_input", Input)
|
||||
nodeset_input = self.query_one("#nodeset_input", Input)
|
||||
content = (data_input.value or "").strip()
|
||||
selected_dataset_id = self._selected_dataset_id
|
||||
dataset_name = self._dataset_id_to_name.get(selected_dataset_id, self.DEFAULT_DATASET)
|
||||
raw_nodeset = (nodeset_input.value or "").strip()
|
||||
node_set = [s.strip() for s in raw_nodeset.split(",") if s.strip()] if raw_nodeset else None
|
||||
if not content:
|
||||
await self._set_status("[red]Please enter text or a file path[/red]")
|
||||
return
|
||||
try:
|
||||
await self._set_status(
|
||||
f"Adding data to dataset [b]{dataset_name}[/b] "
|
||||
f"{'(with node sets: ' + ', '.join(node_set) + ')' if node_set else ''}..."
|
||||
)
|
||||
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
|
||||
await cognee.add(content, dataset_name=dataset_name, node_set=node_set)
|
||||
await self._set_status("[green]✓ Added successfully.[/green] You can now run Cognify.")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Add failed:[/red] {ex}")
|
||||
|
||||
async def _handle_cognify(self) -> None:
|
||||
try:
|
||||
await self._set_status("Processing data into knowledge graph (cognify)...")
|
||||
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
|
||||
await cognee.cognify()
|
||||
await self._set_status("[green]✓ Cognify complete.[/green]")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Cognify failed:[/red] {ex}")
|
||||
|
||||
async def _handle_search(self) -> None:
|
||||
try:
|
||||
dataset_id = self._selected_dataset_id
|
||||
ds_name = self._dataset_id_to_name.get(dataset_id, None)
|
||||
q_input = self.query_one("#search_input", Input)
|
||||
save_cb = self.query_one("#save_search_checkbox", Checkbox)
|
||||
query_text = (q_input.value or "").strip()
|
||||
if not query_text:
|
||||
await self._set_status(":warning: Please enter a search query.")
|
||||
return
|
||||
await self._set_status("Searching...")
|
||||
# If a dataset is chosen, we can scope via datasets=[name]
|
||||
kwargs = {}
|
||||
if ds_name:
|
||||
kwargs["datasets"] = [ds_name]
|
||||
results = await cognee.search(query_text=query_text, **kwargs)
|
||||
rendered = (
|
||||
"\n".join(f"- {str(item)}" for item in results)
|
||||
if isinstance(results, list)
|
||||
else str(results)
|
||||
)
|
||||
await self._set_status(f"[b]Search results[/b]:\n{rendered}")
|
||||
# Optionally save to searched_context.md
|
||||
if save_cb.value:
|
||||
# Choose directory next to selected file if possible, else current working dir
|
||||
target_dir: Path | None = None
|
||||
if self._selected_data_id:
|
||||
data_item = self._data_items_by_id.get(self._selected_data_id)
|
||||
if data_item:
|
||||
loc = (
|
||||
data_item.get("original_data_location")
|
||||
or data_item.get("raw_data_location")
|
||||
or data_item.get("orig")
|
||||
or data_item.get("raw")
|
||||
)
|
||||
if isinstance(loc, str) and loc.startswith("file://"):
|
||||
loc = loc[len("file://") :]
|
||||
if isinstance(loc, str) and (
|
||||
loc.startswith("/") or (len(loc) > 2 and loc[1:3] in (":\\", ":/"))
|
||||
):
|
||||
try:
|
||||
p = Path(loc)
|
||||
target_dir = p.parent if p.exists() or p.parent.exists() else None
|
||||
except Exception:
|
||||
target_dir = None
|
||||
if target_dir is None:
|
||||
target_dir = Path.cwd()
|
||||
out_path = target_dir / "searched_context.md"
|
||||
try:
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(out_path, "a", encoding="utf-8") as f:
|
||||
f.write(f"### Query: {query_text}\n{rendered}\n\n")
|
||||
await self._set_status(f"[green]✓ Saved search output to:[/green] {out_path}")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Failed to save search output:[/red] {ex}")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Search failed:[/red] {ex}")
|
||||
|
||||
def _choose_export_path(self, data_item: dict) -> Path | None:
|
||||
# Prefer original path; fallback to raw path
|
||||
loc = data_item.get("orig") or data_item.get("raw")
|
||||
if not loc:
|
||||
return None
|
||||
# Accept absolute POSIX paths or file:// URIs
|
||||
if isinstance(loc, str) and loc.startswith("file://"):
|
||||
loc = loc[len("file://") :]
|
||||
if not isinstance(loc, str) or not (
|
||||
loc.startswith("/") or loc[1:3] == ":\\" or loc[1:3] == ":/"
|
||||
):
|
||||
return None
|
||||
try:
|
||||
p = Path(loc)
|
||||
if p.is_dir():
|
||||
return p / "context_export.md"
|
||||
# write next to file with suffix
|
||||
stem = p.stem
|
||||
return p.with_name(f"{stem}_context.md")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
async def _handle_export(self) -> None:
|
||||
try:
|
||||
export_queries = self.query_one("#export_queries", Input)
|
||||
data_id = self._selected_data_id
|
||||
if not data_id:
|
||||
await self._set_status(":warning: Please select a file to export context for.")
|
||||
return
|
||||
data_item = self._data_items_by_id.get(data_id)
|
||||
if not data_item:
|
||||
await self._set_status(":warning: Selected file metadata not available.")
|
||||
return
|
||||
export_path = self._choose_export_path(data_item)
|
||||
if not export_path:
|
||||
await self._set_status(
|
||||
"[red]Can't determine a local file path to save Markdown next to the original file.[/red]"
|
||||
)
|
||||
return
|
||||
raw_queries = (export_queries.value or "").strip()
|
||||
queries = [q.strip() for q in raw_queries.split(",") if q.strip()]
|
||||
await self._set_status("Running export...")
|
||||
md_parts: list[str] = []
|
||||
md_parts.append(f"# Context Export for {data_item.get('name', 'selected item')}")
|
||||
# Include simple metadata block
|
||||
md_parts.append("")
|
||||
md_parts.append("## Source")
|
||||
md_parts.append(
|
||||
f"- Dataset: {self._dataset_id_to_name.get(self._selected_dataset_id, self.DEFAULT_DATASET)}"
|
||||
)
|
||||
if data_item.get("orig"):
|
||||
md_parts.append(f"- Original: {data_item.get('orig')}")
|
||||
if data_item.get("raw"):
|
||||
md_parts.append(f"- Raw: {data_item.get('raw')}")
|
||||
# Run queries if provided
|
||||
if queries:
|
||||
md_parts.append("\n## Search Results")
|
||||
for q in queries:
|
||||
md_parts.append(f"\n### Query: {q}\n")
|
||||
try:
|
||||
ds_name = self._dataset_id_to_name.get(self._selected_dataset_id, None)
|
||||
kwargs = {}
|
||||
if ds_name:
|
||||
kwargs["datasets"] = [ds_name]
|
||||
results = await cognee.search(query_text=q, **kwargs)
|
||||
if isinstance(results, list):
|
||||
if results:
|
||||
for r in results:
|
||||
md_parts.append(f"- {str(r)}")
|
||||
else:
|
||||
md_parts.append("- (no results)")
|
||||
else:
|
||||
md_parts.append(str(results))
|
||||
except Exception as ex:
|
||||
md_parts.append(f"- (search failed: {ex})")
|
||||
# Write file
|
||||
export_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(export_path, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(md_parts).strip() + "\n")
|
||||
await self._set_status(f"[green]✓ Exported to:[/green] {export_path}")
|
||||
except Exception as ex:
|
||||
await self._set_status(f"[red]Export failed:[/red] {ex}")
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
if event.button.id == "back_btn":
|
||||
self.app.pop_screen()
|
||||
return
|
||||
if event.button.id == "add_btn":
|
||||
asyncio.create_task(self._handle_add())
|
||||
return
|
||||
if event.button.id == "cognify_btn":
|
||||
asyncio.create_task(self._handle_cognify())
|
||||
return
|
||||
if event.button.id == "search_btn":
|
||||
asyncio.create_task(self._handle_search())
|
||||
return
|
||||
if event.button.id == "export_btn":
|
||||
asyncio.create_task(self._handle_export())
|
||||
return
|
||||
|
||||
def on_data_table_row_selected(self, message: DataTable.RowSelected) -> None:
|
||||
# Selecting a dataset row loads its files and syncs dropdown
|
||||
try:
|
||||
if message.data_table.id == "datasets_table":
|
||||
row_key = message.row_key
|
||||
dataset_id = self._dataset_row_to_id.get(row_key)
|
||||
if dataset_id:
|
||||
self._selected_dataset_id = dataset_id
|
||||
asyncio.create_task(self._load_dataset_files(dataset_id))
|
||||
elif message.data_table.id == "files_table":
|
||||
row_key = message.row_key
|
||||
data_id = self._file_row_to_id.get(row_key)
|
||||
if data_id:
|
||||
self._selected_data_id = data_id
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _active_table(self) -> DataTable | None:
|
||||
try:
|
||||
files_table = self.query_one("#files_table", DataTable)
|
||||
datasets_table = self.query_one("#datasets_table", DataTable)
|
||||
if files_table.has_focus:
|
||||
return files_table
|
||||
return datasets_table
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def action_arrow_up(self) -> None:
|
||||
table = self._active_table()
|
||||
if table:
|
||||
try:
|
||||
table.action_cursor_up()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def action_arrow_down(self) -> None:
|
||||
table = self._active_table()
|
||||
if table:
|
||||
try:
|
||||
table.action_cursor_down()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def action_arrow_left(self) -> None:
|
||||
table = self._active_table()
|
||||
if table:
|
||||
try:
|
||||
# Move focus to datasets table on left, otherwise move cursor
|
||||
if table.id == "files_table":
|
||||
self.query_one("#datasets_table", DataTable).focus()
|
||||
else:
|
||||
table.action_cursor_left()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def action_arrow_right(self) -> None:
|
||||
table = self._active_table()
|
||||
if table:
|
||||
try:
|
||||
# Move focus to files table on right, otherwise move cursor
|
||||
if table.id == "datasets_table":
|
||||
self.query_one("#files_table", DataTable).focus()
|
||||
else:
|
||||
table.action_cursor_right()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
"""Fallback manual cursor movement to guarantee arrow navigation."""
|
||||
table = self._active_table()
|
||||
if not table:
|
||||
return
|
||||
try:
|
||||
# Determine current row from cursor_coordinate if available
|
||||
current_row = 0
|
||||
try:
|
||||
coord = table.cursor_coordinate
|
||||
if coord and isinstance(coord, tuple):
|
||||
current_row = int(coord[0])
|
||||
except Exception:
|
||||
pass
|
||||
if event.key in ("up", "down"):
|
||||
max_rows = (
|
||||
len(self._file_row_keys)
|
||||
if table.id == "files_table"
|
||||
else len(self._dataset_row_keys)
|
||||
)
|
||||
if max_rows <= 0:
|
||||
return
|
||||
if event.key == "up":
|
||||
new_row = max(0, current_row - 1)
|
||||
else:
|
||||
new_row = min(max_rows - 1, current_row + 1)
|
||||
try:
|
||||
table.cursor_coordinate = (new_row, 0)
|
||||
event.stop()
|
||||
except Exception:
|
||||
pass
|
||||
elif event.key == "left":
|
||||
if table.id == "files_table":
|
||||
self.query_one("#datasets_table", DataTable).focus()
|
||||
event.stop()
|
||||
elif event.key == "right":
|
||||
if table.id == "datasets_table":
|
||||
self.query_one("#files_table", DataTable).focus()
|
||||
event.stop()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def action_back(self) -> None:
|
||||
self.app.pop_screen()
|
||||
61
cognee/cli/tui/screens/home.py
Normal file
61
cognee/cli/tui/screens/home.py
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
"""Home Screen for Cognee TUI"""
|
||||
|
||||
from textual.screen import Screen
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import Header, Footer, Button, Static
|
||||
from textual.containers import Container, Vertical
|
||||
|
||||
|
||||
class HomeScreen(Screen):
|
||||
"""Main dashboard screen"""
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
|
||||
with Container(id="menu-container", classes="center"):
|
||||
yield Static("[bold cyan]🧠 Cognee Context Manager[/bold cyan]", classes="title")
|
||||
yield Static("\nManage your AI memory and context\n", classes="center")
|
||||
|
||||
with Vertical():
|
||||
yield Button("📁 Manage Context", id="context", variant="primary")
|
||||
yield Button("🔍 Search & Query", id="query", variant="success")
|
||||
yield Button("⚙️ Settings", id="settings", variant="default")
|
||||
yield Button("❓ Help", id="help", variant="default")
|
||||
yield Button("🚪 Exit", id="exit", variant="error")
|
||||
|
||||
yield Static("\n[dim]Use arrow keys • Enter to select[/dim]", classes="center")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
button_id = event.button.id
|
||||
|
||||
if button_id == "context":
|
||||
from cognee.cli.tui.screens.context import ContextScreen
|
||||
|
||||
self.app.push_screen(ContextScreen())
|
||||
|
||||
elif button_id == "query":
|
||||
from cognee.cli.tui.screens.query import QueryScreen
|
||||
|
||||
self.app.push_screen(QueryScreen())
|
||||
|
||||
elif button_id == "settings":
|
||||
from cognee.cli.tui.screens.settings import SettingsScreen
|
||||
|
||||
self.app.push_screen(SettingsScreen())
|
||||
|
||||
elif button_id == "help":
|
||||
self.app.action_help()
|
||||
|
||||
elif button_id == "exit":
|
||||
self.app.exit()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
# Ensure initial focus so arrow keys can move between buttons
|
||||
try:
|
||||
first_button = self.query_one("#context", Button)
|
||||
first_button.focus()
|
||||
except Exception:
|
||||
# If the button isn't found for any reason, ignore
|
||||
pass
|
||||
64
cognee/cli/tui/screens/query.py
Normal file
64
cognee/cli/tui/screens/query.py
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
"""Query Screen"""
|
||||
|
||||
import asyncio
|
||||
from textual.screen import Screen
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import Header, Footer, Button, Static, Input, Markdown
|
||||
from textual.containers import Container, Vertical, VerticalScroll
|
||||
from textual.binding import Binding
|
||||
import cognee
|
||||
|
||||
|
||||
class QueryScreen(Screen):
|
||||
"""Query screen"""
|
||||
|
||||
BINDINGS = [Binding("escape", "back", "Back")]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
with Container():
|
||||
yield Static("[bold]🔍 Search & Query[/bold]\n", classes="title")
|
||||
with Vertical():
|
||||
yield Static("Enter your question and run a graph-aware search:", classes="center")
|
||||
yield Input(placeholder="e.g., What are the main topics?", id="query_input")
|
||||
yield Button("Run Search", id="run_btn", variant="primary")
|
||||
with VerticalScroll():
|
||||
yield Markdown("", id="results_md")
|
||||
yield Button("← Back", id="back_btn")
|
||||
yield Footer()
|
||||
|
||||
async def _set_results(self, content: str) -> None:
|
||||
try:
|
||||
md = self.query_one("#results_md", Markdown)
|
||||
md.update(content)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _run_search(self) -> None:
|
||||
query_input = self.query_one("#query_input", Input)
|
||||
query_text = (query_input.value or "").strip()
|
||||
if not query_text:
|
||||
await self._set_results(":warning: Please enter a question to search.")
|
||||
return
|
||||
try:
|
||||
await self._set_results("_Searching..._")
|
||||
results = await cognee.search(query_text=query_text)
|
||||
# Normalize results for display
|
||||
if isinstance(results, list):
|
||||
rendered = "\n".join(f"- {str(item)}" for item in results)
|
||||
else:
|
||||
rendered = str(results)
|
||||
await self._set_results(f"### Results\n\n{rendered}")
|
||||
except Exception as ex:
|
||||
await self._set_results(f"**Search failed:** {ex}")
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
if event.button.id == "back_btn":
|
||||
self.app.pop_screen()
|
||||
return
|
||||
if event.button.id == "run_btn":
|
||||
asyncio.create_task(self._run_search())
|
||||
return
|
||||
|
||||
def action_back(self) -> None:
|
||||
self.app.pop_screen()
|
||||
27
cognee/cli/tui/screens/settings.py
Normal file
27
cognee/cli/tui/screens/settings.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
"""Settings Screen"""
|
||||
|
||||
from textual.screen import Screen
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import Header, Footer, Button, Static
|
||||
from textual.containers import Container
|
||||
from textual.binding import Binding
|
||||
|
||||
|
||||
class SettingsScreen(Screen):
|
||||
"""Settings screen"""
|
||||
|
||||
BINDINGS = [Binding("escape", "back", "Back")]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header()
|
||||
with Container():
|
||||
yield Static("[bold]⚙️ Settings[/bold]\n", classes="title")
|
||||
yield Static("Settings features coming soon!")
|
||||
yield Button("← Back", id="back_btn")
|
||||
yield Footer()
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
self.app.pop_screen()
|
||||
|
||||
def action_back(self) -> None:
|
||||
self.app.pop_screen()
|
||||
4
cognee/cli/tui/widgets/__init__.py
Normal file
4
cognee/cli/tui/widgets/__init__.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
"""Custom widgets for Cognee TUI"""
|
||||
|
||||
# Custom widgets can be added here as needed
|
||||
__all__ = []
|
||||
|
|
@ -57,6 +57,7 @@ dependencies = [
|
|||
"websockets>=15.0.1,<16.0.0",
|
||||
"mistralai>=1.9.10",
|
||||
"tenacity>=9.0.0",
|
||||
"textual>=0.70.0,<1.0.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue