Refactoring

This commit is contained in:
Digital Studium 2024-12-31 23:10:03 +03:00
parent 22f1770aee
commit 4db4959369
7 changed files with 513 additions and 301 deletions

View File

@ -6,13 +6,13 @@
Inspired by `lf` and `ranger` file managers, written in python. Inspired by `lf` and `ranger` file managers, written in python.
It is lightweight (~400 lines of code) and easy to customize. It is lightweight (~400 lines of code) and easy to customize.
Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #68). Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #69).
## Key bindings ## Key bindings
### For kubectl ### For kubectl
You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #15: You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #14:
- `Ctrl+y` - get **Y**aml of resource - `Ctrl+y` - get **Y**aml of resource
- `Ctrl+d` - **D**escribe resource - `Ctrl+d` - **D**escribe resource

View File

@ -0,0 +1,63 @@
digraph state_machine {
// Graph styling
rankdir=TB;
compound=true;
node [shape=record, style=filled, fillcolor=lightblue, fontsize=10];
edge [fontsize=9];
// States with entry/exit/activities
Normal [label="{Normal State|Entry: None\l\
Activities:\l\
• Wait for key input\l\
• Handle state-independent navigation\l\
Exit: None\l}"];
EmptyFilter [label="{EmptyFilter State|Entry: draw_footer(menu, '/')\l\
Activities:\l\
• Wait for key input\l\
• Handle state-independent navigation\l\
• Monitor for filter input\l\
Exit: None\l}"];
FilledFilter [label="{FilledFilter State|Entry:\l\
• Update filtered_rows\l\
• Draw menu or footer with filter\l\
Activities:\l\
• Process filter input\l\
• Update filtered_rows on changes\l\
• Update display if visibility changes\l\
• Handle state-independent navigation\l\
Exit:\l\
• Clear filter (on ESC)\l\
• Update filtered_rows\l\
• Update display\l}"];
// Exit node
exit [shape=doublecircle, fillcolor=lightpink, label="Exit\n(SELECTED_MENU = None)"];
// Initial state indicator
start [shape=point, fillcolor=black];
start -> Normal;
// Transitions
Normal -> EmptyFilter [label="/ (slash)\nAction: Initialize empty filter"];
Normal -> exit [label="ESC\nAction: Clear selection"];
EmptyFilter -> Normal [label="ESC\nAction: draw_footer('')"];
EmptyFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Create filtered_rows\n• Update display"];
FilledFilter -> Normal [label="ESC\nActions:\n• Clear filter\n• Reset filtered_rows\n• Update display"];
FilledFilter -> EmptyFilter [label="BACKSPACE\n(when filter empty)\nAction: draw_footer('/')"];
FilledFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Update filtered_rows\n• Update display"];
// State-independent actions note
subgraph cluster_notes {
label="Notes";
style=filled;
fillcolor=lightyellow;
node [style=filled, fillcolor=white];
note1 [label="State-Independent Actions:\n• Vertical/Horizontal navigation\n• Available in all states\n• Handled by handle_state_independent_input()"];
note2 [label="Display Updates:\n• draw_menu_with_filter()\n- Called when visible rows change\n• draw_footer()\n- Called for filter updates"];
note3 [label="Filtering:\n• Uses CircularList for row management\n• Filters are case-insensitive\n• Supports alphanumeric and '-' chars"];
}
}

21
diagrams/draft.dot Normal file
View File

@ -0,0 +1,21 @@
digraph {
newrank=true;
subgraph cluster_menu_states {
label="Menu States"
color=lightgrey;
style=filled;
node [style=filled,color=white];
Normal -> EmptyFilter [label = "S"];
EmptyFilter -> FilledFilter [label = "A"];
EmptyFilter -> Normal [label = "E"];
FilledFilter:ne -> Normal:se [label = "E"];
FilledFilter -> EmptyFilter [label = "B"];
FilledFilter:s -> FilledFilter:s [label = "B"];
FilledFilter:e -> FilledFilter:e [label = "A"];
FilledFilter:w -> FilledFilter:w [label = "V"];
EmptyFilter:w -> EmptyFilter:w [label = "V"];
Normal:w -> Normal:w [label = "V"];
}
{ rank=same; Normal; Exit; }
Normal -> Exit [label = "E"];
}

