389 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			389 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
#!/usr/bin/env python3
 | 
						|
import subprocess
 | 
						|
import curses
 | 
						|
import curses.ascii
 | 
						|
import asyncio
 | 
						|
 | 
						|
# ****************************** #
 | 
						|
# START OF CONFIGURATION SECTION #
 | 
						|
# ****************************** #
 | 
						|
# Keyboard shortcuts, keyed by the name the input loop looks them up under:
# "^<LETTER>" for Ctrl + <letter> (the form curses.ascii.unctrl() returns) and
# "Delete" for the Delete key (reported by curses as KEY_DC).
# "description" is shown in the footer help line. "command" is a shell command
# template in which {namespace}, {api_resource} and {resource} are replaced
# with the current menu selections before it is run.
KEY_BINDINGS = {  # can be extended
    "^Y": {  # Ctrl + y
        "description": "view YAML",
        "command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml',
    },
    "^D": {  # Ctrl + d
        "description": "describe",
        "command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml',
    },
    "^E": {  # Ctrl + e
        "description": "edit",
        "command": 'kubectl -n {namespace} edit {api_resource} {resource}',
    },
    "^L": {  # Ctrl + l
        "description": "view logs",
        "command": 'kubectl -n {namespace} logs {resource} | lnav',
    },
    "^X": {  # Ctrl + x
        "description": "exec pod",
        # "--" separates kubectl's own arguments from the command run in the
        # container; the form without it is deprecated by kubectl.
        "command": 'kubectl -n {namespace} exec -it {resource} -- sh',
    },
    "^N": {  # Ctrl + n
        "description": "network debug",
        "command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot',
    },
    "Delete": {  # It is actually KEY_DC
        "description": "delete",
        "command": 'kubectl -n {namespace} delete {api_resource} {resource}',
    },
    "^P": {  # Ctrl + p  (p means proxy! :-))
        "description": "exec istio-proxy",
        "command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy -- bash',
    },
    "^R": {  # Ctrl + r  (r means reveal! :-))
        "description": "reveal secret",
        "command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml",
    },
}
 | 
						|
# API resources listed first in the "API resources" menu, in display order.
TOP_API_RESOURCES = [
    "pods",
    "services",
    "configmaps",
    "secrets",
    "persistentvolumeclaims",
    "ingresses",
    "nodes",
    "deployments",
    "statefulsets",
    "daemonsets",
    "storageclasses",
    "serviceentries",
    "destinationrules",
    "virtualservices",
    "gateways",
    "telemetry",
]
# When True, all other API resources reported by the cluster are merged in after TOP_API_RESOURCES.
QUERY_API_RESOURCES = False
# Extra arguments appended to every command that contains "batcat".
BATCAT_STYLE = " --paging always --style numbers"
# Mouse handling is skipped entirely unless this is True.
MOUSE_ENABLED = False
 | 
						|
# **************************** #
 | 
						|
# END OF CONFIGURATION SECTION #
 | 
						|
# **************************** #
 | 
						|
 | 
						|
# Footer help line: one "<key>: <description>" entry per KEY_BINDINGS item,
# followed by the keys that are handled directly by the input loop.
HELP_TEXT = (
    ", ".join(f"{shortcut}: {spec['description']}" for shortcut, spec in KEY_BINDINGS.items())
    + ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation"
)

# The screen must be initialized before curses.LINES / curses.COLS can be read below.
SCREEN = curses.initscr()

# Layout, measured in terminal rows / columns.
HEADER_HEIGHT = 4
FOOTER_HEIGHT = 3
ROWS_HEIGHT = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3  # maximum number of visible rows indices
WIDTH = curses.COLS

# Shared state for the background refresh of the third menu.
THIRD_MENU_LOCK = asyncio.Lock()
THIRD_MENU_TASK = None  # global so that any handler can cancel the running refresh
 | 
						|
 | 
						|
 | 
						|
# classes
 | 
						|
