260 lines
12 KiB
Python
Executable File
260 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import subprocess, curses, time
|
|
|
|
# constants

# Shell command templates, keyed by the key name that SCREEN.getkey() reports.
# Placeholders {namespace}, {api_resource} and {resource} are filled in by
# handle_key_bindings(). Can be extended.
KEY_BINDINGS = {
    "1": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml',
    "\n": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml',  # Enter key
    "2": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml',
    "3": 'kubectl -n {namespace} edit {api_resource} {resource}',
    "4": 'kubectl -n {namespace} logs {resource} | batcat -l log',
    # "--" separates kubectl's own arguments from the command run in the container;
    # kubectl reports the form without it as deprecated.
    "5": 'kubectl -n {namespace} exec -it {resource} -- sh',
    "6": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot',
    "KEY_DC": 'kubectl -n {namespace} delete {api_resource} {resource}',  # KEY_DC is the delete key
}

# Options appended to every command that pipes into batcat.
BATCAT_STYLE = " --paging always --style numbers"

# API resources listed at the top of the "API resources" menu, in this order.
TOP_API_RESOURCES = ["pods", "services", "configmaps", "secrets", "persistentvolumeclaims", "ingresses", "nodes",
                     "deployments", "statefulsets", "daemonsets", "storageclasses", "all"]

# Text shown in the footer window.
HELP_TEXT = ("letters: filter mode, Esc: exit filter mode or exit kls, 1/Enter: get yaml, 2: describe, 3: edit, "
             "4: logs, 5: exec, 6: debug, arrows/TAB/PgUp/PgDn: navigation")

# When False, mouse events are ignored by handle_mouse().
MOUSE_ENABLED = True

# Screen initialization; curses.LINES and curses.COLS are only usable after initscr().
SCREEN = curses.initscr()

HEADER_HEIGHT = 4  # in rows; the first list row of a menu is drawn at this y
FOOTER_HEIGHT = 3  # in rows; height reserved at the bottom for the help window

# Maximum number of rows a menu shows at once (passed to Menu as rows_height).
ROWS_HEIGHT = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3

# Total terminal width in columns; init_menus() splits it between the three menus.
WIDTH = curses.COLS
|
|
|
|
|
|
# classes
|
|
class CircularList:
    """Read-only view of a list whose positions are counted from a movable offset.

    ``index`` is the offset into ``elements`` that is treated as position 0.
    Positions wrap around the end of the list, so shifting the offset scrolls
    the view without copying or reordering the underlying elements.
    """

    def __init__(self, elements):
        self.elements = elements
        self.size = len(elements)
        self.index = 0  # offset of the element currently seen as position 0

    def __getitem__(self, index):
        """Return the item(s) at ``index``, counted from the current offset.

        A slice returns a new list; it is clipped to the list size, so it never
        contains more than ``size`` items and is empty for an empty list.
        An integer returns a single element (negative values count from the
        end of the view) and raises ``IndexError`` when out of range.
        """
        if isinstance(index, slice):
            start, stop, step = index.indices(self.size)
            return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)]
        if not -self.size <= index < self.size:
            raise IndexError("CircularList index out of range")
        return self.elements[(self.index + index) % self.size]

    def shift(self, steps):
        """Move the offset by ``steps`` positions (negative moves backwards), wrapping around."""
        if self.size:  # nothing to scroll in an empty list; also avoids a modulo by zero
            self.index = (self.index + steps) % self.size
