#!/usr/bin/env python3
import asyncio
import curses
import curses.ascii
import subprocess

# Key bindings that run an external command against the selected resource.
# Keys are either the caret notation produced by curses.ascii.unctrl() for a
# control character (e.g. "^Y" for Ctrl+Y) or a curses key name ("KEY_DC" is
# the Delete key). Each command is a shell template formatted with
# {namespace}, {api_resource} and {resource}.
KEY_BINDINGS = {  # can be extended
    "^Y": {
        "description": "View resource in YAML format",
        "command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml'
    },
    "^D": {
        "description": "Describe resource",
        "command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml'
    },
    "^E": {
        "description": "Edit resource",
        "command": 'kubectl -n {namespace} edit {api_resource} {resource}'
    },
    "^L": {
        "description": "View logs",
        "command": 'kubectl -n {namespace} logs {resource} | batcat -l log'
    },
    "^X": {
        "description": "Exec into pod",
        # "--" separates kubectl's own flags from the command run in the pod;
        # the form without it is deprecated by kubectl.
        "command": 'kubectl -n {namespace} exec -it {resource} -- sh'
    },
    "^N": {
        "description": "Network debug",
        "command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot'
    },
    "KEY_DC": {
        "description": "Delete resource",
        "command": 'kubectl -n {namespace} delete {api_resource} {resource}'
    }
}

# Options appended to every command that pipes its output into batcat.
BATCAT_STYLE = " --paging always --style numbers"

# which api resources are on the top of menu?
TOP_API_RESOURCES = [
    "pods", "services", "configmaps", "secrets", "persistentvolumeclaims",
    "ingresses", "nodes", "deployments", "statefulsets", "daemonsets",
    "storageclasses", "serviceentries", "destinationrules", "virtualservices",
    "gateways",
]

# Dynamically generate HELP_TEXT based on KEY_BINDINGS descriptions
HELP_TEXT = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation"

MOUSE_ENABLED = False
SCREEN = curses.initscr()  # screen initialization, needed for curses.LINES / curses.COLS below
HEADER_HEIGHT = 4  # in rows
FOOTER_HEIGHT = 3
# Maximum number of visible rows per menu; clamped so a tiny terminal cannot
# produce a negative slice bound.
ROWS_HEIGHT = max(curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3, 0)
WIDTH = curses.COLS
QUERY_API_RESOURCES = False

# Bindings (in the caret notation used as KEY_BINDINGS keys) that only make
# sense when the selected resource is a pod.
POD_ONLY_KEYS = ("^L", "^X", "^N")
# Key values treated as "erase one character" in filter mode. "\x7f" (DEL) is
# what many terminals send for the Backspace key.
BACKSPACE_KEYS = ("KEY_BACKSPACE", "\x08", "\x7f")


# classes
class CircularList:
    """A list viewed through a rotating start offset.

    Slicing returns elements starting at the current offset and wrapping
    around the end of the underlying list; ``shift`` moves the offset.
    Only slice indexing is supported.
    """

    def __init__(self, elements):
        self.elements = elements
        self.size = len(elements)
        self.index = 0  # rotation offset into ``elements``

    def __getitem__(self, index):
        # slice.indices() clamps the bounds to ``size``, so a slice never
        # yields more than ``size`` items and an empty list yields [].
        start, stop, step = index.indices(self.size)
        return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)]

    def shift(self, steps):
        """Rotate the view by ``steps`` positions (negative rotates backwards)."""
        if self.size:  # nothing to rotate; also avoids modulo by zero
            self.index = (self.index + steps) % self.size