class CircularList:
    """A list read through a rotating start offset.

    ``index`` is the position in ``elements`` at which the view currently
    starts; reads past the end of ``elements`` wrap around to its beginning.
    ``size`` is the element count captured at construction time.
    """

    def __init__(self, elements):
        self.elements = elements
        self.size = len(elements)
        self.index = 0

    def __getitem__(self, index):
        """Return item(s) of the rotated view.

        A slice returns a list; its bounds are clamped to ``size`` exactly as
        for a regular list, so at most ``size`` elements are returned. An
        integer returns a single element and raises IndexError when it is out
        of range (always the case for an empty list).
        """
        if isinstance(index, slice):
            start, stop, step = index.indices(self.size)
            return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)]
        if not -self.size <= index < self.size:
            raise IndexError("CircularList index out of range")
        return self.elements[(self.index + index) % self.size]

    def shift(self, steps):
        """Rotate the view by ``steps`` positions (negative values rotate backwards)."""
        if self.size:  # nothing to rotate in an empty list; also avoids a modulo by zero
            self.index = (self.index + steps) % self.size
 | 
						|
 | 
						|
 | 
						|
class Menu:
    """One column of the UI: a titled, filterable list of rows drawn in its own curses window."""

    def __init__(self, title: str, rows: list, begin_x: int, width: int, rows_height: int):
        # geometry
        self.begin_x = begin_x
        self.width = width
        self.rows_height = rows_height
        self._row_limit = rows_height  # how many rows visible_rows() may return
        # content
        self.title = title
        self.rows = rows  # every row, unfiltered
        # filtering
        self.filter = ""  # substring a row must contain to be listed
        self.filter_mode = False  # True while the user is typing a filter
        self.filtered_rows = CircularList([row for row in self.rows if self.filter in row])
        # selection
        self.visible_row_index = 0  # position of the selected row among the visible rows
        # the window spans the full height above the footer
        self.win = curses.newwin(curses.LINES - FOOTER_HEIGHT, width, 0, begin_x)

    def visible_rows(self) -> list:
        """Return the filtered rows that are currently shown."""
        return self.filtered_rows[:self._row_limit]

    def selected_row(self):
        """Return the selected visible row, or None when no row is visible."""
        shown = self.visible_rows()
        if not shown:
            return None
        return shown[self.visible_row_index]
 | 
						|
 | 
						|
 | 
						|
# helper functions
 | 
						|
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False):
    """Write ``text`` at (y, x) in ``window``, clear the rest of the line and refresh the window.

    A selected row is drawn in reverse video and bold, any other row with
    normal attributes.
    """
    if selected:
        attributes = curses.A_REVERSE | curses.A_BOLD
    else:
        attributes = curses.A_NORMAL
    window.addstr(y, x, text, attributes)
    window.clrtoeol()
    window.refresh()
 | 
						|
 | 
						|
 | 
						|
def draw_rows(menu: Menu):
    """Draw every visible row of ``menu`` below its header, highlighting the selected one."""
    current = menu.selected_row()
    for offset, text in enumerate(menu.visible_rows()):
        draw_row(menu.win, text, HEADER_HEIGHT + offset, 2, selected=(text == current))
 | 
						|
 | 
						|
 | 
						|
def draw_menu(menu: Menu):
    """Redraw ``menu`` from scratch: title, rows and the filter line."""
    window = menu.win
    window.erase()  # start from an empty window

    # the title is highlighted only for the menu that currently has focus
    draw_row(window, menu.title, 1, 2, selected=(menu == selected_menu))

    draw_rows(menu)

    # the filter line shows "/<filter>" while typing and is blank otherwise
    filter_text = f"/{menu.filter}" if menu.filter_mode else ""
    filter_y = curses.LINES - FOOTER_HEIGHT - 2
    draw_row(window, filter_text, filter_y, 2)
 | 
						|
 | 
						|
 | 
						|
