149 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
			
		
		
	
	
			149 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
#!/usr/bin/env python3
 | 
						|
import subprocess
 | 
						|
import curses
 | 
						|
import asyncio
 | 
						|
 | 
						|
KEY_BINDINGS = {  # can be extended
 | 
						|
    "1": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml --paging always --style numbers',
 | 
						|
    "2": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml --paging always --style numbers',
 | 
						|
    "3": 'kubectl -n {namespace} edit {api_resource} {resource}',
 | 
						|
    "4": 'kubectl -n {namespace} logs {resource} | batcat -l log --paging always --style numbers',
 | 
						|
    "5": 'kubectl -n {namespace} exec -it {resource} sh',
 | 
						|
    "KEY_DC": 'kubectl -n {namespace} delete {api_resource} {resource}'  # KEY_DC is the delete key
 | 
						|
}
 | 
						|
# which api resources are on the top of menu?
 | 
						|
TOP_API_RESOURCES = ["pods", "services", "ingresses", "secrets", "nodes", "deployments", "statefulsets", "daemonsets",
 | 
						|
                     "configmaps", "persistentvolumeclaims", "storageclasses"]
 | 
						|
 | 
						|
HELP_TEXT = "Esc: exit filter mode or exit kls, 1: get yaml, 2: describe, 3: edit, 4: pod logs, arrows/TAB: navigation"
 | 
						|
 | 
						|
 | 
						|
class Menu:
 | 
						|
    def __init__(self, title: str, rows: list, begin_x: int, width: int):
 | 
						|
        self.title = title
 | 
						|
        self.rows = rows  # all rows
 | 
						|
        self.filter = ""  # filter for rows
 | 
						|
        self.filtered_rows = lambda: [x for x in self.rows if self.filter in x]  # filtered rows
 | 
						|
        self.filtered_row_index = 0  # index of the selected filtered row
 | 
						|
        # __start_index - starting from which row we will select rows from filtered_rows()? Usually from the first row,
 | 
						|
        # but if the size of filtered_rows is greater than HEIGHT and filtered_row_index exceeds the menu HEIGHT,
 | 
						|
        # we shift __start_index to the right by filtered_row_index - HEIGHT. This way we implement menu scrolling
 | 
						|
        self.__start_index = lambda: 0 if self.filtered_row_index < HEIGHT else self.filtered_row_index - HEIGHT + 1
 | 
						|
        self.visible_rows = lambda: self.filtered_rows()[self.__start_index():][:HEIGHT]  # visible rows
 | 
						|
        self.__visible_row_index = lambda: self.filtered_row_index - self.__start_index()  # index of the selected visible row
 | 
						|
        # selected row from visible rows
 | 
						|
        self.selected_row = lambda: self.visible_rows()[self.__visible_row_index()] if self.visible_rows() else None
 | 
						|
        self.win = curses.newwin(curses.LINES - 3, width, 0, begin_x)
 | 
						|
 | 
						|
 | 
						|
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False):
 | 
						|
    window.addstr(y, x, text, curses.A_REVERSE | curses.A_BOLD if selected else curses.A_NORMAL)
 | 
						|
    window.clrtoeol()
 | 
						|
    window.refresh()
 | 
						|
 | 
						|
 | 
						|
def draw_rows(rows: list, menu: Menu):
 | 
						|
    for index, row in enumerate(rows):
 | 
						|
        draw_row(menu.win, row, index + 3, 2, selected=True if row == menu.selected_row() else False)
 | 
						|
 | 
						|
 | 
						|
def draw_menu(menu: Menu):
 | 
						|
    menu.win.clear()  # clear menu window
 | 
						|
    draw_row(menu.win, menu.title, 1, 2, selected=True if menu == SELECTED_MENU else False)  # draw title
 | 
						|
    draw_rows(menu.visible_rows(), menu)  # draw menu rows
 | 
						|
    draw_row(menu.win, f"/{menu.filter}" if menu.filter else "", curses.LINES - 5, 2)  # draw filter row
 | 
						|
 | 
						|
 | 
						|
def run_command(key: str):
 | 
						|
    if not (key == "4" and api_resource() != "pods") and not (key == "5" and api_resource() != "pods"):
 | 
						|
        curses.def_prog_mode()  # save the previous terminal state
 | 
						|
        curses.endwin()  # without this, there are problems after exiting vim
 | 
						|
        command = KEY_BINDINGS[key].format(namespace=namespace(), api_resource=api_resource(), resource=resource())
 | 
						|
        subprocess.call(command, shell=True)
 | 
						|
        curses.reset_prog_mode()  # restore the previous terminal state
 | 
						|
        SCREEN.refresh()
 | 
						|
 | 
						|
 | 
						|
