Compare commits

...
Sign in to create a new pull request.

10 commits

Author SHA1 Message Date
vasilije
30fd84e20d reformat 2025-11-18 06:12:35 -08:00
vasilije
6a087cbdfc create a working tui 2025-11-18 06:11:58 -08:00
Gnanasaikiran
d263d281a0 refactor: Remove unused Static import from TUI app
- Static widget is not used in app.py (only in screen files)
- Resolves CodeRabbit nitpick comment
2025-11-15 17:34:00 +05:30
Gnanasaikiran
ceaa015c91 fix: Use Markdown widget and fix docstring formatting
- Replace Static with Markdown for proper help text rendering
- Remove extra blank lines from run_tui docstring
- Add period to docstring summary for PEP 257 compliance
- Fix closing quotes indentation
2025-11-15 16:46:05 +05:30
Gnanasaikiran
65c552a35f fix: Improve help screen rendering and docstring formatting
- Use Markdown widget for proper help text formatting
- Fix extra blank lines in run_tui docstring
- Resolves CodeRabbit actionable comments
2025-11-15 16:35:06 +05:30
Gnanasaikiran
e86bce7280 refactor: Improve type hints and code organization in TUI
- Add ClassVar annotation to BINDINGS for proper type checking
- Move widget imports to module level for consistency
- Add type hints to event handler parameters
- Add return type annotation to run_tui function
- Organize .gitignore with proper section comments
- Resolves CodeRabbit nitpick suggestions
2025-11-15 16:19:16 +05:30
Gnanasaikiran
34d02a84c7 fix: Correct run_tui function signature for mouse parameter
- Add mouse parameter with default value True
- Fix docstring formatting
- Resolves CodeRabbit runtime error
2025-11-15 14:22:35 +05:30
Gnanasaikiran
04c6d8f608 chore: Update .gitignore 2025-11-15 14:16:40 +05:30
Gnanasaikiran
253dce89fe fix: Implement --no-mouse flag for TUI command
- Pass mouse parameter from CLI args to run_tui()
- Update run_tui() to accept and forward mouse flag to app.run()
- Resolves CodeRabbit review comment
2025-11-15 14:11:22 +05:30
Gnanasaikiran
2db9d1ee03 feat: Add lightweight TUI for cognee CLI
- Implement interactive Terminal User Interface using Textual
- Add main TUI application with keyboard navigation
- Create home screen with main menu
- Add placeholder screens for Context, Query, and Settings
- Integrate TUI command into existing CLI structure
- Support keyboard shortcuts (q=quit, ?=help, d=toggle dark mode)

Closes #1762
2025-11-15 13:37:22 +05:30
13 changed files with 5063 additions and 4139 deletions

4
.gitignore vendored
View file

@ -197,3 +197,7 @@ SWE-bench_testsample/
# ChromaDB Data
.chromadb_data/
# TUI development artifacts
*.backup
create_tui_files.sh
add_tui_command.py

View file

@ -92,6 +92,7 @@ def _discover_commands() -> List[Type[SupportsCliCommand]]:
("cognee.cli.commands.cognify_command", "CognifyCommand"),
("cognee.cli.commands.delete_command", "DeleteCommand"),
("cognee.cli.commands.config_command", "ConfigCommand"),
("cognee.cli.commands.tui_command", "TuiCommand"),
]
for module_path, class_name in command_modules:

View file