class Menu:
    """One column of the UI: a title, a filterable list of rows and its window."""

    def __init__(self, title: str, rows: list, begin_x: int, width: int, rows_height: int):
        self.title = title
        self.rows = rows  # all rows
        self.filter = ""  # substring every displayed row must contain
        self.filter_mode = False  # tracks whether filter mode is active
        self.filtered_rows = CircularList([x for x in self.rows if self.filter in x])
        self.visible_row_index = 0  # index of the selected visible row
        self.rows_height = rows_height
        self.width = width
        self.begin_x = begin_x
        self.win = curses.newwin(curses.LINES - FOOTER_HEIGHT, width, 0, begin_x)

    def visible_rows(self) -> list:
        """Return the rows currently shown: at most ``rows_height`` filtered rows."""
        return self.filtered_rows[:max(self.rows_height, 0)]

    def selected_row(self):
        """Return the highlighted visible row, or None when nothing is visible."""
        rows = self.visible_rows()
        if not rows:
            return None
        # The modulo keeps a stale index (e.g. after the list shrank) in range.
        return rows[self.visible_row_index % len(rows)]


# helper functions
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False):
    """Write ``text`` at (y, x) in ``window``, clear the rest of the line and refresh.

    A selected row is drawn bold and in reverse video.
    """
    attributes = curses.A_REVERSE | curses.A_BOLD if selected else curses.A_NORMAL
    try:
        window.addstr(y, x, text, attributes)
        window.clrtoeol()
    except curses.error:
        # addstr() raises when the text runs past the end of the window (or
        # the position is outside it). Show what fitted instead of crashing.
        pass
    window.refresh()


def draw_rows(menu: Menu):
    """Draw the visible rows of ``menu`` below its header, highlighting the selected one."""
    rows = menu.visible_rows()
    if not rows:
        return
    # Highlight by position rather than by text so duplicate rows are not
    # all highlighted at once.
    selected_index = menu.visible_row_index % len(rows)
    # Rows start at column 2; leave the last column free so a long row is
    # truncated instead of wrapping onto the next line.
    max_length = max(menu.width - 3, 0)
    for index, row in enumerate(rows):
        draw_row(menu.win, row[:max_length], index + HEADER_HEIGHT, 2, selected=index == selected_index)


def draw_menu(menu: Menu):
    """Redraw a whole menu: title, rows and (in filter mode) the filter line."""
    menu.win.erase()  # clear menu window
    draw_row(menu.win, menu.title, 1, 2, selected=menu is selected_menu)  # draw title
    draw_rows(menu)  # draw menu rows
    draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2)  # filter row


async def refresh_third_menu(namespace, api_resource):
    """Reload the third menu with the resources of ``api_resource`` in ``namespace``.

    The menu is emptied when either argument is falsy or kubectl fails. The
    scroll offset is kept, and the menu is redrawn only if its rows changed.
    """
    menu = menus[2]
    previous_menu_rows = menu.rows
    if api_resource and namespace:
        try:
            menu.rows = await kubectl_async(f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
        except subprocess.CalledProcessError:
            menu.rows = []  # fall back to an empty list if the command fails
    else:
        menu.rows = []
    index_before_update = menu.filtered_rows.index
    menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])  # update filtered rows
    menu.filtered_rows.index = index_before_update
    if menu.visible_row_index >= len(menu.visible_rows()):
        menu.visible_row_index = 0
    if previous_menu_rows != menu.rows:
        draw_menu(menu)


async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str):
    """Run the external command bound to ``key`` for the selected resource.

    ``key`` must be a key of KEY_BINDINGS. Nothing happens when no resource
    is selected, or when a pod-only binding is used on a non-pod resource.
    Curses is suspended while the command runs.
    """
    if not resource:
        return
    if key in POD_ONLY_KEYS and api_resource != "pods" and not resource.startswith("pod/"):
        return
    command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
    if "batcat" in command:
        command += BATCAT_STYLE
    curses.def_prog_mode()  # save the previous terminal state
    curses.endwin()  # without this, there are problems after exiting vim
    try:
        await subprocess_call_async(command)
    finally:
        # Always return to curses mode, even if launching the command failed.
        curses.reset_prog_mode()  # restore the previous terminal state
        SCREEN.refresh()
        enable_mouse_support()