async def refresh_third_menu(namespace, api_resource):
    """Reload the rows of the third menu for the given namespace and API resource.

    The rows come from ``kubectl get``; they become an empty list when either
    argument is falsy or when kubectl_async raises CalledProcessError. The
    scroll offset of the filtered rows is carried over, and the menu is
    redrawn only when its rows actually changed. Cancellation propagates to
    the caller.
    """
    async with THIRD_MENU_LOCK:
        third = menus[2]
        rows_before = third.rows

        fetched = []
        if api_resource and namespace:
            try:
                fetched = await kubectl_async(
                    f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
            except subprocess.CalledProcessError:
                fetched = []  # treat a failed command as "no rows"
        third.rows = fetched

        # rebuild the filtered view but keep the previous scroll offset
        scroll_offset = third.filtered_rows.index
        matching = CircularList([row for row in third.rows if third.filter in row])
        matching.index = scroll_offset
        third.filtered_rows = matching

        # the selection may now point past the last visible row
        if third.visible_row_index >= len(third.visible_rows()):
            third.visible_row_index = 0

        if rows_before != third.rows:
            draw_menu(third)
 | 
						|
 | 
						|
 | 
						|
async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str):
    """Run the shell command bound to ``key`` for the selected resource.

    Args:
        key: a KEY_BINDINGS key such as "^Y", or "KEY_DC" for the Delete key.
        namespace: selected namespace, substituted for {namespace}.
        api_resource: selected API resource, substituted for {api_resource}.
        resource: selected resource name, substituted for {resource}.

    Nothing happens when no resource is selected, when a pod-only binding is
    used on another API resource, or when ``key`` has no entry in KEY_BINDINGS.
    Any running refresh of the third menu is cancelled first, and curses is
    suspended while the command owns the terminal.
    """
    if not resource:
        return
    # Keys arrive in their KEY_BINDINGS form ("^L", not "l"); the logs, exec
    # and network-debug commands take a bare pod name.
    if key in ("^L", "^X", "^N") and api_resource != "pods":
        return
    if key == "KEY_DC":
        key = "Delete"
    binding = KEY_BINDINGS.get(key)
    if binding is None:  # e.g. the binding was removed from the configuration
        return
    if THIRD_MENU_TASK is not None:
        THIRD_MENU_TASK.cancel()
        try:
            await THIRD_MENU_TASK
        except asyncio.CancelledError:
            pass
    async with THIRD_MENU_LOCK:
        command = binding["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
        if "batcat" in command:
            command += BATCAT_STYLE
        curses.def_prog_mode()  # save the previous terminal state
        curses.endwin()  # without this, there are problems after exiting vim
        try:
            await subprocess_call_async(command)
        finally:
            # give the terminal back to curses even if the command could not be run
            curses.reset_prog_mode()  # restore the previous terminal state
            SCREEN.refresh()
            enable_mouse_support()
 | 
						|
 | 
						|
 | 
						|
async def handle_filter_state(key: str, menu: Menu):
    """Apply one key press to the filter of ``menu`` and redraw it.

    "/" enters filter mode with an empty filter. Escape leaves filter mode and
    clears the filter, or, outside filter mode, clears ``selected_menu``,
    which ends the main loop. While in filter mode, backspace removes the last
    character, and letters, digits and "-" are appended in lower case.
    The selection is reset to the first row on every call.
    """
    if key == "\x1b":  # Escape key
        if menu.filter_mode:
            menu.filter_mode = False
            menu.filter = ""
        else:
            globals()["selected_menu"] = None  # Exit program
    elif key == "/" and not menu.filter_mode:
        menu.filter_mode = True
        menu.filter = ""
    elif menu.filter_mode:  # every other key only matters while filtering
        erase_keys = ("KEY_BACKSPACE", "\x08")
        if key in erase_keys and menu.filter:
            menu.filter = menu.filter[:-1]
        elif key == "-" or key.isalnum():
            menu.filter += key.lower()

    menu.visible_row_index = 0
    menu.filtered_rows = CircularList([row for row in menu.rows if menu.filter in row])
    draw_menu(menu)

    third = menus[2]
    if menu != third:
        third.visible_row_index = 0  # reset the visible row index of third menu before redrawing
 | 
						|
 | 
						|
 | 
						|
async def handle_mouse(menu: Menu):
    """Move focus and selection to the menu row under the mouse pointer.

    Does nothing when MOUSE_ENABLED is False or when curses cannot report a
    mouse event. If the pointer is left or right of ``menu``, focus moves one
    or two menus in that direction. The row under the pointer is selected only
    if the character at the pointer is a non-space ASCII character and the row
    lies within the visible rows.
    """
    if not MOUSE_ENABLED:
        return
    try:
        mouse_info = curses.getmouse()
    except curses.error:  # this fixes scrolling error
        return

    mouse_x = mouse_info[1]
    mouse_y = mouse_info[2]
    row_number = mouse_y - HEADER_HEIGHT

    # which way, if any, the pointer lies outside the current menu
    if mouse_x > (menu.begin_x + menu.width):
        step = 1
    elif mouse_x < menu.begin_x:
        step = -1
    else:
        step = 0

    next_menu = None
    if step:
        next_menu = menus[(menus.index(menu) + step) % 3]
        if step == 1:
            still_outside = mouse_x > (next_menu.begin_x + next_menu.width)
        else:
            still_outside = mouse_x < next_menu.begin_x
        if still_outside:  # the pointer is two menus away
            next_menu = menus[(menus.index(next_menu) + step) % 3]
        globals()["selected_menu"] = next_menu

    if next_menu:
        draw_row(menu.win, menu.title, 1, 2, selected=False)  # un-highlight the old title
        draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)  # highlight the new one
        menu = next_menu

    # character currently displayed under the pointer, in window coordinates
    char_code = menu.win.inch(mouse_y, mouse_x - menu.begin_x - 1) & 0xFF
    if char_code > 127 or chr(char_code) == ' ':
        return

    if 0 <= row_number < len(menu.visible_rows()):
        menu.visible_row_index = row_number
        draw_rows(menu)  # this will change selected row in menu
        if menu != menus[2]:
            menus[2].visible_row_index = 0  # reset the selected row index of third menu before redrawing
 | 
						|
 | 
						|
 | 
						|