View File

@ -0,0 +1,57 @@
digraph fsm {
// Create states cluster for l1, l2, l3
subgraph cluster_states {
rankdir="LR"
label = "States";
style=filled;
color=lightgrey;
node [style=filled,color=white];
l1 [label = "Normal state"];
l2 [label = "Filter state\n(empty filter)"];
l3 [label = "Filter state\n(non-empty filter)"];
}
// Other nodes
exit [label = "Exit"];
i1 [label = "Other menu"];
e1 [label = "External program"];
// Transitions
l1 -> exit [label = "E"];
// Mode switches
l1 -> l2 [label = "S"];
l2 -> l3 [label = "A"];
l2 -> l1 [label = "E"];
l3 -> l1 [label = "E"];
l3 -> l2 [label = "B"];
// Horizontal navigation
l1 -> i1 [label = "H"];
l2 -> i1 [label = "H"];
l3 -> i1 [label = "H"];
// External program
l1 -> e1 [label = "K"];
l2 -> e1 [label = "K"];
l3 -> e1 [label = "K"];
// Self-loops
l1 -> l1 [label = "V"];
l2 -> l2 [label = "V"];
l3 -> l3 [label = "V"];
l3 -> l3 [label = "A"];
l3 -> l3 [label = "B"];
label = "Keys:\n\
S - Slash (start filter) \l\
E - Escape \l\
A - Type text \l\
B - Backspace \l\
H - Horizontal navigation (Right,Left,Tab) \l\
V - Vertical navigation (Up, Down) \l\
K - Key binding \l"
}

673
kls
View File