@ -0,0 +1,52 @@
import argparse
from cognee.cli.reference import SupportsCliCommand
from cognee.cli import DEFAULT_DOCS_URL
import cognee.cli.echo as fmt
from cognee.cli.exceptions import CliCommandException
class TuiCommand(SupportsCliCommand):
command_string = "tui"
help_string = "Launch interactive Terminal User Interface"
docs_url = DEFAULT_DOCS_URL
description = """
Launch the Cognee Terminal User Interface (TUI).
The TUI provides an interactive, text-based interface for managing your
knowledge graphs with features like:
- **Context Management**: Add and manage data sources
- **Search & Query**: Interactive knowledge graph querying
- **Settings**: Configure API keys and models
- **Live Updates**: Real-time status and progress indicators
The TUI is keyboard-driven and supports:
- Arrow key navigation
- Keyboard shortcuts (h=Home, c=Context, s=Search, etc.)
- Tab completion for inputs
Perfect for managing Cognee from the terminal or SSH sessions!
"""
def configure_parser(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--no-mouse",
action="store_true",
help="Disable mouse support (keyboard only mode)",
)
def execute(self, args: argparse.Namespace) -> None:
try:
fmt.echo("Starting Cognee TUI...")
fmt.note("Press 'q' to quit, '?' for help")
# Import and run TUI
from cognee.cli.tui import run_tui
run_tui(mouse=not args.no_mouse)
except KeyboardInterrupt:
fmt.note("\nTUI closed by user")
except Exception as e:
raise CliCommandException(f"Failed to start TUI: {str(e)}", error_code=1) from e

View file

@ -0,0 +1,5 @@
"""Cognee TUI - Terminal User Interface"""
from cognee.cli.tui.app import run_tui, CogneeTUI
__all__ = ["run_tui", "CogneeTUI"]

139
cognee/cli/tui/app.py Normal file
View file

@ -0,0 +1,139 @@
"""
Cognee TUI - Main Application
Text-based User Interface for managing Cognee knowledge graphs
"""
from typing import ClassVar
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Header, Footer, Button, Markdown
from textual.containers import VerticalScroll
from textual.screen import Screen
from textual.events import Key
from cognee.cli.tui.screens.home import HomeScreen
class CogneeTUI(App):
"""Cognee Terminal User Interface Application"""
CSS = """
Screen {
background: $surface;
}
.box {
border: solid $primary;
background: $panel;
padding: 1 2;
margin: 1;
}
Button {
margin: 1 2;
min-width: 30;
}
Button:hover {
background: $primary;
}
#menu-container {
width: 60;
height: auto;
border: heavy $primary;
background: $panel;
padding: 2;
}
.title {
text-align: center;
text-style: bold;
color: $accent;
padding: 1;
}
.center {
align: center middle;
}
/* Brand color for Manage Context button only */
#context {
background: #5C10F4;
color: #F4F4F4;
border: tall #5C10F4;
}
#context:hover {
background: #A550FF;
border: tall #A550FF;
}
"""
TITLE = "Cognee TUI - AI Memory and Context Manager"
SUB_TITLE = "Navigate with arrow keys • Press ? for help"
BINDINGS: ClassVar[list[Binding]] = [
Binding("q", "quit", "Quit", priority=True),
Binding("?", "help", "Help"),
Binding("d", "toggle_dark", "Toggle Dark Mode"),
]
def on_mount(self) -> None:
"""Initialize the app with the home screen"""
self.push_screen(HomeScreen())
def action_help(self) -> None:
"""Show help information"""
help_text = """
# Cognee TUI Help
## Navigation
- Arrow Keys: Navigate between UI elements
- Enter: Select/activate items
- Tab: Move to next field
- Esc: Go back
## Keyboard Shortcuts
- q: Quit application
- d: Toggle dark/light mode
- ?: Show this help
## Workflow
1. Add Context: Add data sources
2. Cognify: Process data
3. Search: Query knowledge graph
4. Settings: Configure API keys
"""
self.push_screen(HelpScreen(help_text))
class HelpScreen(Screen):
"""Help screen"""
def __init__(self, help_text: str):
super().__init__()
self.help_text = help_text
def compose(self) -> ComposeResult:
yield Header()
with VerticalScroll():
yield Markdown(self.help_text)
yield Button("Close (Esc)", id="close", variant="primary")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "close":
self.app.pop_screen()
def on_key(self, event: Key) -> None:
if event.key == "escape":
self.app.pop_screen()
def run_tui(mouse: bool = True) -> None:
"""Entry point to run the TUI application.
Args:
mouse: Enable mouse support (default: True)
"""
app = CogneeTUI()
app.run(mouse=mouse)

View file