async def handle_vertical_navigation(key: str, menu: Menu):
    """Move the selection of ``menu`` up or down and redraw its rows.

    Up/Down scroll the filtered rows by one when there are more rows than fit
    on screen, and otherwise move the selected index with wrap-around.
    PgUp/PgDn scroll by the number of visible rows. Home/End select the first
    or last visible row. Nothing happens when at most one row is visible.
    """
    shown = menu.visible_rows()
    if len(shown) <= 1:
        return

    rows = menu.filtered_rows
    if key in ("KEY_DOWN", "KEY_UP"):
        step = 1 if key == "KEY_DOWN" else -1
        if rows.size > menu.rows_height:
            rows.shift(step)  # scroll the list under a fixed selection
        else:
            menu.visible_row_index = (menu.visible_row_index + step) % rows.size
    elif key in ("KEY_NPAGE", "KEY_PPAGE"):
        direction = 1 if key == "KEY_NPAGE" else -1
        rows.shift(direction * len(shown))
    elif key == "KEY_HOME":
        menu.visible_row_index = 0
    elif key == "KEY_END":
        menu.visible_row_index = -1  # negative index: last visible row

    draw_rows(menu)

    third = menus[2]
    if menu != third:
        third.visible_row_index = 0
 | 
						|
 | 
						|
 | 
						|
async def handle_horizontal_navigation(key: str, menu: Menu):
    """Move focus from ``menu`` to the neighbouring menu, wrapping around.

    Right and Tab move one menu to the right, Left and Shift+Tab one to the
    left. Any other key raises KeyError.
    """
    if key in ("KEY_RIGHT", "\t"):
        step = 1
    elif key in ("KEY_LEFT", "KEY_BTAB"):
        step = -1
    else:
        raise KeyError(key)

    neighbour = menus[(menus.index(menu) + step) % 3]
    globals()["selected_menu"] = neighbour

    # swap the title highlight to the newly focused menu
    draw_row(menu.win, menu.title, 1, 2, selected=False)
    draw_row(neighbour.win, neighbour.title, 1, 2, selected=True)
 | 
						|
 | 
						|
 | 
						|