def handle_filter_state(key: str, menu: Menu):
 | 
						|
    if key == "\x1b":
 | 
						|
        menu.filter = ""  # Escape key exits filter mode
 | 
						|
    elif key in ["KEY_BACKSPACE", "\x08"]:
 | 
						|
        menu.filter = menu.filter[:-1]  # Backspace key deletes a character (\x08 is also Backspace)
 | 
						|
    elif key.isalpha() or key == "-":
 | 
						|
        menu.filter += key.lower()
 | 
						|
    else:
 | 
						|
        return
 | 
						|
    menu.filtered_row_index = 0
 | 
						|
    draw_menu(menu)
 | 
						|
    if menu != MENUS[2]:
 | 
						|
        MENUS[2].filtered_row_index = 0  # reset the selected row index of third menu before redrawing
 | 
						|
 | 
						|
 | 
						|
async def catch_input(menu: Menu):
 | 
						|
    while True:  # this while loop is needed for constant refreshing of third menu containing all resources
 | 
						|
        try:
 | 
						|
            key = SCREEN.getkey()
 | 
						|
            break
 | 
						|
        except:
 | 
						|
            MENUS[2].rows = []
 | 
						|
            if namespace() and api_resource():
 | 
						|
                command = f"-n {namespace()} get {api_resource()} --no-headers"
 | 
						|
                MENUS[2].rows = kubectl(command)
 | 
						|
            draw_menu(MENUS[2])
 | 
						|
            await asyncio.sleep(0.1)
 | 
						|
    if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
 | 
						|
        increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
 | 
						|
        next_menu = MENUS[(MENUS.index(menu) + increment) % 3]
 | 
						|
        draw_row(menu.win, menu.title, 1, 2, selected=False)  # remove selection from the current menu title
 | 
						|
        # and select the new menu title
 | 
						|
        draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
 | 
						|
        globals().update(SELECTED_MENU=next_menu)
 | 
						|
    elif key in ["KEY_UP", "KEY_DOWN"] and len(menu.visible_rows()) > 1:
 | 
						|
        increment = {"KEY_DOWN": 1, "KEY_UP": -1}[key]
 | 
						|
        menu.filtered_row_index = (menu.filtered_row_index + increment) % len(menu.filtered_rows())
 | 
						|
        draw_rows(menu.visible_rows(), menu)  # redraw the menu
 | 
						|
        if menu != MENUS[2]:
 | 
						|
            MENUS[2].filtered_row_index = 0  # reset the selected row index of third menu before redrawing
 | 
						|
    elif key in KEY_BINDINGS.keys() and MENUS[2].selected_row():
 | 
						|
        run_command(key)
 | 
						|
    elif key == "\x1b" and not menu.filter:
 | 
						|
        globals().update(SELECTED_MENU=None)  # exit
 | 
						|
    else:
 | 
						|
        handle_filter_state(key, menu)
 | 
						|
 | 
						|
 | 
						|
def kubectl(command: str) -> list:
 | 
						|
    return subprocess.check_output(f"kubectl {command}", shell=True).decode().strip().split("\n")
 | 
						|
 | 
						|
 | 
						|
SCREEN = curses.initscr()  # screen initialization
 | 
						|
api_resources_kubectl = [x.split()[0] for x in kubectl("api-resources --no-headers --verbs=get")]
 | 
						|
api_resources = list(dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl))  # so top api resources are at the top
 | 
						|
width_unit = curses.COLS // 8
 | 
						|
MENUS = [Menu("Namespaces", kubectl("get ns --no-headers -o custom-columns=NAME:.metadata.name"), 0, width_unit),
 | 
						|
         Menu("API resources", api_resources, width_unit, width_unit * 2),
 | 
						|
         Menu("Resources", [], width_unit * 3, curses.COLS - width_unit * 3)]
 | 
						|
SELECTED_MENU = MENUS[0]
 | 
						|
HEIGHT = curses.LINES - 9  # maximum number of visible row indices
 | 
						|
namespace = MENUS[0].selected_row  # method alias
 | 
						|
api_resource = MENUS[1].selected_row
 | 
						|
resource = lambda: MENUS[2].selected_row().split()[0]
 | 
						|
 | 
						|
 | 
						|
def main(screen):
 | 
						|
    SCREEN.refresh()  # I don't know why this is needed but it doesn't work without it
 | 
						|
    SCREEN.nodelay(True)  # don't wait for input
 | 
						|
    SCREEN.keypad(True)  # needed for arrow keys
 | 
						|
    curses.set_escdelay(1)  # reduce Escape delay to 1 ms (curses can't set it to 0)
 | 
						|
    curses.curs_set(0)  # make the cursor invisible
 | 
						|
    curses.use_default_colors()  # don't change the terminal color
 | 
						|
    curses.noecho()  # don't output characters at the top
 | 
						|
    draw_menu(MENUS[0])  # draw the main windows
 | 
						|
    draw_menu(MENUS[1])
 | 
						|
    draw_row(curses.newwin(3, curses.COLS, curses.LINES - 3, 0), HELP_TEXT, 1, 2)  # and the help window
 | 
						|
    while SELECTED_MENU:
 | 
						|
        asyncio.run(catch_input(SELECTED_MENU))  # if a menu is selected, catch user input
 | 
						|
 | 
						|
 | 
						|
curses.wrapper(main)
 |