@ -1,84 +1,86 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from typing import Optional, Callable from typing import Optional, Callable, Self
import subprocess import subprocess
import curses import curses
import curses.ascii import curses.ascii
import asyncio import asyncio
from enum import Enum, auto
SCREEN: curses.window = curses.initscr() SCREEN: curses.window = curses.initscr()
# ****************************** # # ****************************** #
# START OF CONFIGURATION SECTION # # START OF CONFIGURATION SECTION #
# ****************************** # # ****************************** #
KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended
"^Y": { # Ctrl + y "^Y": { # Ctrl + y
"description": "Yaml", "description": "Yaml",
"command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml' "command": "kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml",
"kind": "all", # this key binding is valid for all api resources
}, },
"^D": { # Ctrl + d "^D": { # Ctrl + d
"description": "Describe", "description": "Describe",
"command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml' "command": "kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml",
"kind": "all",
}, },
"^E": { # Ctrl + e "^E": { # Ctrl + e
"description": "Edit", "description": "Edit",
"command": 'kubectl -n {namespace} edit {api_resource} {resource}' "command": "kubectl -n {namespace} edit {api_resource} {resource}",
"kind": "all",
}, },
"^L": { # Ctrl + l "^L": { # Ctrl + l
"description": "Logs", "description": "Logs",
"command": 'kubectl -n {namespace} logs {resource} | lnav' "command": "kubectl -n {namespace} logs {resource} | lnav",
"kind": "pods", # this key binding is valid for pods only
}, },
"^X": { # Ctrl + x "^X": { # Ctrl + x
"description": "eXec", "description": "eXec",
"command": 'kubectl -n {namespace} exec -it {resource} sh' "command": "kubectl -n {namespace} exec -it {resource} sh",
"kind": "pods",
}, },
"^N": { # Ctrl + n "^N": { # Ctrl + n
"description": "Network debug", "description": "Network debug",
"command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot' "command": "kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot",
"kind": "pods",
}, },
"^A": { # Ctrl + a (a means Access logs! :-)) "^A": { # Ctrl + a (a means Access logs! :-))
"description": "istio-proxy Access logs", "description": "istio-proxy Access logs",
"command": 'kubectl -n {namespace} logs {resource} -c istio-proxy | lnav' "command": "kubectl -n {namespace} logs {resource} -c istio-proxy | lnav",
"kind": "pods",
}, },
"^P": { # Ctrl + p (p means Proxy! :-)) "^P": { # Ctrl + p (p means Proxy! :-))
"description": "exec istio-Proxy", "description": "exec istio-Proxy",
"command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy bash' "command": "kubectl -n {namespace} exec -it {resource} -c istio-proxy bash",
"kind": "pods",
}, },
"^R": { # Ctrl + r (r means Reveal! :-)) "^R": { # Ctrl + r (r means Reveal! :-))
"description": "Reveal secret", "description": "Reveal secret",
"command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml" "command": "kubectl get secret {resource} -n {namespace} -o yaml"
" | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml",
"kind": "secrets",
}, },
"Delete": { # It is actually KEY_DC "Delete": { # It is actually KEY_DC
"description": "Delete", "description": "Delete",
"command": 'kubectl -n {namespace} delete {api_resource} {resource}' "command": "kubectl -n {namespace} delete {api_resource} {resource}",
} "kind": "all",
},
} }
# which api resources are on the top of menu?
TOP_API_RESOURCES: list[str] = [
"pods", "services", "configmaps", "secrets", "persistentvolumeclaims",
"ingresses", "nodes", "deployments", "statefulsets", "daemonsets",
"storageclasses", "serviceentries", "destinationrules", "authorizationpolicies",
"virtualservices", "gateways", "telemetry", "envoyfilters"
]
QUERY_API_RESOURCES: bool = False # Should we merge TOP_API_RESOURCES with all other api resources from cluster?
BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat
SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD
MOUSE_ENABLED: bool = False MOUSE_ENABLED: bool = False
WIDTH: int = curses.COLS WIDTH: int = curses.COLS
WIDTH_UNIT: int = int(WIDTH / 8) WIDTH_UNIT: int = int(WIDTH / 10)
CONTEXTS_WIDTH = int(WIDTH_UNIT * 1.5)
NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5) NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5)
API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5) API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5)
RESOURCES_WIDTH = WIDTH - (API_RESOURCES_WIDTH + NAMESPACES_WIDTH) RESOURCES_WIDTH = WIDTH - (CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH)
HEADER_HEIGHT: int = 4 HEADER_HEIGHT: int = 4
FOOTER_HEIGHT: int = 3 FOOTER_HEIGHT: int = 3
ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3
# Generate HELP_TEXT from KEY_BINDINGS # Generate HELP_TEXT from KEY_BINDINGS
HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items()) HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB/PgUp/PgDn: navigation" HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB: navigation"
SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD
ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_BACKSPACE", "\x08", "KEY_MOUSE", "KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END", "\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]
# **************************** # # **************************** #
# END OF CONFIGURATION SECTION # # END OF CONFIGURATION SECTION #
@ -86,41 +88,106 @@ ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_
class CircularList: class CircularList:
def __init__(self, elements: list[str]): def __init__(self, items: list[str]):
self.elements: list[str] = elements self.items: list[str] = items
self.size: int = len(elements) self.size: int = len(items)
self.index: int = 0 self.index: int = 0
def __getitem__(self, index: slice) -> list[str]: def __getitem__(self, index: slice) -> list[str]:
start, stop, step = index.indices(self.size) start, stop, step = index.indices(self.size)
return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)] return [self.items[(self.index + i) % self.size] for i in range(start, stop, step)]
def shift(self, steps: int) -> None: def shift(self, steps: int) -> None:
self.index = (self.index + steps) % self.size self.index = (self.index + steps) % self.size
class MenuState(Enum):
NORMAL = auto()
EMPTY_FILTER = auto()
FILLED_FILTER = auto()
class Menu: class Menu:
def __init__(self, title: str, rows: list[str], begin_x: int|float, width: int|float, rows_height: int): selected = None # Class variable to track selected object
def __init__(
self,
title: str,
rows: list[str],
begin_x: int,
width: int,
):
self.title: str = title self.title: str = title
self.rows: list[str] = rows self.rows: list[str] = rows
self.filter: str = "" self.filter: str = ""
self.filter_mode: bool = False self.state: MenuState = MenuState.NORMAL
self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x]) self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x])
self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:rows_height] self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:ROWS_HEIGHT]
self.visible_row_index: int = 0 self.visible_row_index: int = 0
self.selected_row: Callable[[], Optional[str]] = lambda: self.visible_rows()[ self.selected_row: Callable[[], Optional[str]] = (
self.visible_row_index] if self.visible_rows() else None lambda: self.visible_rows()[self.visible_row_index] if self.visible_rows() else None
self.rows_height: int = rows_height )
self.width: int = int(width) self.width: int = int(width)
self.begin_x: int = int(begin_x) self.begin_x: int = int(begin_x)
self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x) self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x)
self.dependent_menus: list[Self] = []
def refresh_filtered_rows(self):
self.filtered_rows = CircularList([x for x in self.rows if self.filter in x])
async def set_state(self, state: MenuState) -> None:
self.state = state
# entry activities
match self.state:
case MenuState.NORMAL:
self.filter = ""
self.draw_menu_or_footer("")
await self.refresh_dependent_menus()
case MenuState.EMPTY_FILTER:
self.filter = ""
self.draw_menu_or_footer("/")
await self.refresh_dependent_menus()
case MenuState.FILLED_FILTER:
self.draw_menu_or_footer(f"/{self.filter}") # if redrawing whole menu is not needed
await self.refresh_dependent_menus()
def draw_rows(self) -> None:
for index, row in enumerate(self.visible_rows()):
draw_row(self.win, row, index + HEADER_HEIGHT, 2, selected=row == self.selected_row())
def draw_menu_with_footer(self) -> None:
self.win.erase()
draw_row(self.win, self.title, 1, 2, selected=self == Menu.selected)
self.draw_rows()
draw_row(
self.win,
f"/{self.filter}" if self.state in [MenuState.EMPTY_FILTER, MenuState.FILLED_FILTER] else "",
curses.LINES - FOOTER_HEIGHT - 2,
2,
)
def draw_menu_or_footer(self, footer_text: str) -> None:
previous_visible_rows = self.visible_rows()
self.refresh_filtered_rows()
if self.visible_rows() != previous_visible_rows: # draw whole menu
self.visible_row_index = 0
self.draw_menu_with_footer()
self.refresh_dependent_menus()
else: # draw footer only
draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2)
def set_dependent_menus(self, menus: list[Self]) -> None:
self.dependent_menus = menus
async def refresh_dependent_menus(self):
for menu in self.dependent_menus:
await refresh_menu(menu)
# Global variables # Global variables
THIRD_MENU_LOCK: asyncio.Lock = asyncio.Lock() FOURTH_MENU_LOCK: asyncio.Lock = asyncio.Lock()
THIRD_MENU_TASK: Optional[asyncio.Task] = None FOURTH_MENU_TASK: Optional[asyncio.Task] = None
menus: list[Menu] = [] MENUS: list[Menu] = []
selected_menu: Optional[Menu] = None
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None: def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None:
@ -129,274 +196,66 @@ def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool =
window.refresh() window.refresh()
def draw_rows(menu: Menu) -> None: async def refresh_menu(menu: Menu) -> None:
for index, row in enumerate(menu.visible_rows()): if menu == MENUS[1]:
draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected = row == menu.selected_row()) menu.rows = await get_namespaces()
elif menu == MENUS[2]:
menu.rows = await get_api_resources()
menu.refresh_filtered_rows()
menu.visible_row_index = 0
menu.draw_menu_with_footer()
def draw_menu(menu: Menu) -> None: async def refresh_resources_menu(namespace: Optional[str], api_resource: Optional[str]) -> None:
menu.win.erase()
draw_row(menu.win, menu.title, 1, 2, selected = menu == selected_menu)
draw_rows(menu)
draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2)
async def refresh_third_menu(namespace: Optional[str], api_resource: Optional[str]) -> None:
global THIRD_MENU_TASK
try: try:
async with THIRD_MENU_LOCK: async with FOURTH_MENU_LOCK:
menu = menus[2] menu = MENUS[3]
previous_menu_rows = menu.rows previous_menu_rows = menu.rows
if api_resource and namespace: if api_resource and namespace:
try: try:
menu.rows = await kubectl_async( menu.rows = await kubectl_async(
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found") f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'"
)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
menu.rows = [] menu.rows = []
else: else:
menu.rows = [] menu.rows = []
index_before_update = menu.filtered_rows.index index_before_update = menu.filtered_rows.index
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) menu.refresh_filtered_rows()
menu.filtered_rows.index = index_before_update menu.filtered_rows.index = index_before_update
if menu.visible_row_index >= len(menu.visible_rows()): if menu.visible_row_index >= len(menu.visible_rows()):
menu.visible_row_index = 0 menu.visible_row_index = 0
if previous_menu_rows != menu.rows: if previous_menu_rows != menu.rows:
draw_menu(menu) menu.draw_menu_with_footer()
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str) -> None: async def get_contexts() -> list[str]:
if not resource:
return
if key in ("l", "x", "n") and api_resource != "pods":
return
if key == "KEY_DC":
key = "Delete"
if THIRD_MENU_TASK is not None:
THIRD_MENU_TASK.cancel()
try: try:
await THIRD_MENU_TASK current_context = await kubectl_async("config current-context")
except asyncio.CancelledError: contexts = await kubectl_async("config get-contexts --no-headers -o name")
contexts.remove(current_context[0])
contexts.insert(0, current_context[0])
return [line.split()[0] for line in contexts if line.strip()]
except subprocess.CalledProcessError:
return []
async def switch_context(context: str) -> None:
if not context:
return
try:
await kubectl_async(f"config use-context {context}")
except subprocess.CalledProcessError:
pass pass
async with THIRD_MENU_LOCK:
curses.def_prog_mode()
curses.endwin()
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
if "batcat" in command:
command += BATCAT_STYLE
await subprocess_call_async(command)
curses.reset_prog_mode()
SCREEN.refresh()
enable_mouse_support()
def handle_filter_state(key: str, menu: Menu) -> None: async def get_namespaces() -> list[str]:
global selected_menu
if key == "/" and not menu.filter_mode:
menu.filter_mode = True
menu.filter = ""
elif key == "\x1b": # Escape key
if menu.filter_mode:
menu.filter_mode = False
menu.filter = ""
else:
selected_menu = None
elif menu.filter_mode:
if key in ["KEY_BACKSPACE", "\x08"] and menu.filter:
menu.filter = menu.filter[:-1] # Remove last character
elif key.isalnum() or key == "-": # Allow letters, numbers, and dashes
menu.filter += key.lower()
menu.visible_row_index = 0
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
draw_menu(menu)
if menu != menus[2]:
menus[2].visible_row_index = 0
def handle_mouse(menu: Menu) -> None:
if not MOUSE_ENABLED:
return
try:
mouse_info: tuple[int, ...] = curses.getmouse()
except curses.error: # this fixes scrolling error
return
row_number = mouse_info[2] - HEADER_HEIGHT
column_number = mouse_info[1]
next_menu: Optional[Menu] = None
if column_number > (menu.begin_x + menu.width):
next_menu = menus[(menus.index(menu) + 1) % 3]
if column_number > (next_menu.begin_x + next_menu.width):
next_menu = menus[(menus.index(next_menu) + 1) % 3]
globals().update(selected_menu=next_menu)
elif column_number < menu.begin_x:
next_menu = menus[(menus.index(menu) - 1) % 3]
if column_number < next_menu.begin_x:
next_menu = menus[(menus.index(next_menu) - 1) % 3]
globals().update(selected_menu=next_menu)
if next_menu:
draw_row(menu.win, menu.title, 1, 2, selected=False)
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
menu = next_menu
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1)
char_str = chr(char_int & 0xFF)
if not char_str or ord(char_str) > 127 or ' ' in char_str:
return
if 0 <= row_number < len(menu.visible_rows()):
menu.visible_row_index = row_number
draw_rows(menu)
if menu != menus[2]:
menus[2].visible_row_index = 0
def handle_vertical_navigation(key: str, menu: Menu) -> None:
if len(menu.visible_rows()) <= 1:
return
keys_numbers: dict[str, int] = {
"KEY_DOWN": 1, "KEY_UP": -1,
"KEY_NPAGE": 1, "KEY_PPAGE": -1,
'KEY_HOME': 0, 'KEY_END': -1
}
if key in ["KEY_DOWN", "KEY_UP"]:
if menu.filtered_rows.size > menu.rows_height:
menu.filtered_rows.shift(keys_numbers[key])
else:
menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size
elif key in ["KEY_NPAGE", "KEY_PPAGE"]:
menu.filtered_rows.shift(keys_numbers[key] * len(menu.visible_rows()))
elif key in ['KEY_HOME', 'KEY_END']:
menu.visible_row_index = keys_numbers[key]
draw_rows(menu)
if menu != menus[2]:
menus[2].visible_row_index = 0
def handle_horizontal_navigation(key: str, menu: Menu) -> None:
increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
next_menu = menus[(menus.index(menu) + increment) % 3]
draw_row(menu.win, menu.title, 1, 2, selected=False)
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
globals().update(selected_menu=next_menu)
async def confirm_action(message: str) -> bool:
rows, cols = SCREEN.getmaxyx()
popup_height = 5
popup_width = len(message) + 10
start_y = (rows - popup_height) // 2
start_x = (cols - popup_width) // 2
popup = curses.newwin(popup_height, popup_width, start_y, start_x)
popup.box()
popup.addstr(2, 2, message)
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
popup.refresh()
while True:
key = await get_key_async(popup)
if key.lower() == 'y':
return True
if key.lower() == 'n':
popup.clear()
popup.refresh()
return False
async def get_key_async(popup: curses.window) -> str:
return await asyncio.to_thread(popup.getkey)
async def kubectl_async(command: str) -> list[str]:
process = await asyncio.create_subprocess_shell(
f"kubectl {command} 2> /dev/null",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if stderr:
raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)
return stdout.decode().strip().split("\n")
async def catch_input(menu: Menu) -> None:
global THIRD_MENU_TASK
while True:
try:
key = await get_key_async(SCREEN)
break
except curses.error:
if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done() or THIRD_MENU_TASK.cancelled():
THIRD_MENU_TASK = asyncio.create_task(
refresh_third_menu(
menus[0].selected_row(),
menus[1].selected_row()
)
)
await asyncio.sleep(0.1)
# Convert control keys to their string representation (e.g., Ctrl+Y -> ^Y)
# Handle special keys (e.g., "KEY_UP", "KEY_DC")
if key.startswith("KEY_") or key == "\x1b":
key_str = key # Use the key as-is for special keys
else:
# Handle single-character keys (e.g., "a", "^Y")
key_str = curses.ascii.unctrl(key) if curses.ascii.iscntrl(ord(key)) else key
# Check if the key is allowed
if key_str not in ALLOWED_SPECIAL_KEYS and not key.isalnum():
return # Ignore the key if it's not in the allowed list or not alphanumeric
if key.isalnum() and not menu.filter_mode:
return
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
handle_horizontal_navigation(key, menu)
elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]:
if THIRD_MENU_TASK is not None:
THIRD_MENU_TASK.cancel()
try:
await THIRD_MENU_TASK
except asyncio.CancelledError:
pass
handle_vertical_navigation(key, menu)
elif key == "KEY_MOUSE":
handle_mouse(menu)
elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"):
await handle_key_bindings(
key,
menus[0].selected_row(),
menus[1].selected_row(),
menus[2].selected_row() and menus[2].selected_row().split()[0]
)
elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-":
handle_filter_state(key, menu)
elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS:
await handle_key_bindings(
curses.ascii.unctrl(key),
menus[0].selected_row(),
menus[1].selected_row(),
menus[2].selected_row() and menus[2].selected_row().split()[0]
)
async def subprocess_call_async(command: str) -> None:
process = await asyncio.create_subprocess_shell(command)
await process.communicate()
def enable_mouse_support() -> None:
if MOUSE_ENABLED:
curses.mousemask(curses.REPORT_MOUSE_POSITION)
print('\033[?1003h')
async def init_menus() -> None:
global menus, selected_menu
api_resources_kubectl: list[str] = [
x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get")
]
api_resources = list(
dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)
) if QUERY_API_RESOURCES else TOP_API_RESOURCES
namespaces: list[str] = [] namespaces: list[str] = []
context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0]
if not context:
return namespaces
try: try:
namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'") namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'")
except: except:
@ -412,14 +271,186 @@ async def init_menus() -> None:
namespaces = all_namespaces namespaces = all_namespaces
except: except:
pass pass
return namespaces
menus = [
Menu("Namespaces", namespaces, 0, NAMESPACES_WIDTH, ROWS_HEIGHT),
Menu("API resources", api_resources, NAMESPACES_WIDTH, API_RESOURCES_WIDTH, ROWS_HEIGHT),
Menu("Resources", [], NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH, ROWS_HEIGHT)
]
selected_menu = menus[0]
async def get_api_resources() -> list[str]:
try:
api_resources = await kubectl_async("api-resources --no-headers --verbs=get")
return sorted(list(set([x.split()[0] for x in api_resources]))) # dedup
except subprocess.CalledProcessError:
return []
async def handle_key_bindings(key: str) -> None:
api_resource = MENUS[2].selected_row()
if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all":
return
resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0]
if not resource:
return
namespace = MENUS[1].selected_row()
if key == "KEY_DC":
key = "Delete"
await cancel_resources_refreshing()
async with FOURTH_MENU_LOCK:
curses.def_prog_mode()
curses.endwin()
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
if "batcat" in command:
command += BATCAT_STYLE
await subprocess_call_async(command)
curses.reset_prog_mode()
SCREEN.refresh()
enable_mouse_support()
def handle_mouse(menu: Menu) -> None:
if not MOUSE_ENABLED:
return
try:
mouse_info: tuple[int, ...] = curses.getmouse()
except curses.error: # this fixes scrolling error
return
row_number = mouse_info[2] - HEADER_HEIGHT
column_number = mouse_info[1]
next_menu: Optional[Menu] = None
if column_number > (menu.begin_x + menu.width):
next_menu = MENUS[(MENUS.index(menu) + 1) % MENUS.__len__()]
if column_number > (next_menu.begin_x + next_menu.width):
next_menu = MENUS[(MENUS.index(next_menu) + 1) % MENUS.__len__()]
Menu.selected = next_menu
elif column_number < menu.begin_x:
next_menu = MENUS[(MENUS.index(menu) - 1) % MENUS.__len__()]
if column_number < next_menu.begin_x:
next_menu = MENUS[(MENUS.index(next_menu) - 1) % MENUS.__len__()]
Menu.selected = next_menu
if next_menu:
draw_row(menu.win, menu.title, 1, 2, selected=False)
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
menu = next_menu
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1)
char_str = chr(char_int & 0xFF)
if not char_str or ord(char_str) > 127 or " " in char_str:
return
if 0 <= row_number < len(menu.visible_rows()):
menu.visible_row_index = row_number
menu.draw_rows()
menu.refresh_dependent_menus()
async def move_selection_vertically(key: str, menu: Menu) -> None:
if len(menu.visible_rows()) <= 1:
return
keys_numbers: dict[str, int] = {"KEY_DOWN": 1, "KEY_UP": -1}
if menu.filtered_rows.size > ROWS_HEIGHT:
menu.filtered_rows.shift(keys_numbers[key])
else:
menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size
menu.draw_rows()
def move_selection_horizontally(key: str, menu: Menu) -> None:
increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
next_menu = MENUS[(MENUS.index(menu) + increment) % MENUS.__len__()]
draw_row(menu.win, menu.title, 1, 2, selected=False)
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
Menu.selected = next_menu
def confirm_action(message: str) -> bool:
rows, cols = SCREEN.getmaxyx()
popup_height = 5
popup_width = len(message) + 10
start_y = (rows - popup_height) // 2
start_x = (cols - popup_width) // 2
popup = curses.newwin(popup_height, popup_width, start_y, start_x)
popup.box()
popup.addstr(2, 2, message)
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
popup.refresh()
while True:
key = popup.getkey()
if key.lower() == "y":
confirm = True
elif key.lower() == "n":
confirm = False
else:
continue
popup.clear()
popup.refresh()
return confirm
async def cancel_resources_refreshing() -> None:
if not (FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done()):
FOURTH_MENU_TASK.cancel()
try:
await FOURTH_MENU_TASK
except asyncio.CancelledError:
pass
async def kubectl_async(command: str) -> list[str]:
process = await asyncio.create_subprocess_shell(
f"kubectl {command}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if stderr:
raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)
return stdout.decode().strip().split("\n")
async def handle_state_independent_input(menu: Menu, key: str) -> None:
if key in ["KEY_UP", "KEY_DOWN"]: # V (Vertical navigation)
if len(menu.visible_rows()) > 1:
await cancel_resources_refreshing()
await move_selection_vertically(key, menu)
if menu == MENUS[0]:
await switch_context(menu.selected_row())
await menu.refresh_dependent_menus()
elif key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: # H (Vertical navigation)
move_selection_horizontally(key, menu)
elif key == "KEY_MOUSE":
handle_mouse(menu)
elif key == "KEY_DC":
if not MENUS[3].selected_row():
return
if confirm_action("Are you sure you want to delete this resource?"):
await handle_key_bindings(key)
elif not key.startswith("KEY_") and curses.ascii.unctrl(key) in KEY_BINDINGS: # K (Key Bindings)
await handle_key_bindings(curses.ascii.unctrl(key))
async def subprocess_call_async(command: str) -> None:
process = await asyncio.create_subprocess_shell(command)
await process.communicate()
def enable_mouse_support() -> None:
curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
if MOUSE_ENABLED:
print("\033[?1003h")
async def init_menus() -> list[Menu]:
MENUS.append(Menu("Contexts", await get_contexts(), 0, CONTEXTS_WIDTH))
MENUS.append(Menu("Namespaces", await get_namespaces(), CONTEXTS_WIDTH, NAMESPACES_WIDTH))
MENUS.append(
Menu("API resources", await get_api_resources(), CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH)
)
MENUS.append(Menu("Resources", [], CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH))
return MENUS
async def main_async() -> None:
global MENUS, FOURTH_MENU_TASK
MENUS = await init_menus()
Menu.selected = MENUS[0]
SCREEN.refresh() SCREEN.refresh()
SCREEN.nodelay(True) SCREEN.nodelay(True)
SCREEN.keypad(True) SCREEN.keypad(True)
@ -428,18 +459,56 @@ async def init_menus() -> None:
curses.use_default_colors() curses.use_default_colors()
curses.noecho() curses.noecho()
enable_mouse_support() enable_mouse_support()
for index, menu in enumerate(MENUS):
menu.draw_menu_with_footer()
async def main_async() -> None: menu.set_dependent_menus(MENUS[index + 1 :])
await init_menus() draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
for menu in menus: while True:
draw_menu(menu) menu = Menu.selected
draw_row( try:
curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), key = SCREEN.getkey()
HELP_TEXT, 1, 2 except curses.error:
if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done():
FOURTH_MENU_TASK = asyncio.create_task(
refresh_resources_menu(MENUS[1].selected_row(), MENUS[2].selected_row())
) )
while selected_menu: await asyncio.sleep(0.01)
await catch_input(selected_menu) continue
# handle state-dependent keys
match menu.state:
case MenuState.NORMAL:
if key == "\x1b": # E (Escape)
break # Exit
elif key == "/": # S (Slash)
await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state
continue
case MenuState.EMPTY_FILTER:
if key == "\x1b": # E (Escape)
await menu.set_state(MenuState.NORMAL) # Transition to Normal state
continue
elif key.isalnum() or key == "-": # A (Type text)
menu.filter += key.lower()
await menu.set_state(MenuState.FILLED_FILTER) # Transition to FilledFilter state
continue
case MenuState.FILLED_FILTER: # FilledFilter state
if key == "\x1b": # E (Escape)
await menu.set_state(MenuState.NORMAL) # Transition to Normal state
continue
elif key in ["KEY_BACKSPACE", "\x08"]: # B (Backspace)
if len(menu.filter) == 1:
await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state
continue
menu.filter = menu.filter[:-1]
menu.draw_menu_or_footer(f"/{menu.filter}")
continue
elif key.isalnum() or key == "-": # A (Type text)
menu.filter += key.lower() # Stay in FilledFilter state
menu.draw_menu_or_footer(f"/{menu.filter}")
continue
# handle state-independent keys (Vertical/Horizontal navigation etc. available in all states)
await handle_state_independent_input(menu, key)
def main(screen: curses.window) -> None: def main(screen: curses.window) -> None:

2
tests/manual_tests.txt Normal file
View File

@ -0,0 +1,2 @@
1. Test all key bindings on pods, services and secrets
2. Test filters