|
|
|
|
|
|
class Menu:
    """One column of the UI: a titled, filterable list of rows drawn in its own curses window."""

    def __init__(self, title: str, rows: list, begin_x: int, width: int, rows_height: int):
        """Create the menu and its window.

        Args:
            title: Caption drawn at the top of the window.
            rows: All rows of the menu, before filtering.
            begin_x: Screen column where the window starts.
            width: Window width in columns.
            rows_height: Maximum number of rows shown at once.
        """
        self.title = title
        self.rows = rows  # all rows
        self.filter = ""  # substring that a row must contain to be listed
        self.filtered_rows = CircularList([x for x in self.rows if self.filter in x])  # rows matching the filter
        self.visible_row_index = 0  # index of the selected row within the visible rows
        self.rows_height = rows_height
        self.width = width
        self.begin_x = begin_x
        # The window spans the screen height except for the footer area.
        self.win = curses.newwin(curses.LINES - FOOTER_HEIGHT, width, 0, begin_x)

    def visible_rows(self) -> list:
        """Return the filtered rows currently on screen (at most ``rows_height`` of them)."""
        return self.filtered_rows[:self.rows_height]

    def selected_row(self):
        """Return the selected visible row, or None when no row is visible."""
        rows = self.visible_rows()
        return rows[self.visible_row_index] if rows else None
|
|
|
|
|
|
# helper functions
|
|
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False):
    """Write one line of text into a window and refresh it.

    Args:
        window: Target curses window.
        text: Text to draw; it is clipped so that it fits on a single line.
        y: Row inside the window.
        x: Column inside the window where the text starts.
        selected: Draw the text in bold reverse video when True.
    """
    attributes = curses.A_REVERSE | curses.A_BOLD if selected else curses.A_NORMAL
    _, window_width = window.getmaxyx()
    # Clip instead of letting addstr wrap onto the following rows. One column is
    # kept free because curses raises an error when a write reaches the
    # lower-right corner of a window.
    clipped_text = text[:max(window_width - x - 1, 0)]
    window.addstr(y, x, clipped_text, attributes)
    window.clrtoeol()  # wipe whatever a previous, longer text left on this line
    window.refresh()
|
|
|
|
|
|
def draw_rows(menu: Menu):
    """Draw all visible rows of a menu below its header, highlighting the selected one.

    The selection is matched by position rather than by row text, so two rows
    with identical text are never highlighted together.
    """
    rows = menu.visible_rows()  # computed once instead of once per drawn row
    selected_index = menu.visible_row_index
    if selected_index < 0:
        # handle_vertical_navigation stores -1 for KEY_END, meaning "last visible row"
        selected_index += len(rows)
    for index, row in enumerate(rows):
        draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected=index == selected_index)
|
|
|
|
|
|
def draw_menu(menu: Menu):
    """Repaint a complete menu window: title, visible rows and the filter line."""
    window = menu.win
    window.erase()  # start from an empty window

    # The title is highlighted only for the menu that currently has the focus.
    has_focus = menu == selected_menu
    draw_row(window, menu.title, 1, 2, selected=has_focus)

    draw_rows(menu)

    # The filter line sits on the second-to-last line of the window and is
    # blank while no filter is active.
    filter_text = f"/{menu.filter}" if menu.filter else ""
    filter_y = curses.LINES - FOOTER_HEIGHT - 2
    draw_row(window, filter_text, filter_y, 2)