async def confirm_action(message: str) -> bool:
    """Display a confirmation popup and return True if the user confirms.

    The popup is centred on the screen and waits until 'y' or 'n' is pressed
    (case-insensitive); every other key is ignored. Whatever the answer, the
    popup is erased and the menu windows are repainted before returning.
    """
    hint = "Press 'y' to confirm, 'n' to cancel"
    rows, cols = SCREEN.getmaxyx()  # Get screen size
    popup_height = 5
    # wide enough for the longer of the two lines, but never wider than the screen
    popup_width = min(max(len(message), len(hint)) + 10, cols)
    start_y = max((rows - popup_height) // 2, 0)
    start_x = max((cols - popup_width) // 2, 0)

    popup = curses.newwin(popup_height, popup_width, start_y, start_x)
    popup.box()  # Draw a border around the popup
    text_width = max(popup_width - 4, 0)  # keep the text inside the border
    popup.addnstr(2, 2, message, text_width)
    popup.addnstr(3, 2, hint, text_width)
    popup.refresh()

    try:
        while True:
            answer = (await get_key_async(popup)).lower()
            if answer == 'y':
                return True
            if answer == 'n':
                return False
    finally:
        # Remove the popup on both answers; otherwise it is painted again the
        # next time the whole screen is redrawn.
        popup.erase()
        popup.refresh()
        for menu in menus:  # repaint what the popup was covering
            menu.win.touchwin()
            menu.win.refresh()
 | 
						|
 | 
						|
 | 
						|
async def get_key_async(popup: curses.window) -> str:
    """Read one key from ``popup`` in a worker thread so the event loop is not blocked."""
    pressed = await asyncio.to_thread(popup.getkey)
    return pressed
 | 
						|
 | 
						|
 | 
						|
async def kubectl_async(command: str) -> list:
    """Run ``kubectl <command>`` through the shell and return its output lines.

    Args:
        command: everything that follows ``kubectl`` on the command line.

    Returns:
        The lines of standard output with surrounding whitespace stripped,
        or an empty list when kubectl printed nothing.

    Raises:
        subprocess.CalledProcessError: kubectl exited with a non-zero status
            and produced no output.
    """
    process = await asyncio.create_subprocess_shell(
        f"kubectl {command}",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,  # warnings and errors must not reach the curses screen
    )
    stdout, _ = await process.communicate()
    output = stdout.decode().strip()
    # stderr is discarded, so a failure is detected through the exit status;
    # output printed alongside a non-zero status is still returned.
    if process.returncode != 0 and not output:
        raise subprocess.CalledProcessError(process.returncode, command)
    return output.split("\n") if output else []
 | 
						|
 | 
						|
 | 
						|
async def catch_input(menu: Menu):
    """Wait for one key press and dispatch it to the matching handler.

    While no key is pending, the third menu is refreshed in a background task
    (at most one at a time) and the keyboard is polled again after 0.1 s.

    Args:
        menu: the menu that currently has focus.
    """
    global THIRD_MENU_TASK
    while True:  # refresh third menu until key pressed
        try:
            key = await get_key_async(SCREEN)
            break
        except curses.error:  # no key was read
            if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done():
                THIRD_MENU_TASK = asyncio.create_task(refresh_third_menu(namespace(), api_resource()))
            await asyncio.sleep(0.1)
    if key in ("\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"):
        await handle_horizontal_navigation(key, menu)
    elif key in ("KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"):
        if THIRD_MENU_TASK is not None:
            THIRD_MENU_TASK.cancel()
            try:
                # Wait for the THIRD_MENU_TASK to handle cancellation
                await THIRD_MENU_TASK
            except asyncio.CancelledError:
                pass
        await handle_vertical_navigation(key, menu)
    elif key == "KEY_MOUSE":
        await handle_mouse(menu)
    elif key == "KEY_DC":
        # ask for confirmation only when there is a resource to delete
        if resource() and await confirm_action("Are you sure you want to delete this resource?"):
            await handle_key_bindings(key, namespace(), api_resource(), resource())
    elif key in ("/", "\x1b", "KEY_BACKSPACE", "\x08") or key.isalnum() or key == "-":
        await handle_filter_state(key, menu)
    elif len(key) == 1:
        # curses.ascii.unctrl() accepts a single character only; multi-character
        # key names such as "KEY_RESIZE" would make it raise TypeError.
        binding_key = curses.ascii.unctrl(key)
        if binding_key in KEY_BINDINGS:
            await handle_key_bindings(binding_key, namespace(), api_resource(), resource())
 | 
						|
 | 
						|
 | 
						|
async def subprocess_call_async(command: str):
    """Run ``command`` through the shell and wait until it has finished.

    No pipes are requested for the child process.
    """
    child = await asyncio.create_subprocess_shell(command)
    await child.communicate()
 | 
						|
 | 
						|
 | 
						|
def enable_mouse_support():
    """Turn on mouse position reporting; does nothing unless MOUSE_ENABLED is True."""
    if not MOUSE_ENABLED:
        return
    curses.mousemask(curses.REPORT_MOUSE_POSITION)  # mouse tracking
    # enable mouse tracking with the XTERM API. That's the magic
    print('\033[?1003h')
 | 
						|
 | 
						|
 | 
						|
async def _kubectl_rows_or_empty(command: str) -> list:
    """Return the non-empty output lines of ``kubectl <command>``, or [] if it cannot be run."""
    try:
        rows = await kubectl_async(command)
    except (subprocess.CalledProcessError, OSError, ValueError):
        # best effort: a failing lookup must not prevent the UI from starting
        return []
    return [row for row in rows if row.strip()]


async def init_menus():
    """Build the three menus and configure curses for interactive use.

    Sets the module globals ``menus``, ``selected_menu`` and the accessors
    ``namespace``, ``api_resource`` and ``resource``, which return the current
    selection of the first, second and third menu respectively.
    """
    global menus, selected_menu, namespace, api_resource, resource

    # The cluster is only asked for its API resources when they are wanted.
    api_resources = TOP_API_RESOURCES
    if QUERY_API_RESOURCES:
        listed = await _kubectl_rows_or_empty("api-resources --no-headers --verbs=get")
        cluster_resources = [line.split()[0] for line in listed]
        api_resources = list(dict.fromkeys(TOP_API_RESOURCES + cluster_resources))

    # The namespace of the current context goes first, followed by all others.
    # dict.fromkeys removes the duplicate while keeping the order, and works
    # even when the current namespace is not part of the cluster listing.
    current_namespace = await _kubectl_rows_or_empty("config view --minify --output 'jsonpath={..namespace}'")
    all_namespaces = await _kubectl_rows_or_empty("get ns --no-headers -o custom-columns=NAME:.metadata.name")
    namespaces = list(dict.fromkeys(current_namespace + all_namespaces))

    # Column widths: 1/8 of the screen, 2/8 of the screen, and the remainder.
    width_unit = WIDTH // 8
    menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
             Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
             Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)]
    selected_menu = menus[0]
    namespace = menus[0].selected_row  # method alias
    api_resource = menus[1].selected_row
    # the resource name is the first column of the selected row
    resource = lambda: menus[2].selected_row().split()[0] if menus[2].selected_row() else None

    SCREEN.refresh()  # I don't know why this is needed but it doesn't work without it
    SCREEN.nodelay(True)  # don't block while waiting for input
    SCREEN.keypad(True)  # needed for arrow keys
    curses.set_escdelay(1)  # reduce Escape delay to 1 ms (curses can't set it to 0)
    curses.curs_set(0)  # make the cursor invisible
    curses.use_default_colors()  # don't change the terminal color
    curses.noecho()  # don't output characters at the top
    enable_mouse_support()
 | 
						|
 | 
						|
 | 
						|
async def main_async(screen):
    """Set up the menus, draw the UI and process input until no menu is selected.

    ``screen`` is accepted for the curses.wrapper calling convention and is
    not used here.
    """
    await init_menus()
    for column in menus:
        draw_menu(column)

    # the help line lives in its own window at the bottom of the screen
    footer = curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0)
    draw_row(footer, HELP_TEXT, 1, 2)

    # selected_menu is set to None when the user exits
    while selected_menu:
        await catch_input(selected_menu)
 | 
						|
 | 
						|
 | 
						|
def main(screen):
    """Synchronous entry point: run the asyncio UI loop to completion."""
    ui_coroutine = main_async(screen)
    asyncio.run(ui_coroutine)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # run the UI under curses.wrapper ...
    curses.wrapper(main)
    # ... and reset the terminal with tput once it has returned
    subprocess.run("tput reset", shell=True)
 |