async def handle_filter_state(key: str, menu: Menu):
    """Update the filter of ``menu`` for one key press.

    "/" enters filter mode; Escape leaves it, or exits the program (by
    setting the global ``selected_menu`` to None) when filter mode is off.
    In filter mode, backspace removes the last character and letters, digits
    and "-" are appended in lower case. Keys that change nothing return
    without rebuilding the list, so the scroll position is kept.
    """
    if key == "\x1b":  # Escape key
        if not menu.filter_mode:
            globals().update(selected_menu=None)  # exit program
            return
        menu.filter_mode = False  # exit filter mode
        menu.filter = ""
    elif not menu.filter_mode:
        if key != "/":
            return  # filter input is only processed in filter mode
        menu.filter_mode = True  # enter filter mode
        menu.filter = ""
    elif key in BACKSPACE_KEYS:
        menu.filter = menu.filter[:-1]  # remove last character
    elif key.isalnum() or key == "-":  # allow letters, numbers, and dashes
        menu.filter += key.lower()
    else:
        return
    menu.visible_row_index = 0
    menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])  # update filtered rows
    draw_menu(menu)
    if menu is not menus[2]:
        menus[2].visible_row_index = 0  # reset the selected row of the third menu before it is redrawn


async def handle_mouse(menu: Menu):
    """Select the menu and row under the mouse pointer (only if MOUSE_ENABLED)."""
    if not MOUSE_ENABLED:
        return
    try:
        mouse_info = curses.getmouse()
    except curses.error:  # this fixes scrolling error
        return
    column_number, screen_row = mouse_info[1], mouse_info[2]
    row_number = screen_row - HEADER_HEIGHT
    # Find the menu whose column range contains the pointer; stay on the
    # current menu when the pointer is outside every menu.
    target = next((m for m in menus if m.begin_x <= column_number < m.begin_x + m.width), menu)
    if target is not menu:
        globals().update(selected_menu=target)
        draw_row(menu.win, menu.title, 1, 2, selected=False)  # remove selection from the current menu title
        draw_row(target.win, target.title, 1, 2, selected=True)  # and select the new menu title
        menu = target
    try:
        # NOTE(review): the "- 1" column offset is kept from the original
        # code; its purpose is not evident here — confirm.
        char_int = menu.win.inch(screen_row, column_number - menu.begin_x - 1)
    except curses.error:
        return  # pointer is outside the menu window (e.g. over the footer)
    char_code = char_int & 0xFF
    if char_code > 127 or char_code == ord(' '):
        return  # nothing is drawn under the pointer
    if 0 <= row_number < len(menu.visible_rows()):
        menu.visible_row_index = row_number
        draw_rows(menu)  # this will change selected row in menu
        if menu is not menus[2]:
            menus[2].visible_row_index = 0  # reset the selected row of the third menu before it is redrawn


async def handle_vertical_navigation(key: str, menu: Menu):
    """Move the selection in ``menu`` for arrow, page, Home and End keys.

    When more rows exist than fit on screen, Up/Down scroll the list under a
    fixed cursor; otherwise the cursor moves and wraps around. PgUp/PgDn
    scroll by one screenful; Home/End jump to the first/last visible row.
    """
    visible_count = len(menu.visible_rows())
    if visible_count <= 1:
        return
    if key in ("KEY_DOWN", "KEY_UP"):
        step = 1 if key == "KEY_DOWN" else -1
        if menu.filtered_rows.size > menu.rows_height:
            menu.filtered_rows.shift(step)
        else:
            menu.visible_row_index = (menu.visible_row_index + step) % menu.filtered_rows.size
    elif key in ("KEY_NPAGE", "KEY_PPAGE"):
        step = 1 if key == "KEY_NPAGE" else -1
        menu.filtered_rows.shift(step * visible_count)
    elif key == "KEY_HOME":
        menu.visible_row_index = 0
    elif key == "KEY_END":
        menu.visible_row_index = visible_count - 1
    draw_rows(menu)
    if menu is not menus[2]:
        menus[2].visible_row_index = 0