|
|
|
|
|
|
def refresh_third_menu(namespace: str, api_resource: str):
    """Reload the rows of the third ("Resources") menu and redraw it when they changed.

    Args:
        namespace: Selected namespace; falsy when no namespace row is selected.
        api_resource: Selected API resource; ``"all"`` lists every namespaced
            resource kind that supports ``get`` (except events).

    When either argument is missing, the menu is emptied.
    """
    menu = menus[2]
    previous_menu_rows = menu.rows
    if api_resource and namespace:
        if api_resource == "all":
            cmd = ("api-resources --verbs=get --namespaced -o name | grep -v events | xargs -n 1 kubectl get"
                   f" --show-kind --namespace {namespace} --ignore-not-found --no-headers -o name")
            menu.rows = kubectl(cmd)
        else:
            menu.rows = kubectl(f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
    else:
        menu.rows = []
    # Rebuild the filtered view in every case. Doing it only after a kubectl call
    # would leave the old resources on screen once the menu has been emptied.
    index_before_update = menu.filtered_rows.index
    menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
    menu.filtered_rows.index = index_before_update  # keep the scroll position across refreshes
    if menu.visible_row_index >= len(menu.visible_rows()):
        menu.visible_row_index = 0  # the selected row no longer exists
    if previous_menu_rows != menu.rows:
        draw_menu(menu)
|
|
|
|
|
|
def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str):
    """Run the shell command bound to ``key`` for the selected resource.

    Args:
        key: A key of KEY_BINDINGS.
        namespace: Selected namespace.
        api_resource: Selected API resource, or ``"all"``.
        resource: Selected resource; in the "all" view it has the form
            ``kind/name``. Nothing happens when it is falsy.

    Curses is suspended while the command runs and resumed afterwards.
    """
    if not resource:
        return
    # logs, exec and debug are only offered for pods
    targets_pod = api_resource == "pods" or (api_resource == "all" and resource.startswith("pod/"))
    if key in ("4", "5", "6") and not targets_pod:
        return
    template = KEY_BINDINGS[key]
    # In the "all" view the resource already carries its kind ("kind/name"), so
    # the kind placeholder is left empty. Removing " all" from the finished
    # command instead would also mangle namespaces that start with "all".
    kind = "" if api_resource == "all" else api_resource
    command = template.format(namespace=namespace, api_resource=kind, resource=resource)
    if "batcat" in template:  # check the template so a resource name cannot trigger this
        command += BATCAT_STYLE
    curses.def_prog_mode()  # save the previous terminal state
    curses.endwin()  # without this, there are problems after exiting vim
    try:
        subprocess.call(command, shell=True)
    finally:
        # Resume curses even when the call is interrupted.
        curses.reset_prog_mode()  # restore the previous terminal state
        SCREEN.refresh()
        enable_mouse_support()
|
|
|
|
|
|
def handle_filter_state(key: str, menu: Menu):
    """Apply a filter-related key press to ``menu`` and redraw it.

    Escape clears an active filter, or ends the program (by unsetting the
    selected menu) when no filter is active. Backspace removes the last filter
    character and does nothing on an empty filter. Letters and "-" are
    appended to the filter in lower case.
    """
    global selected_menu

    if key == "\x1b":  # Escape
        if menu.filter:
            menu.filter = ""  # leave filter mode
        else:
            selected_menu = None  # exit
    elif key in ("KEY_BACKSPACE", "\x08"):  # \x08 is also Backspace
        if not menu.filter:
            return
        menu.filter = menu.filter[:-1]
    elif key.isalpha() or key == "-":
        menu.filter += key.lower()

    # The set of matching rows changed: select the first one and repaint.
    menu.visible_row_index = 0
    matching_rows = [row for row in menu.rows if menu.filter in row]
    menu.filtered_rows = CircularList(matching_rows)
    draw_menu(menu)

    third_menu = menus[2]
    if menu is not third_menu:
        third_menu.visible_row_index = 0  # reset the third menu's selection before it is redrawn
