diff --git a/README.md b/README.md index a8c2c4e..e2a7a21 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,13 @@ Inspired by `lf` and `ranger` file managers, written in python. It is lightweight (~400 lines of code) and easy to customize. -Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #68). +Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #69). ## Key bindings ### For kubectl -You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #15: +You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #14: - `Ctrl+y` - get **Y**aml of resource - `Ctrl+d` - **D**escribe resource diff --git a/diagrams/another_dot_diagram.dot b/diagrams/another_dot_diagram.dot new file mode 100644 index 0000000..188e61e --- /dev/null +++ b/diagrams/another_dot_diagram.dot @@ -0,0 +1,63 @@ +digraph state_machine { + // Graph styling + rankdir=TB; + compound=true; + node [shape=record, style=filled, fillcolor=lightblue, fontsize=10]; + edge [fontsize=9]; + + // States with entry/exit/activities + Normal [label="{Normal State|Entry: None\l\ +Activities:\l\ +• Wait for key input\l\ +• Handle state-independent navigation\l\ +Exit: None\l}"]; + + EmptyFilter [label="{EmptyFilter State|Entry: draw_footer(menu, '/')\l\ +Activities:\l\ +• Wait for key input\l\ +• Handle state-independent navigation\l\ +• Monitor for filter input\l\ +Exit: None\l}"]; + + FilledFilter [label="{FilledFilter State|Entry:\l\ +• Update filtered_rows\l\ +• Draw menu or footer with filter\l\ +Activities:\l\ +• Process filter input\l\ +• Update filtered_rows on changes\l\ +• Update display if visibility changes\l\ +• Handle state-independent navigation\l\ +Exit:\l\ +• Clear filter (on ESC)\l\ +• Update filtered_rows\l\ +• Update display\l}"]; + + // Exit node + exit [shape=doublecircle, fillcolor=lightpink, label="Exit\n(SELECTED_MENU = None)"]; + + // Initial state indicator + start [shape=point, fillcolor=black]; + start -> Normal; + + // Transitions + Normal -> EmptyFilter [label="/ (slash)\nAction: Initialize empty filter"]; + Normal -> exit [label="ESC\nAction: Clear selection"]; + + EmptyFilter -> Normal [label="ESC\nAction: draw_footer('')"]; + EmptyFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Create filtered_rows\n• Update display"]; + + FilledFilter -> Normal [label="ESC\nActions:\n• Clear filter\n• Reset filtered_rows\n• Update display"]; + FilledFilter -> EmptyFilter [label="BACKSPACE\n(when filter empty)\nAction: draw_footer('/')"]; + FilledFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Update filtered_rows\n• Update display"]; + + // State-independent actions note + subgraph cluster_notes { + label="Notes"; + style=filled; + fillcolor=lightyellow; + node [style=filled, fillcolor=white]; + note1 [label="State-Independent Actions:\n• Vertical/Horizontal navigation\n• Available in all states\n• Handled by handle_state_independent_input()"]; + note2 [label="Display Updates:\n• draw_menu_with_filter()\n- Called when visible rows change\n• draw_footer()\n- Called for filter updates"]; + note3 [label="Filtering:\n• Uses CircularList for row management\n• Filters are case-insensitive\n• Supports alphanumeric and '-' chars"]; + } +} \ No newline at end of file diff --git a/diagrams/draft.dot b/diagrams/draft.dot new file mode 100644 index 0000000..5d63fa5 --- /dev/null +++ b/diagrams/draft.dot @@ -0,0 +1,21 @@ +digraph { + newrank=true; + subgraph cluster_menu_states { + label="Menu States" + color=lightgrey; + style=filled; + node [style=filled,color=white]; + Normal -> EmptyFilter [label = "S"]; + EmptyFilter -> FilledFilter [label = "A"]; + EmptyFilter -> Normal [label = "E"]; + FilledFilter:ne -> Normal:se [label = "E"]; + FilledFilter -> EmptyFilter [label = "B"]; + FilledFilter:s -> FilledFilter:s [label = "B"]; + FilledFilter:e -> FilledFilter:e [label = "A"]; + FilledFilter:w -> FilledFilter:w [label = "V"]; + EmptyFilter:w -> EmptyFilter:w [label = "V"]; + Normal:w -> Normal:w [label = "V"]; + } + { rank=same; Normal; Exit; } + Normal -> Exit [label = "E"]; +} \ No newline at end of file diff --git a/diagrams/menu_lifecycle.dot b/diagrams/menu_lifecycle.dot new file mode 100644 index 0000000..3a63d8f --- /dev/null +++ b/diagrams/menu_lifecycle.dot @@ -0,0 +1,57 @@ +digraph fsm { + + // Create states cluster for l1, l2, l3 + subgraph cluster_states { +rankdir="LR" + label = "States"; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + + l1 [label = "Normal state"]; + l2 [label = "Filter state\n(empty filter)"]; + l3 [label = "Filter state\n(non-empty filter)"]; + } + + // Other nodes + exit [label = "Exit"]; + i1 [label = "Other menu"]; + e1 [label = "External program"]; + + + // Transitions + l1 -> exit [label = "E"]; + + // Mode switches + l1 -> l2 [label = "S"]; + l2 -> l3 [label = "A"]; + l2 -> l1 [label = "E"]; + l3 -> l1 [label = "E"]; + l3 -> l2 [label = "B"]; + + // Horizontal navigation + l1 -> i1 [label = "H"]; + l2 -> i1 [label = "H"]; + l3 -> i1 [label = "H"]; + + // External program + l1 -> e1 [label = "K"]; + l2 -> e1 [label = "K"]; + l3 -> e1 [label = "K"]; + + // Self-loops + l1 -> l1 [label = "V"]; + l2 -> l2 [label = "V"]; + l3 -> l3 [label = "V"]; + l3 -> l3 [label = "A"]; + l3 -> l3 [label = "B"]; + + label = "Keys:\n\ +S - Slash (start filter) \l\ +E - Escape \l\ +A - Type text \l\ +B - Backspace \l\ +H - Horizontal navigation (Right,Left,Tab) \l\ +V - Vertical navigation (Up, Down) \l\ +K - Key binding \l" +} \ No newline at end of file diff --git a/kls b/kls index 8cd95b6..cadb9c0 100755 --- a/kls +++ b/kls @@ -1,84 +1,86 @@ #!/usr/bin/env python3 -from typing import Optional, Callable +from typing import Optional, Callable, Self import subprocess import curses import curses.ascii import asyncio - +from enum import Enum, auto SCREEN: curses.window = curses.initscr() - # ****************************** # # START OF CONFIGURATION SECTION # # ****************************** # KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended "^Y": { # Ctrl + y "description": "Yaml", - "command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml' + "command": "kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml", + "kind": "all", # this key binding is valid for all api resources }, "^D": { # Ctrl + d "description": "Describe", - "command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml' + "command": "kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml", + "kind": "all", }, "^E": { # Ctrl + e "description": "Edit", - "command": 'kubectl -n {namespace} edit {api_resource} {resource}' + "command": "kubectl -n {namespace} edit {api_resource} {resource}", + "kind": "all", }, "^L": { # Ctrl + l "description": "Logs", - "command": 'kubectl -n {namespace} logs {resource} | lnav' + "command": "kubectl -n {namespace} logs {resource} | lnav", + "kind": "pods", # this key binding is valid for pods only }, "^X": { # Ctrl + x "description": "eXec", - "command": 'kubectl -n {namespace} exec -it {resource} sh' + "command": "kubectl -n {namespace} exec -it {resource} sh", + "kind": "pods", }, "^N": { # Ctrl + n "description": "Network debug", - "command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot' + "command": "kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot", + "kind": "pods", }, "^A": { # Ctrl + a (a means Access logs! :-)) "description": "istio-proxy Access logs", - "command": 'kubectl -n {namespace} logs {resource} -c istio-proxy | lnav' + "command": "kubectl -n {namespace} logs {resource} -c istio-proxy | lnav", + "kind": "pods", }, "^P": { # Ctrl + p (p means Proxy! :-)) "description": "exec istio-Proxy", - "command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy bash' + "command": "kubectl -n {namespace} exec -it {resource} -c istio-proxy bash", + "kind": "pods", }, "^R": { # Ctrl + r (r means Reveal! :-)) "description": "Reveal secret", - "command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml" + "command": "kubectl get secret {resource} -n {namespace} -o yaml" + " | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml", + "kind": "secrets", }, "Delete": { # It is actually KEY_DC "description": "Delete", - "command": 'kubectl -n {namespace} delete {api_resource} {resource}' - } + "command": "kubectl -n {namespace} delete {api_resource} {resource}", + "kind": "all", + }, } -# which api resources are on the top of menu? -TOP_API_RESOURCES: list[str] = [ - "pods", "services", "configmaps", "secrets", "persistentvolumeclaims", - "ingresses", "nodes", "deployments", "statefulsets", "daemonsets", - "storageclasses", "serviceentries", "destinationrules", "authorizationpolicies", - "virtualservices", "gateways", "telemetry", "envoyfilters" -] - -QUERY_API_RESOURCES: bool = False # Should we merge TOP_API_RESOURCES with all other api resources from cluster? BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat +SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD MOUSE_ENABLED: bool = False WIDTH: int = curses.COLS -WIDTH_UNIT: int = int(WIDTH / 8) +WIDTH_UNIT: int = int(WIDTH / 10) +CONTEXTS_WIDTH = int(WIDTH_UNIT * 1.5) NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5) API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5) -RESOURCES_WIDTH = WIDTH - (API_RESOURCES_WIDTH + NAMESPACES_WIDTH) +RESOURCES_WIDTH = WIDTH - (CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH) HEADER_HEIGHT: int = 4 FOOTER_HEIGHT: int = 3 ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 # Generate HELP_TEXT from KEY_BINDINGS HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items()) -HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB/PgUp/PgDn: navigation" -SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD -ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_BACKSPACE", "\x08", "KEY_MOUSE", "KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END", "\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"] +HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB: navigation" + # **************************** # # END OF CONFIGURATION SECTION # @@ -86,41 +88,106 @@ ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_ class CircularList: - def __init__(self, elements: list[str]): - self.elements: list[str] = elements - self.size: int = len(elements) + def __init__(self, items: list[str]): + self.items: list[str] = items + self.size: int = len(items) self.index: int = 0 def __getitem__(self, index: slice) -> list[str]: start, stop, step = index.indices(self.size) - return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)] + return [self.items[(self.index + i) % self.size] for i in range(start, stop, step)] def shift(self, steps: int) -> None: self.index = (self.index + steps) % self.size +class MenuState(Enum): + NORMAL = auto() + EMPTY_FILTER = auto() + FILLED_FILTER = auto() + + class Menu: - def __init__(self, title: str, rows: list[str], begin_x: int|float, width: int|float, rows_height: int): + selected = None # Class variable to track selected object + + def __init__( + self, + title: str, + rows: list[str], + begin_x: int, + width: int, + ): self.title: str = title self.rows: list[str] = rows self.filter: str = "" - self.filter_mode: bool = False + self.state: MenuState = MenuState.NORMAL self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x]) - self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:rows_height] + self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:ROWS_HEIGHT] self.visible_row_index: int = 0 - self.selected_row: Callable[[], Optional[str]] = lambda: self.visible_rows()[ - self.visible_row_index] if self.visible_rows() else None - self.rows_height: int = rows_height + self.selected_row: Callable[[], Optional[str]] = ( + lambda: self.visible_rows()[self.visible_row_index] if self.visible_rows() else None + ) self.width: int = int(width) self.begin_x: int = int(begin_x) self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x) + self.dependent_menus: list[Self] = [] + + def refresh_filtered_rows(self): + self.filtered_rows = CircularList([x for x in self.rows if self.filter in x]) + + async def set_state(self, state: MenuState) -> None: + self.state = state + # entry activities + match self.state: + case MenuState.NORMAL: + self.filter = "" + self.draw_menu_or_footer("") + await self.refresh_dependent_menus() + case MenuState.EMPTY_FILTER: + self.filter = "" + self.draw_menu_or_footer("/") + await self.refresh_dependent_menus() + case MenuState.FILLED_FILTER: + self.draw_menu_or_footer(f"/{self.filter}") # if redrawing whole menu is not needed + await self.refresh_dependent_menus() + + def draw_rows(self) -> None: + for index, row in enumerate(self.visible_rows()): + draw_row(self.win, row, index + HEADER_HEIGHT, 2, selected=row == self.selected_row()) + + def draw_menu_with_footer(self) -> None: + self.win.erase() + draw_row(self.win, self.title, 1, 2, selected=self == Menu.selected) + self.draw_rows() + draw_row( + self.win, + f"/{self.filter}" if self.state in [MenuState.EMPTY_FILTER, MenuState.FILLED_FILTER] else "", + curses.LINES - FOOTER_HEIGHT - 2, + 2, + ) + + def draw_menu_or_footer(self, footer_text: str) -> None: + previous_visible_rows = self.visible_rows() + self.refresh_filtered_rows() + if self.visible_rows() != previous_visible_rows: # draw whole menu + self.visible_row_index = 0 + self.draw_menu_with_footer() + self.refresh_dependent_menus() + else: # draw footer only + draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2) + + def set_dependent_menus(self, menus: list[Self]) -> None: + self.dependent_menus = menus + + async def refresh_dependent_menus(self): + for menu in self.dependent_menus: + await refresh_menu(menu) # Global variables -THIRD_MENU_LOCK: asyncio.Lock = asyncio.Lock() -THIRD_MENU_TASK: Optional[asyncio.Task] = None -menus: list[Menu] = [] -selected_menu: Optional[Menu] = None +FOURTH_MENU_LOCK: asyncio.Lock = asyncio.Lock() +FOURTH_MENU_TASK: Optional[asyncio.Task] = None +MENUS: list[Menu] = [] def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None: @@ -129,274 +196,66 @@ def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = window.refresh() -def draw_rows(menu: Menu) -> None: - for index, row in enumerate(menu.visible_rows()): - draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected = row == menu.selected_row()) +async def refresh_menu(menu: Menu) -> None: + if menu == MENUS[1]: + menu.rows = await get_namespaces() + elif menu == MENUS[2]: + menu.rows = await get_api_resources() + menu.refresh_filtered_rows() + menu.visible_row_index = 0 + menu.draw_menu_with_footer() -def draw_menu(menu: Menu) -> None: - menu.win.erase() - draw_row(menu.win, menu.title, 1, 2, selected = menu == selected_menu) - draw_rows(menu) - draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2) - - -async def refresh_third_menu(namespace: Optional[str], api_resource: Optional[str]) -> None: - global THIRD_MENU_TASK +async def refresh_resources_menu(namespace: Optional[str], api_resource: Optional[str]) -> None: try: - async with THIRD_MENU_LOCK: - menu = menus[2] + async with FOURTH_MENU_LOCK: + menu = MENUS[3] previous_menu_rows = menu.rows if api_resource and namespace: try: menu.rows = await kubectl_async( - f"-n {namespace} get {api_resource} --no-headers --ignore-not-found") + f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'" + ) except subprocess.CalledProcessError: menu.rows = [] else: menu.rows = [] index_before_update = menu.filtered_rows.index - menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) + menu.refresh_filtered_rows() menu.filtered_rows.index = index_before_update if menu.visible_row_index >= len(menu.visible_rows()): menu.visible_row_index = 0 if previous_menu_rows != menu.rows: - draw_menu(menu) + menu.draw_menu_with_footer() except asyncio.CancelledError: raise -async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str) -> None: - if not resource: - return - if key in ("l", "x", "n") and api_resource != "pods": - return - if key == "KEY_DC": - key = "Delete" - if THIRD_MENU_TASK is not None: - THIRD_MENU_TASK.cancel() - try: - await THIRD_MENU_TASK - except asyncio.CancelledError: - pass - async with THIRD_MENU_LOCK: - curses.def_prog_mode() - curses.endwin() - command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) - if "batcat" in command: - command += BATCAT_STYLE - await subprocess_call_async(command) - curses.reset_prog_mode() - SCREEN.refresh() - enable_mouse_support() +async def get_contexts() -> list[str]: + try: + current_context = await kubectl_async("config current-context") + contexts = await kubectl_async("config get-contexts --no-headers -o name") + contexts.remove(current_context[0]) + contexts.insert(0, current_context[0]) + return [line.split()[0] for line in contexts if line.strip()] + except subprocess.CalledProcessError: + return [] -def handle_filter_state(key: str, menu: Menu) -> None: - global selected_menu - if key == "/" and not menu.filter_mode: - menu.filter_mode = True - menu.filter = "" - elif key == "\x1b": # Escape key - if menu.filter_mode: - menu.filter_mode = False - menu.filter = "" - else: - selected_menu = None - elif menu.filter_mode: - if key in ["KEY_BACKSPACE", "\x08"] and menu.filter: - menu.filter = menu.filter[:-1] # Remove last character - elif key.isalnum() or key == "-": # Allow letters, numbers, and dashes - menu.filter += key.lower() - menu.visible_row_index = 0 - menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) - draw_menu(menu) - if menu != menus[2]: - menus[2].visible_row_index = 0 - - -def handle_mouse(menu: Menu) -> None: - if not MOUSE_ENABLED: +async def switch_context(context: str) -> None: + if not context: return try: - mouse_info: tuple[int, ...] = curses.getmouse() - except curses.error: # this fixes scrolling error - return - row_number = mouse_info[2] - HEADER_HEIGHT - column_number = mouse_info[1] - next_menu: Optional[Menu] = None - if column_number > (menu.begin_x + menu.width): - next_menu = menus[(menus.index(menu) + 1) % 3] - if column_number > (next_menu.begin_x + next_menu.width): - next_menu = menus[(menus.index(next_menu) + 1) % 3] - globals().update(selected_menu=next_menu) - elif column_number < menu.begin_x: - next_menu = menus[(menus.index(menu) - 1) % 3] - if column_number < next_menu.begin_x: - next_menu = menus[(menus.index(next_menu) - 1) % 3] - globals().update(selected_menu=next_menu) - if next_menu: - draw_row(menu.win, menu.title, 1, 2, selected=False) - draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) - menu = next_menu - char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1) - char_str = chr(char_int & 0xFF) - if not char_str or ord(char_str) > 127 or ' ' in char_str: - return - if 0 <= row_number < len(menu.visible_rows()): - menu.visible_row_index = row_number - draw_rows(menu) - if menu != menus[2]: - menus[2].visible_row_index = 0 + await kubectl_async(f"config use-context {context}") + except subprocess.CalledProcessError: + pass -def handle_vertical_navigation(key: str, menu: Menu) -> None: - if len(menu.visible_rows()) <= 1: - return - keys_numbers: dict[str, int] = { - "KEY_DOWN": 1, "KEY_UP": -1, - "KEY_NPAGE": 1, "KEY_PPAGE": -1, - 'KEY_HOME': 0, 'KEY_END': -1 - } - if key in ["KEY_DOWN", "KEY_UP"]: - if menu.filtered_rows.size > menu.rows_height: - menu.filtered_rows.shift(keys_numbers[key]) - else: - menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size - elif key in ["KEY_NPAGE", "KEY_PPAGE"]: - menu.filtered_rows.shift(keys_numbers[key] * len(menu.visible_rows())) - elif key in ['KEY_HOME', 'KEY_END']: - menu.visible_row_index = keys_numbers[key] - draw_rows(menu) - if menu != menus[2]: - menus[2].visible_row_index = 0 - - -def handle_horizontal_navigation(key: str, menu: Menu) -> None: - increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key] - next_menu = menus[(menus.index(menu) + increment) % 3] - draw_row(menu.win, menu.title, 1, 2, selected=False) - draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) - globals().update(selected_menu=next_menu) - - -async def confirm_action(message: str) -> bool: - rows, cols = SCREEN.getmaxyx() - popup_height = 5 - popup_width = len(message) + 10 - start_y = (rows - popup_height) // 2 - start_x = (cols - popup_width) // 2 - - popup = curses.newwin(popup_height, popup_width, start_y, start_x) - popup.box() - popup.addstr(2, 2, message) - popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel") - - popup.refresh() - while True: - key = await get_key_async(popup) - if key.lower() == 'y': - return True - if key.lower() == 'n': - popup.clear() - popup.refresh() - return False - - -async def get_key_async(popup: curses.window) -> str: - return await asyncio.to_thread(popup.getkey) - - -async def kubectl_async(command: str) -> list[str]: - process = await asyncio.create_subprocess_shell( - f"kubectl {command} 2> /dev/null", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - stdout, stderr = await process.communicate() - if stderr: - raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr) - return stdout.decode().strip().split("\n") - - -async def catch_input(menu: Menu) -> None: - global THIRD_MENU_TASK - while True: - try: - key = await get_key_async(SCREEN) - break - except curses.error: - if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done() or THIRD_MENU_TASK.cancelled(): - THIRD_MENU_TASK = asyncio.create_task( - refresh_third_menu( - menus[0].selected_row(), - menus[1].selected_row() - ) - ) - await asyncio.sleep(0.1) - # Convert control keys to their string representation (e.g., Ctrl+Y -> ^Y) - # Handle special keys (e.g., "KEY_UP", "KEY_DC") - if key.startswith("KEY_") or key == "\x1b": - key_str = key # Use the key as-is for special keys - else: - # Handle single-character keys (e.g., "a", "^Y") - key_str = curses.ascii.unctrl(key) if curses.ascii.iscntrl(ord(key)) else key - # Check if the key is allowed - if key_str not in ALLOWED_SPECIAL_KEYS and not key.isalnum(): - return # Ignore the key if it's not in the allowed list or not alphanumeric - if key.isalnum() and not menu.filter_mode: - return - - if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: - handle_horizontal_navigation(key, menu) - elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]: - if THIRD_MENU_TASK is not None: - THIRD_MENU_TASK.cancel() - try: - await THIRD_MENU_TASK - except asyncio.CancelledError: - pass - handle_vertical_navigation(key, menu) - elif key == "KEY_MOUSE": - handle_mouse(menu) - elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"): - await handle_key_bindings( - key, - menus[0].selected_row(), - menus[1].selected_row(), - menus[2].selected_row() and menus[2].selected_row().split()[0] - ) - elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-": - handle_filter_state(key, menu) - elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS: - await handle_key_bindings( - curses.ascii.unctrl(key), - menus[0].selected_row(), - menus[1].selected_row(), - menus[2].selected_row() and menus[2].selected_row().split()[0] - ) - - -async def subprocess_call_async(command: str) -> None: - process = await asyncio.create_subprocess_shell(command) - await process.communicate() - - -def enable_mouse_support() -> None: - if MOUSE_ENABLED: - curses.mousemask(curses.REPORT_MOUSE_POSITION) - print('\033[?1003h') - - -async def init_menus() -> None: - global menus, selected_menu - api_resources_kubectl: list[str] = [ - x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get") - ] - api_resources = list( - dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl) - ) if QUERY_API_RESOURCES else TOP_API_RESOURCES - +async def get_namespaces() -> list[str]: namespaces: list[str] = [] + context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0] + if not context: + return namespaces try: namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'") except: @@ -412,14 +271,186 @@ async def init_menus() -> None: namespaces = all_namespaces except: pass + return namespaces - menus = [ - Menu("Namespaces", namespaces, 0, NAMESPACES_WIDTH, ROWS_HEIGHT), - Menu("API resources", api_resources, NAMESPACES_WIDTH, API_RESOURCES_WIDTH, ROWS_HEIGHT), - Menu("Resources", [], NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH, ROWS_HEIGHT) - ] - selected_menu = menus[0] +async def get_api_resources() -> list[str]: + try: + api_resources = await kubectl_async("api-resources --no-headers --verbs=get") + return sorted(list(set([x.split()[0] for x in api_resources]))) # dedup + except subprocess.CalledProcessError: + return [] + + +async def handle_key_bindings(key: str) -> None: + api_resource = MENUS[2].selected_row() + if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all": + return + resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0] + if not resource: + return + namespace = MENUS[1].selected_row() + if key == "KEY_DC": + key = "Delete" + await cancel_resources_refreshing() + async with FOURTH_MENU_LOCK: + curses.def_prog_mode() + curses.endwin() + command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) + if "batcat" in command: + command += BATCAT_STYLE + await subprocess_call_async(command) + curses.reset_prog_mode() + SCREEN.refresh() + enable_mouse_support() + + +def handle_mouse(menu: Menu) -> None: + if not MOUSE_ENABLED: + return + try: + mouse_info: tuple[int, ...] = curses.getmouse() + except curses.error: # this fixes scrolling error + return + row_number = mouse_info[2] - HEADER_HEIGHT + column_number = mouse_info[1] + next_menu: Optional[Menu] = None + if column_number > (menu.begin_x + menu.width): + next_menu = MENUS[(MENUS.index(menu) + 1) % MENUS.__len__()] + if column_number > (next_menu.begin_x + next_menu.width): + next_menu = MENUS[(MENUS.index(next_menu) + 1) % MENUS.__len__()] + Menu.selected = next_menu + elif column_number < menu.begin_x: + next_menu = MENUS[(MENUS.index(menu) - 1) % MENUS.__len__()] + if column_number < next_menu.begin_x: + next_menu = MENUS[(MENUS.index(next_menu) - 1) % MENUS.__len__()] + Menu.selected = next_menu + if next_menu: + draw_row(menu.win, menu.title, 1, 2, selected=False) + draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) + menu = next_menu + char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1) + char_str = chr(char_int & 0xFF) + if not char_str or ord(char_str) > 127 or " " in char_str: + return + if 0 <= row_number < len(menu.visible_rows()): + menu.visible_row_index = row_number + menu.draw_rows() + menu.refresh_dependent_menus() + + +async def move_selection_vertically(key: str, menu: Menu) -> None: + if len(menu.visible_rows()) <= 1: + return + keys_numbers: dict[str, int] = {"KEY_DOWN": 1, "KEY_UP": -1} + if menu.filtered_rows.size > ROWS_HEIGHT: + menu.filtered_rows.shift(keys_numbers[key]) + else: + menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size + menu.draw_rows() + + +def move_selection_horizontally(key: str, menu: Menu) -> None: + increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key] + next_menu = MENUS[(MENUS.index(menu) + increment) % MENUS.__len__()] + draw_row(menu.win, menu.title, 1, 2, selected=False) + draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) + Menu.selected = next_menu + + +def confirm_action(message: str) -> bool: + rows, cols = SCREEN.getmaxyx() + popup_height = 5 + popup_width = len(message) + 10 + start_y = (rows - popup_height) // 2 + start_x = (cols - popup_width) // 2 + + popup = curses.newwin(popup_height, popup_width, start_y, start_x) + popup.box() + popup.addstr(2, 2, message) + popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel") + + popup.refresh() + while True: + key = popup.getkey() + if key.lower() == "y": + confirm = True + elif key.lower() == "n": + confirm = False + else: + continue + popup.clear() + popup.refresh() + return confirm + + +async def cancel_resources_refreshing() -> None: + if not (FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done()): + FOURTH_MENU_TASK.cancel() + try: + await FOURTH_MENU_TASK + except asyncio.CancelledError: + pass + + +async def kubectl_async(command: str) -> list[str]: + process = await asyncio.create_subprocess_shell( + f"kubectl {command}", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await process.communicate() + if stderr: + raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr) + return stdout.decode().strip().split("\n") + + +async def handle_state_independent_input(menu: Menu, key: str) -> None: + if key in ["KEY_UP", "KEY_DOWN"]: # V (Vertical navigation) + if len(menu.visible_rows()) > 1: + await cancel_resources_refreshing() + await move_selection_vertically(key, menu) + if menu == MENUS[0]: + await switch_context(menu.selected_row()) + await menu.refresh_dependent_menus() + elif key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: # H (Vertical navigation) + move_selection_horizontally(key, menu) + elif key == "KEY_MOUSE": + handle_mouse(menu) + elif key == "KEY_DC": + if not MENUS[3].selected_row(): + return + if confirm_action("Are you sure you want to delete this resource?"): + await handle_key_bindings(key) + elif not key.startswith("KEY_") and curses.ascii.unctrl(key) in KEY_BINDINGS: # K (Key Bindings) + await handle_key_bindings(curses.ascii.unctrl(key)) + + +async def subprocess_call_async(command: str) -> None: + process = await asyncio.create_subprocess_shell(command) + await process.communicate() + + +def enable_mouse_support() -> None: + curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) + if MOUSE_ENABLED: + print("\033[?1003h") + + +async def init_menus() -> list[Menu]: + MENUS.append(Menu("Contexts", await get_contexts(), 0, CONTEXTS_WIDTH)) + MENUS.append(Menu("Namespaces", await get_namespaces(), CONTEXTS_WIDTH, NAMESPACES_WIDTH)) + MENUS.append( + Menu("API resources", await get_api_resources(), CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH) + ) + MENUS.append(Menu("Resources", [], CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH)) + return MENUS + + +async def main_async() -> None: + global MENUS, FOURTH_MENU_TASK + MENUS = await init_menus() + Menu.selected = MENUS[0] SCREEN.refresh() SCREEN.nodelay(True) SCREEN.keypad(True) @@ -428,18 +459,56 @@ async def init_menus() -> None: curses.use_default_colors() curses.noecho() enable_mouse_support() + for index, menu in enumerate(MENUS): + menu.draw_menu_with_footer() + menu.set_dependent_menus(MENUS[index + 1 :]) + draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2) + while True: + menu = Menu.selected + try: + key = SCREEN.getkey() + except curses.error: + if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done(): + FOURTH_MENU_TASK = asyncio.create_task( + refresh_resources_menu(MENUS[1].selected_row(), MENUS[2].selected_row()) + ) + await asyncio.sleep(0.01) + continue + # handle state-dependent keys + match menu.state: + case MenuState.NORMAL: + if key == "\x1b": # E (Escape) + break # Exit + elif key == "/": # S (Slash) + await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state + continue + case MenuState.EMPTY_FILTER: + if key == "\x1b": # E (Escape) + await menu.set_state(MenuState.NORMAL) # Transition to Normal state + continue + elif key.isalnum() or key == "-": # A (Type text) + menu.filter += key.lower() + await menu.set_state(MenuState.FILLED_FILTER) # Transition to FilledFilter state + continue + case MenuState.FILLED_FILTER: # FilledFilter state + if key == "\x1b": # E (Escape) + await menu.set_state(MenuState.NORMAL) # Transition to Normal state + continue + elif key in ["KEY_BACKSPACE", "\x08"]: # B (Backspace) + if len(menu.filter) == 1: + await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state + continue + menu.filter = menu.filter[:-1] + menu.draw_menu_or_footer(f"/{menu.filter}") + continue + elif key.isalnum() or key == "-": # A (Type text) + menu.filter += key.lower() # Stay in FilledFilter state + menu.draw_menu_or_footer(f"/{menu.filter}") + continue -async def main_async() -> None: - await init_menus() - for menu in menus: - draw_menu(menu) - draw_row( - curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), - HELP_TEXT, 1, 2 - ) - while selected_menu: - await catch_input(selected_menu) + # handle state-independent keys (Vertical/Horizontal navigation etc. available in all states) + await handle_state_independent_input(menu, key) def main(screen: curses.window) -> None: diff --git a/tests/manual_tests.txt b/tests/manual_tests.txt new file mode 100644 index 0000000..4f5bb0b --- /dev/null +++ b/tests/manual_tests.txt @@ -0,0 +1,2 @@ +1. Test all key bindings on pods, services and secrets +2. Test filters diff --git a/tests.py b/tests/tests.py similarity index 100% rename from tests.py rename to tests/tests.py