@ -0,0 +1,8 @@
"""Screens for Cognee TUI"""
from cognee.cli.tui.screens.home import HomeScreen
from cognee.cli.tui.screens.context import ContextScreen
from cognee.cli.tui.screens.query import QueryScreen
from cognee.cli.tui.screens.settings import SettingsScreen
__all__ = ["HomeScreen", "ContextScreen", "QueryScreen", "SettingsScreen"]

View file

@ -0,0 +1,480 @@
"""Context Management Screen"""
import asyncio
import io
import os
from pathlib import Path
from contextlib import redirect_stdout, redirect_stderr
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Button, Static, Input, DataTable, Checkbox
from textual.containers import Container, Vertical
from textual.binding import Binding
from textual.events import Key
import cognee
from cognee.api.v1.datasets.datasets import datasets as ds_api
class ContextScreen(Screen):
"""Context management screen"""
BINDINGS = [
Binding("escape", "back", "Back"),
Binding("up", "arrow_up", show=False),
Binding("down", "arrow_down", show=False),
Binding("left", "arrow_left", show=False),
Binding("right", "arrow_right", show=False),
]
DEFAULT_DATASET = "main_dataset"
def __init__(self) -> None:
super().__init__()
self._datasets: list[dict] = []
self._dataset_id_to_name: dict[str, str] = {}
self._data_items_by_id: dict[str, dict] = {}
self._dataset_row_to_id: dict[DataTable.RowKey, str] = {}
self._file_row_to_id: dict[DataTable.RowKey, str] = {}
self._dataset_row_keys: list[DataTable.RowKey] = []
self._file_row_keys: list[DataTable.RowKey] = []
self._selected_dataset_id: str | None = None
self._selected_data_id: str | None = None
def compose(self) -> ComposeResult:
yield Header()
with Container():
yield Static("[bold]📁 Context Management[/bold]\n", classes="title")
with Vertical():
yield Static("[b]Datasets[/b] and [b]Files[/b] in dataset", classes="center")
# Central tables
yield DataTable(id="datasets_table")
yield DataTable(id="files_table")
# Tables are the primary UI (no dropdowns)
yield Input(placeholder="comma-separated node sets (optional)", id="nodeset_input")
yield Static(
"\nEnter text or a file path to add to the selected dataset:", classes="center"
)
yield Input(placeholder="Text or /absolute/path/to/file.pdf", id="data_input")
yield Button("Add to Context", id="add_btn", variant="primary")
yield Button("Cognify (process data)", id="cognify_btn", variant="success")
yield Static("\nSearch (runs against selected dataset context):", classes="center")
yield Input(placeholder="e.g., What are the main topics?", id="search_input")
yield Button("Search", id="search_btn", variant="default")
yield Checkbox(
"Save search output to searched_context.md",
id="save_search_checkbox",
value=False,
)
yield Static(
"\nExport context to Markdown (runs one or more queries):", classes="center"
)
yield Input(placeholder="Queries to export (comma-separated)", id="export_queries")
yield Button("Export Context to MD", id="export_btn", variant="default")
yield Static("", id="status")
yield Button("← Back", id="back_btn")
yield Footer()
async def _set_status(self, message: str) -> None:
try:
status = self.query_one("#status", Static)
status.update(message)
except Exception:
pass
async def on_mount(self) -> None:
# Load datasets and populate select
try:
await self._set_status("Loading datasets...")
datasets = await ds_api.list_datasets()
# datasets may be model objects; normalize to dicts with id, name
normalized = []
for d in datasets:
# Support both object and dict
d_id = str(getattr(d, "id", None) or d.get("id"))
d_name = str(getattr(d, "name", None) or d.get("name"))
normalized.append({"id": d_id, "name": d_name})
self._datasets = normalized
self._dataset_id_to_name = {d["id"]: d["name"] for d in normalized if d.get("id")}
# Init datasets table
ds_table = self.query_one("#datasets_table", DataTable)
if not ds_table.columns:
ds_table.add_columns("Name", "ID", "Created At")
ds_table.clear()
self._dataset_row_to_id.clear()
self._dataset_row_keys = []
for d in normalized:
row_key = ds_table.add_row(
d.get("name", ""), d.get("id", ""), d.get("created_at", "") or ""
)
self._dataset_row_to_id[row_key] = d["id"]
self._dataset_row_keys.append(row_key)
# Focus datasets table and preselect first row if available
ds_table.focus()
if self._dataset_row_to_id:
first_row_key = next(iter(self._dataset_row_to_id.keys()))
self._selected_dataset_id = self._dataset_row_to_id[first_row_key]
# Try to position the cursor on the first cell to ensure arrows work
try:
ds_table.cursor_coordinate = (0, 0)
except Exception:
pass
await self._load_dataset_files(self._selected_dataset_id)
await self._set_status("Datasets loaded. Select a dataset to view files.")
except Exception as ex:
await self._set_status(f"[red]Failed to load datasets:[/red] {ex}")
async def _load_dataset_files(self, dataset_id: str) -> None:
try:
await self._set_status("Loading dataset files...")
data_items = await ds_api.list_data(dataset_id)
normalized = []
self._data_items_by_id = {}
for item in data_items:
i_id = str(getattr(item, "id", None) or item.get("id"))
i_name = str(getattr(item, "name", None) or item.get("name") or "Unnamed")
raw_loc = (
getattr(item, "raw_data_location", None)
or item.get("raw_data_location")
or item.get("rawDataLocation")
)
orig_loc = (
getattr(item, "original_data_location", None)
or item.get("original_data_location")
or item.get("originalDataLocation")
)
normalized.append(
{
"id": i_id,
"name": i_name,
"raw_data_location": raw_loc,
"original_data_location": orig_loc,
"raw": raw_loc,
"orig": orig_loc,
}
)
self._data_items_by_id[i_id] = normalized[-1]
# Populate files table
files_table = self.query_one("#files_table", DataTable)
if not files_table.columns:
files_table.add_columns("Name", "ID", "Path")
files_table.clear()
self._file_row_to_id.clear()
self._file_row_keys = []
for i in normalized:
row_key = files_table.add_row(
i.get("name", ""),
i.get("id", ""),
(
i.get("original_data_location")
or i.get("raw_data_location")
or i.get("orig")
or i.get("raw")
or ""
),
)
self._file_row_to_id[row_key] = i["id"]
self._file_row_keys.append(row_key)
self._selected_data_id = None
await self._set_status(f"Loaded {len(normalized)} file(s) for the dataset.")
except Exception as ex:
await self._set_status(f"[red]Failed to load dataset files:[/red] {ex}")
async def _handle_add(self) -> None:
data_input = self.query_one("#data_input", Input)
nodeset_input = self.query_one("#nodeset_input", Input)
content = (data_input.value or "").strip()
selected_dataset_id = self._selected_dataset_id
dataset_name = self._dataset_id_to_name.get(selected_dataset_id, self.DEFAULT_DATASET)
raw_nodeset = (nodeset_input.value or "").strip()
node_set = [s.strip() for s in raw_nodeset.split(",") if s.strip()] if raw_nodeset else None
if not content:
await self._set_status("[red]Please enter text or a file path[/red]")
return
try:
await self._set_status(
f"Adding data to dataset [b]{dataset_name}[/b] "
f"{'(with node sets: ' + ', '.join(node_set) + ')' if node_set else ''}..."
)
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
await cognee.add(content, dataset_name=dataset_name, node_set=node_set)
await self._set_status("[green]✓ Added successfully.[/green] You can now run Cognify.")
except Exception as ex:
await self._set_status(f"[red]Add failed:[/red] {ex}")
async def _handle_cognify(self) -> None:
try:
await self._set_status("Processing data into knowledge graph (cognify)...")
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
await cognee.cognify()
await self._set_status("[green]✓ Cognify complete.[/green]")
except Exception as ex:
await self._set_status(f"[red]Cognify failed:[/red] {ex}")
async def _handle_search(self) -> None:
try:
dataset_id = self._selected_dataset_id
ds_name = self._dataset_id_to_name.get(dataset_id, None)
q_input = self.query_one("#search_input", Input)
save_cb = self.query_one("#save_search_checkbox", Checkbox)
query_text = (q_input.value or "").strip()
if not query_text:
await self._set_status(":warning: Please enter a search query.")
return
await self._set_status("Searching...")
# If a dataset is chosen, we can scope via datasets=[name]
kwargs = {}
if ds_name:
kwargs["datasets"] = [ds_name]
results = await cognee.search(query_text=query_text, **kwargs)
rendered = (
"\n".join(f"- {str(item)}" for item in results)
if isinstance(results, list)
else str(results)
)
await self._set_status(f"[b]Search results[/b]:\n{rendered}")
# Optionally save to searched_context.md
if save_cb.value:
# Choose directory next to selected file if possible, else current working dir
target_dir: Path | None = None
if self._selected_data_id:
data_item = self._data_items_by_id.get(self._selected_data_id)
if data_item:
loc = (
data_item.get("original_data_location")
or data_item.get("raw_data_location")
or data_item.get("orig")
or data_item.get("raw")
)
if isinstance(loc, str) and loc.startswith("file://"):
loc = loc[len("file://") :]
if isinstance(loc, str) and (
loc.startswith("/") or (len(loc) > 2 and loc[1:3] in (":\\", ":/"))
):
try:
p = Path(loc)
target_dir = p.parent if p.exists() or p.parent.exists() else None
except Exception:
target_dir = None
if target_dir is None:
target_dir = Path.cwd()
out_path = target_dir / "searched_context.md"
try:
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "a", encoding="utf-8") as f:
f.write(f"### Query: {query_text}\n{rendered}\n\n")
await self._set_status(f"[green]✓ Saved search output to:[/green] {out_path}")
except Exception as ex:
await self._set_status(f"[red]Failed to save search output:[/red] {ex}")
except Exception as ex:
await self._set_status(f"[red]Search failed:[/red] {ex}")
def _choose_export_path(self, data_item: dict) -> Path | None:
# Prefer original path; fallback to raw path
loc = data_item.get("orig") or data_item.get("raw")
if not loc:
return None
# Accept absolute POSIX paths or file:// URIs
if isinstance(loc, str) and loc.startswith("file://"):
loc = loc[len("file://") :]
if not isinstance(loc, str) or not (
loc.startswith("/") or loc[1:3] == ":\\" or loc[1:3] == ":/"
):
return None
try:
p = Path(loc)
if p.is_dir():
return p / "context_export.md"
# write next to file with suffix
stem = p.stem
return p.with_name(f"{stem}_context.md")
except Exception:
return None
async def _handle_export(self) -> None:
try:
export_queries = self.query_one("#export_queries", Input)
data_id = self._selected_data_id
if not data_id:
await self._set_status(":warning: Please select a file to export context for.")
return
data_item = self._data_items_by_id.get(data_id)
if not data_item:
await self._set_status(":warning: Selected file metadata not available.")
return
export_path = self._choose_export_path(data_item)
if not export_path:
await self._set_status(
"[red]Can't determine a local file path to save Markdown next to the original file.[/red]"
)
return
raw_queries = (export_queries.value or "").strip()
queries = [q.strip() for q in raw_queries.split(",") if q.strip()]
await self._set_status("Running export...")
md_parts: list[str] = []
md_parts.append(f"# Context Export for {data_item.get('name', 'selected item')}")
# Include simple metadata block
md_parts.append("")
md_parts.append("## Source")
md_parts.append(
f"- Dataset: {self._dataset_id_to_name.get(self._selected_dataset_id, self.DEFAULT_DATASET)}"
)
if data_item.get("orig"):
md_parts.append(f"- Original: {data_item.get('orig')}")
if data_item.get("raw"):
md_parts.append(f"- Raw: {data_item.get('raw')}")
# Run queries if provided
if queries:
md_parts.append("\n## Search Results")
for q in queries:
md_parts.append(f"\n### Query: {q}\n")
try:
ds_name = self._dataset_id_to_name.get(self._selected_dataset_id, None)
kwargs = {}
if ds_name:
kwargs["datasets"] = [ds_name]
results = await cognee.search(query_text=q, **kwargs)
if isinstance(results, list):
if results:
for r in results:
md_parts.append(f"- {str(r)}")
else:
md_parts.append("- (no results)")
else:
md_parts.append(str(results))
except Exception as ex:
md_parts.append(f"- (search failed: {ex})")
# Write file
export_path.parent.mkdir(parents=True, exist_ok=True)
with open(export_path, "w", encoding="utf-8") as f:
f.write("\n".join(md_parts).strip() + "\n")
await self._set_status(f"[green]✓ Exported to:[/green] {export_path}")
except Exception as ex:
await self._set_status(f"[red]Export failed:[/red] {ex}")
def on_button_pressed(self, event) -> None:
if event.button.id == "back_btn":
self.app.pop_screen()
return
if event.button.id == "add_btn":
asyncio.create_task(self._handle_add())
return
if event.button.id == "cognify_btn":
asyncio.create_task(self._handle_cognify())
return
if event.button.id == "search_btn":
asyncio.create_task(self._handle_search())
return
if event.button.id == "export_btn":
asyncio.create_task(self._handle_export())
return
def on_data_table_row_selected(self, message: DataTable.RowSelected) -> None:
# Selecting a dataset row loads its files and syncs dropdown
try:
if message.data_table.id == "datasets_table":
row_key = message.row_key
dataset_id = self._dataset_row_to_id.get(row_key)
if dataset_id:
self._selected_dataset_id = dataset_id
asyncio.create_task(self._load_dataset_files(dataset_id))
elif message.data_table.id == "files_table":
row_key = message.row_key
data_id = self._file_row_to_id.get(row_key)
if data_id:
self._selected_data_id = data_id
except Exception:
pass
def _active_table(self) -> DataTable | None:
try:
files_table = self.query_one("#files_table", DataTable)
datasets_table = self.query_one("#datasets_table", DataTable)
if files_table.has_focus:
return files_table
return datasets_table
except Exception:
return None
def action_arrow_up(self) -> None:
table = self._active_table()
if table:
try:
table.action_cursor_up()
except Exception:
pass
def action_arrow_down(self) -> None:
table = self._active_table()
if table:
try:
table.action_cursor_down()
except Exception:
pass
def action_arrow_left(self) -> None:
table = self._active_table()
if table:
try:
# Move focus to datasets table on left, otherwise move cursor
if table.id == "files_table":
self.query_one("#datasets_table", DataTable).focus()
else:
table.action_cursor_left()
except Exception:
pass
def action_arrow_right(self) -> None:
table = self._active_table()
if table:
try:
# Move focus to files table on right, otherwise move cursor
if table.id == "datasets_table":
self.query_one("#files_table", DataTable).focus()
else:
table.action_cursor_right()
except Exception:
pass
def on_key(self, event: Key) -> None:
"""Fallback manual cursor movement to guarantee arrow navigation."""
table = self._active_table()
if not table:
return
try:
# Determine current row from cursor_coordinate if available
current_row = 0
try:
coord = table.cursor_coordinate
if coord and isinstance(coord, tuple):
current_row = int(coord[0])
except Exception:
pass
if event.key in ("up", "down"):
max_rows = (
len(self._file_row_keys)
if table.id == "files_table"
else len(self._dataset_row_keys)
)
if max_rows <= 0:
return
if event.key == "up":
new_row = max(0, current_row - 1)
else:
new_row = min(max_rows - 1, current_row + 1)
try:
table.cursor_coordinate = (new_row, 0)
event.stop()
except Exception:
pass
elif event.key == "left":
if table.id == "files_table":
self.query_one("#datasets_table", DataTable).focus()
event.stop()
elif event.key == "right":
if table.id == "datasets_table":
self.query_one("#files_table", DataTable).focus()
event.stop()
except Exception:
pass
def action_back(self) -> None:
self.app.pop_screen()