|
|
|
|
|
|
def handle_mouse(menu: Menu):
    """Handle a mouse event: focus the menu under the pointer and select the row under it.

    Does nothing when MOUSE_ENABLED is False or when no mouse event can be read.
    """
    global selected_menu

    if not MOUSE_ENABLED:
        return
    try:
        event = curses.getmouse()
    except curses.error:  # this fixes scrolling error
        return
    mouse_x, mouse_y = event[1], event[2]
    row_number = mouse_y - HEADER_HEIGHT

    # Direction in which the pointer left the current menu (0: still inside).
    if mouse_x > (menu.begin_x + menu.width):
        step = 1
    elif mouse_x < menu.begin_x:
        step = -1
    else:
        step = 0

    if step:
        target = menus[(menus.index(menu) + step) % 3]
        # The pointer may be beyond the neighbouring menu as well.
        if step == 1:
            beyond_target = mouse_x > (target.begin_x + target.width)
        else:
            beyond_target = mouse_x < target.begin_x
        if beyond_target:
            target = menus[(menus.index(target) + step) % 3]
        selected_menu = target
        draw_row(menu.win, menu.title, 1, 2, selected=False)  # remove selection from the current menu title
        draw_row(target.win, target.title, 1, 2, selected=True)  # and select the new menu title
        menu = target

    # Character read from the window near the pointer; only its low byte is used.
    cell = menu.win.inch(mouse_y, mouse_x - menu.begin_x - 1)
    char_code = cell & 0xFF
    if char_code > 127 or char_code == ord(' '):
        return  # blank or non-ASCII cell: nothing to select here

    if 0 <= row_number < len(menu.visible_rows()):
        menu.visible_row_index = row_number
        draw_rows(menu)  # this will change selected row in menu
        if menu != menus[2]:
            menus[2].visible_row_index = 0  # reset the selected row index of third menu before redrawing
|
|
|
|
|
|
def handle_vertical_navigation(key: str, menu: Menu):
    """Move the selection or scroll ``menu`` for arrow, page, Home and End keys.

    Nothing happens when the menu shows at most one row.
    """
    if len(menu.visible_rows()) <= 1:
        return

    rows = menu.filtered_rows
    if key in ("KEY_DOWN", "KEY_UP"):
        step = 1 if key == "KEY_DOWN" else -1
        if rows.size > menu.rows_height:
            # More rows than fit on screen: scroll the list under a fixed selection.
            rows.shift(step)
        else:
            # Everything is visible: move the selection itself, wrapping around.
            menu.visible_row_index = (menu.visible_row_index + step) % rows.size
    elif key in ("KEY_NPAGE", "KEY_PPAGE"):
        direction = 1 if key == "KEY_NPAGE" else -1
        rows.shift(direction * len(menu.visible_rows()))  # scroll by one screenful
    elif key == "KEY_HOME":
        menu.visible_row_index = 0  # first visible row
    elif key == "KEY_END":
        menu.visible_row_index = -1  # last visible row (negative index)

    draw_rows(menu)
    if menu != menus[2]:
        menus[2].visible_row_index = 0
|
|
|
|
|
|
def handle_horizontal_navigation(key, menu):
    """Move the focus to the neighbouring menu, wrapping around at both ends.

    Right arrow and Tab move to the next menu, Left arrow and Shift+Tab to the
    previous one. Any other key raises KeyError.
    """
    global selected_menu

    steps_by_key = {
        "KEY_RIGHT": 1,
        "\t": 1,
        "KEY_LEFT": -1,
        "KEY_BTAB": -1,
    }
    step = steps_by_key[key]
    current_position = menus.index(menu)
    target = menus[(current_position + step) % 3]

    draw_row(menu.win, menu.title, 1, 2, selected=False)  # remove selection from the current menu title
    draw_row(target.win, target.title, 1, 2, selected=True)  # and select the new menu title
    selected_menu = target
|
|
|
|
|
|
def catch_input(menu: Menu):
    """Wait for the next key press and dispatch it to the matching handler.

    While no key is available, the third menu is refreshed every 0.1 seconds.
    """
    key = None
    while key is None:  # refresh third menu until key pressed
        try:
            key = SCREEN.getkey()
        except curses.error:  # raised when no input is pending
            refresh_third_menu(namespace(), api_resource())
            time.sleep(0.1)

    horizontal_keys = ("\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT")
    vertical_keys = ("KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END")
    filter_control_keys = ("\x1b", "KEY_BACKSPACE", "\x08")  # \x1b - escape, \x08 - backspace

    if key in horizontal_keys:
        handle_horizontal_navigation(key, menu)
    elif key in vertical_keys:
        handle_vertical_navigation(key, menu)
    elif key == "KEY_MOUSE":
        handle_mouse(menu)
    elif key in KEY_BINDINGS:
        handle_key_bindings(key, namespace(), api_resource(), resource())
    elif key in filter_control_keys or key.isalpha() or key == "-":
        handle_filter_state(key, menu)