async def handle_horizontal_navigation(key: str, menu: Menu):
    """Move the focus to the neighbouring menu (wrapping around) and update the titles."""
    increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
    next_menu = menus[(menus.index(menu) + increment) % len(menus)]
    draw_row(menu.win, menu.title, 1, 2, selected=False)  # remove selection from the current menu title
    draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)  # and select the new menu title
    globals().update(selected_menu=next_menu)


async def confirm_action(message: str) -> bool:
    """Display a confirmation popup and return True if the user confirms.

    Blocks until 'y' or 'n' (any case) is pressed, then removes the popup
    and redraws the menus it covered.
    """
    rows, cols = SCREEN.getmaxyx()  # get screen size
    hint = "Press 'y' to confirm, 'n' to cancel"
    popup_height = min(5, rows)
    # Wide enough for both lines of text, but never wider than the screen.
    popup_width = min(max(len(message) + 10, len(hint) + 4), cols)
    start_y = max((rows - popup_height) // 2, 0)
    start_x = max((cols - popup_width) // 2, 0)
    popup = curses.newwin(popup_height, popup_width, start_y, start_x)
    popup.box()  # draw a border around the popup
    text_width = max(popup_width - 4, 0)
    try:
        popup.addstr(2, 2, message[:text_width])  # display the message
        popup.addstr(3, 2, hint[:text_width])
    except curses.error:
        pass  # terminal too small to show the text; still wait for an answer
    popup.refresh()
    try:
        while True:
            key = await get_key_async(popup)
            if key.lower() == 'y':
                return True
            if key.lower() == 'n':
                return False
    finally:
        # Remove the popup and repaint the menus underneath it.
        popup.erase()
        popup.refresh()
        for menu in menus:
            draw_menu(menu)


async def get_key_async(popup: curses.window) -> str:
    """Read one key from the given window in a worker thread."""
    return await asyncio.to_thread(popup.getkey)


async def catch_input(menu: Menu):
    """Wait for one key press and dispatch it to the matching handler.

    While no key is available, the third menu is refreshed about every
    0.1 s. ``namespace``, ``api_resource`` and ``resource`` are module
    globals assigned by init_menus().
    """
    while True:  # refresh third menu until key pressed
        try:
            key = await get_key_async(SCREEN)
            break
        except curses.error:  # no input available (SCREEN is in nodelay mode)
            await refresh_third_menu(namespace(), api_resource())
            await asyncio.sleep(0.1)
    # curses.ascii.unctrl() only accepts a single character; multi-character
    # key names such as "KEY_DC" or "KEY_RESIZE" are looked up as they are.
    binding = curses.ascii.unctrl(key) if len(key) == 1 else key
    if key in ("\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"):
        await handle_horizontal_navigation(key, menu)
    elif key in ("KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"):
        await handle_vertical_navigation(key, menu)
    elif key == "KEY_MOUSE":
        await handle_mouse(menu)
    elif binding in KEY_BINDINGS:
        if binding == "KEY_DC":
            # Deleting is destructive: ask first, and only when something is selected.
            if not resource() or not await confirm_action("Are you sure you want to delete this resource?"):
                return
        await handle_key_bindings(binding, namespace(), api_resource(), resource())
    elif key in ("/", "\x1b") or key in BACKSPACE_KEYS or key.isalnum() or key == "-":
        await handle_filter_state(key, menu)


async def kubectl_async(command: str) -> list:
    """Run ``kubectl <command>`` through the shell and return its output lines.

    Returns an empty list when kubectl prints nothing.

    Raises:
        subprocess.CalledProcessError: kubectl exited with a non-zero status;
            the captured stdout and stderr are attached to the exception.
    """
    process = await asyncio.create_subprocess_shell(
        f"kubectl {command}",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,  # captured so errors do not garble the curses screen
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
    # splitlines() yields [] for empty output, where split("\n") would yield [""].
    return stdout.decode().strip().splitlines()


async def subprocess_call_async(command: str):
    """Run a shell command attached to the terminal and wait for it to finish."""
    process = await asyncio.create_subprocess_shell(command)
    await process.communicate()


def enable_mouse_support():
    """Turn on mouse position reporting when MOUSE_ENABLED is set."""
    if MOUSE_ENABLED:
        curses.mousemask(curses.REPORT_MOUSE_POSITION)  # mouse tracking
        # Enable mouse tracking with the XTERM API. No trailing newline, so
        # the cursor is not moved, and flush so the sequence is sent at once.
        print('\033[?1003h', end='', flush=True)
async def init_menus():
    """Query kubectl, build the three menus and configure curses input.

    Assigns the module globals ``menus``, ``selected_menu`` and the accessor
    callables ``namespace``, ``api_resource`` and ``resource``.
    """
    global menus, selected_menu, namespace, api_resource, resource
    kubectl_errors = (subprocess.CalledProcessError, OSError)

    api_resources = list(TOP_API_RESOURCES)
    if QUERY_API_RESOURCES:  # only pay for the kubectl call when its result is used
        try:
            listing = await kubectl_async("api-resources --no-headers --verbs=get")
        except kubectl_errors:
            listing = []
        # Skip blank lines: "".split()[0] would raise IndexError.
        discovered = [line.split()[0] for line in listing if line.split()]
        api_resources = list(dict.fromkeys(TOP_API_RESOURCES + discovered))

    try:
        namespaces = await kubectl_async("get ns --no-headers -o custom-columns=NAME:.metadata.name")
    except kubectl_errors:
        namespaces = []
    namespaces = [name for name in namespaces if name]
    if not namespaces:
        # Listing namespaces failed or returned nothing; fall back to the
        # namespace of the current kubectl context.
        try:
            namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'")
        except kubectl_errors:
            namespaces = []
        namespaces = [name for name in namespaces if name]
    if not namespaces:
        namespaces = ["default"]  # kubectl's namespace when the context sets none

    width_unit = WIDTH // 8
    menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
             Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
             Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)]
    selected_menu = menus[0]
    namespace = menus[0].selected_row  # method alias
    api_resource = menus[1].selected_row

    def resource():
        """Return the first column of the selected third-menu row, or None."""
        row = menus[2].selected_row()
        fields = row.split() if row else []
        return fields[0] if fields else None

    SCREEN.refresh()  # I don't know why this is needed but it doesn't work without it
    SCREEN.nodelay(True)  # don't block while waiting for input
    SCREEN.keypad(True)  # needed for arrow keys
    curses.set_escdelay(1)  # reduce Escape delay to 1 ms (curses can't set it to 0)
    curses.curs_set(0)  # make the cursor invisible
    curses.use_default_colors()  # don't change the terminal color
    curses.noecho()  # don't output characters at the top
    enable_mouse_support()


async def main_async(screen):
    """Draw the UI, then process input until ``selected_menu`` becomes None."""
    await init_menus()
    for menu in menus:
        draw_menu(menu)
    footer = curses.newwin(FOOTER_HEIGHT, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0)
    # The help text starts at row 1, column 2 and may wrap onto the following
    # rows. Cut it so it never reaches the last cell of the window, where
    # addstr() raises curses.error.
    capacity = max((FOOTER_HEIGHT - 1) * curses.COLS - 3, 0)
    draw_row(footer, HELP_TEXT[:capacity], 1, 2)
    while selected_menu is not None:
        await catch_input(selected_menu)


def main(screen):
    """Entry point for curses.wrapper(): run the asyncio main loop."""
    asyncio.run(main_async(screen))


if __name__ == "__main__":
    curses.wrapper(main)
    subprocess.run(["tput", "reset"])