View file

@ -0,0 +1,61 @@
"""Home Screen for Cognee TUI"""
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Button, Static
from textual.containers import Container, Vertical
class HomeScreen(Screen):
"""Main dashboard screen"""
def compose(self) -> ComposeResult:
yield Header()
with Container(id="menu-container", classes="center"):
yield Static("[bold cyan]🧠 Cognee Context Manager[/bold cyan]", classes="title")
yield Static("\nManage your AI memory and context\n", classes="center")
with Vertical():
yield Button("📁 Manage Context", id="context", variant="primary")
yield Button("🔍 Search & Query", id="query", variant="success")
yield Button("⚙️ Settings", id="settings", variant="default")
yield Button("❓ Help", id="help", variant="default")
yield Button("🚪 Exit", id="exit", variant="error")
yield Static("\n[dim]Use arrow keys • Enter to select[/dim]", classes="center")
yield Footer()
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id
if button_id == "context":
from cognee.cli.tui.screens.context import ContextScreen
self.app.push_screen(ContextScreen())
elif button_id == "query":
from cognee.cli.tui.screens.query import QueryScreen
self.app.push_screen(QueryScreen())
elif button_id == "settings":
from cognee.cli.tui.screens.settings import SettingsScreen
self.app.push_screen(SettingsScreen())
elif button_id == "help":
self.app.action_help()
elif button_id == "exit":
self.app.exit()
def on_mount(self) -> None:
# Ensure initial focus so arrow keys can move between buttons
try:
first_button = self.query_one("#context", Button)
first_button.focus()
except Exception:
# If the button isn't found for any reason, ignore
pass