|
|
|
|
|
|
def kubectl(command: str) -> list:
    """Run ``kubectl <command>`` through the shell and return its output lines.

    Args:
        command: Arguments passed to kubectl; may contain shell pipes.

    Returns:
        The lines of standard output with surrounding whitespace stripped from
        the output as a whole; an empty list when there is no output.

    Raises:
        subprocess.CalledProcessError: If the shell command exits with a
            non-zero status. Standard error is discarded.
    """
    output = subprocess.check_output(f"kubectl {command} 2> /dev/null", shell=True).decode().strip()
    # "".split("\n") would yield [""], i.e. one bogus empty row
    return output.split("\n") if output else []
|
|
|
|
|
|
def enable_mouse_support():
    """Turn on mouse position reporting, unless MOUSE_ENABLED is False."""
    if not MOUSE_ENABLED:
        # Mouse events would be discarded by handle_mouse() anyway, so do not
        # ask the terminal to send them.
        return
    curses.mousemask(curses.REPORT_MOUSE_POSITION)  # mouse tracking
    print('\033[?1003h')  # enable mouse tracking with the XTERM API. That's the magic
|
|
|
|
|
|
def init_menus():
    """Create the three menus, the global selection accessors, and configure curses input.

    Sets the globals ``menus``, ``selected_menu``, ``namespace``,
    ``api_resource`` and ``resource``. The last three are callables returning
    the current selection of the first, second and third menu.
    """
    global menus, selected_menu, namespace, api_resource, resource
    # The first column of each output line is the resource name; blank lines are skipped.
    api_resources_kubectl = [x.split()[0] for x in kubectl("api-resources --no-headers --verbs=get") if x.strip()]
    api_resources = list(
        dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl))  # so top api resources are at the top
    namespaces = [x for x in kubectl("get ns --no-headers -o custom-columns=NAME:.metadata.name") if x]
    width_unit = WIDTH // 8
    menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
             Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
             Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)]
    selected_menu = menus[0]
    namespace = menus[0].selected_row  # method alias
    api_resource = menus[1].selected_row

    def selected_resource_name():
        """Return the first column of the selected row of the third menu, or None."""
        fields = (menus[2].selected_row() or "").split()
        return fields[0] if fields else None

    resource = selected_resource_name
    SCREEN.refresh()  # I don't know why this is needed but it doesn't work without it
    SCREEN.nodelay(True)  # don't block while waiting for input
    SCREEN.keypad(True)  # needed for arrow keys
    curses.set_escdelay(1)  # reduce Escape delay to 1 ms (curses can't set it to 0)
    curses.curs_set(0)  # make the cursor invisible
    curses.use_default_colors()  # don't change the terminal color
    curses.noecho()  # don't output characters at the top
    enable_mouse_support()
|
|
|
|
|
|
def main(screen):
    """Build the UI and process input until no menu is selected any more.

    Args:
        screen: Window object supplied by curses.wrapper; not used, the
            module-level SCREEN is used instead.
    """
    try:
        init_menus()
        for menu in menus:  # draw the main windows
            draw_menu(menu)
        # and the help window
        draw_row(curses.newwin(FOOTER_HEIGHT, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
        while selected_menu:
            catch_input(selected_menu)  # if a menu is selected, catch user input
    finally:
        # enable_mouse_support() switches xterm mouse tracking on with "\033[?1003h".
        # Switch it off again, otherwise the terminal keeps emitting mouse escape
        # sequences after the program has ended.
        print('\033[?1003l', end='', flush=True)
|
|
|
|
|
|
# Script entry point. curses.wrapper initializes curses, calls main with the
# screen window and restores the terminal when main returns or raises.
if __name__ == "__main__":
    curses.wrapper(main)
|