View file

@ -0,0 +1,64 @@
"""Query Screen"""
import asyncio
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Button, Static, Input, Markdown
from textual.containers import Container, Vertical, VerticalScroll
from textual.binding import Binding
import cognee
class QueryScreen(Screen):
"""Query screen"""
BINDINGS = [Binding("escape", "back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Container():
yield Static("[bold]🔍 Search & Query[/bold]\n", classes="title")
with Vertical():
yield Static("Enter your question and run a graph-aware search:", classes="center")
yield Input(placeholder="e.g., What are the main topics?", id="query_input")
yield Button("Run Search", id="run_btn", variant="primary")
with VerticalScroll():
yield Markdown("", id="results_md")
yield Button("← Back", id="back_btn")
yield Footer()
async def _set_results(self, content: str) -> None:
try:
md = self.query_one("#results_md", Markdown)
md.update(content)
except Exception:
pass
async def _run_search(self) -> None:
query_input = self.query_one("#query_input", Input)
query_text = (query_input.value or "").strip()
if not query_text:
await self._set_results(":warning: Please enter a question to search.")
return
try:
await self._set_results("_Searching..._")
results = await cognee.search(query_text=query_text)
# Normalize results for display
if isinstance(results, list):
rendered = "\n".join(f"- {str(item)}" for item in results)
else:
rendered = str(results)
await self._set_results(f"### Results\n\n{rendered}")
except Exception as ex:
await self._set_results(f"**Search failed:** {ex}")
def on_button_pressed(self, event) -> None:
if event.button.id == "back_btn":
self.app.pop_screen()
return
if event.button.id == "run_btn":
asyncio.create_task(self._run_search())
return
def action_back(self) -> None:
self.app.pop_screen()

View file

@ -0,0 +1,27 @@
"""Settings Screen"""
from textual.screen import Screen
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Button, Static
from textual.containers import Container
from textual.binding import Binding
class SettingsScreen(Screen):
"""Settings screen"""
BINDINGS = [Binding("escape", "back", "Back")]
def compose(self) -> ComposeResult:
yield Header()
with Container():
yield Static("[bold]⚙️ Settings[/bold]\n", classes="title")
yield Static("Settings features coming soon!")
yield Button("← Back", id="back_btn")
yield Footer()
def on_button_pressed(self, event) -> None:
self.app.pop_screen()
def action_back(self) -> None:
self.app.pop_screen()

View file

@ -0,0 +1,4 @@
"""Custom widgets for Cognee TUI"""
# Custom widgets can be added here as needed
__all__ = []

View file

@ -57,6 +57,7 @@ dependencies = [
"websockets>=15.0.1,<16.0.0",
"mistralai>=1.9.10",
"tenacity>=9.0.0",
"textual>=0.70.0,<1.0.0",
]
[project.optional-dependencies]

8356
uv.lock generated

File diff suppressed because it is too large Load diff