kls/kls

154 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess, curses
KEY_BINDINGS = { # can be extended
"1": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml --paging always --style numbers',
"2": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml --paging always --style numbers',
"3": 'kubectl -n {namespace} edit {api_resource} {resource}',
"4": 'kubectl -n {namespace} logs {resource} | batcat -l log --paging always --style numbers',
"5": 'kubectl -n {namespace} exec -it {resource} sh',
"KEY_DC": 'kubectl -n {namespace} delete {api_resource} {resource}' # delete key
}
EXTRA_COLUMNS = { # By default, only the NAME column is displayed. Any api resource can be added here
"pods": ',STATUS:.status.phase,NODE:.spec.nodeName,AGE:.metadata.creationTimestamp',
"nodes": ',STATUS:.status.conditions[-1].type,CAPACITY_CPU:.status.capacity.cpu,CAPACITY_MEM:.status.capacity.memory',
"services": ',TYPE:.spec.type,CLUSTER_IP:.spec.clusterIP',
"ingresses": ',HOSTS:.spec.rules[*].host',
"deployments": ',DESIRED_REPLICAS:.spec.replicas,READY_REPLICAS:.status.readyReplicas',
"persistentvolumeclaims": ',SIZE:.spec.resources.requests.storage,STORAGE_CLASS:.spec.storageClassName'
}
TOP_API_RESOURCES = ["pods", "services", "ingresses", "secrets", "nodes", "deployments", "statefulsets", "daemonsets",
"configmaps", "persistentvolumeclaims", "storageclasses"] # which api resources are on the top of menu?
HELP_TEXT = "Esc: exit filter mode or exit kls, 1: get yaml, 2: describe, 3: edit, 4: pod logs, arrows/TAB: navigation"
class Menu:
def __init__(self, title, rows, begin_x, width):
self.title = title
self.rows = rows # all rows
self.filter = "" # filter for rows
self.filtered_rows = lambda: [x for x in self.rows if self.filter in x] # filtered rows
self.filtered_row_index = 0 # index of the selected filtered row
# __start_index - starting from which row we will select rows from filtered_rows()? Usually from the first row,
# but if the size of filtered_rows is greater than HEIGHT and filtered_row_index exceeds the menu HEIGHT,
# we shift __start_index to the right by filtered_row_index - HEIGHT. This way we implement menu scrolling
self.__start_index = lambda: 0 if self.filtered_row_index < HEIGHT else self.filtered_row_index - HEIGHT + 1
self.visible_rows = lambda: self.filtered_rows()[self.__start_index():][:HEIGHT] # visible rows
self.__visible_row_index = lambda: self.filtered_row_index - self.__start_index() # index of the selected visible row
self.selected_row = lambda: self.visible_rows()[self.__visible_row_index()] if self.visible_rows() else None # selected row from visible rows
self.win = curses.newwin(curses.LINES - 3, width, 0, begin_x)
SCREEN = curses.initscr() # screen initialization
# convert command output to list
kubectl = lambda command: subprocess.check_output("kubectl " + command, shell=True).decode().strip().split("\n")
api_resources_kubectl = kubectl("api-resources --no-headers --verbs=get | awk '{print $1}'")
api_resources = list(dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)) # so top api resources are at the top
width_unit = curses.COLS // 8
MENUS = [Menu("Namespaces", kubectl("get ns --no-headers -o custom-columns=NAME:.metadata.name"), 0, width_unit),
Menu("API resources", api_resources, width_unit, width_unit * 2),
Menu("Resources", [], width_unit * 3, curses.COLS - width_unit * 3)]
SELECTED_MENU = MENUS[0]
HEIGHT = curses.LINES - 9 # maximum number of visible row indices
namespace = MENUS[0].selected_row # method alias
api_resource = MENUS[1].selected_row
resource = lambda: MENUS[2].selected_row().split()[0]
def draw_row(window, text, y, x, selected=False):
window.addstr(y, x, text, curses.A_REVERSE | curses.A_BOLD if selected else curses.A_NORMAL)
window.clrtoeol()
window.refresh()
def draw_rows(rows, menu):
for index, row in enumerate(rows):
draw_row(menu.win, row, index + 3, 2, selected=True if row == menu.selected_row() else False)
def draw_menu(menu):
menu.win.clear() # clear menu window
draw_row(menu.win, menu.title, 1, 2, selected=True if menu == SELECTED_MENU else False) # draw title
draw_rows(menu.visible_rows(), menu) # draw menu rows
draw_row(menu.win, f"/{menu.filter}" if menu.filter else "", curses.LINES - 5, 2) # draw filter row
def update_menu3():
MENUS[2].rows = []
if namespace() and api_resource():
columns = EXTRA_COLUMNS.get(api_resource(), "")
command = f"-n {namespace()} get {api_resource()} --no-headers -o custom-columns=NAME:.metadata.name{columns}"
MENUS[2].rows = kubectl(command)
MENUS[2].filtered_row_index = 0 # reset the selected row index before redrawing
draw_menu(MENUS[2])
def run_command(key):
if not (key == "4" and api_resource() != "pods") and not (key == "5" and api_resource() != "pods"):
curses.def_prog_mode() # save the previous terminal state
curses.endwin() # without this, there are problems after exiting vim
command = KEY_BINDINGS[key].format(namespace=namespace(), api_resource=api_resource(), resource=resource())
subprocess.call(command, shell=True)
curses.reset_prog_mode() # restore the previous terminal state
SCREEN.refresh()
def handle_filter_state(key, menu):
if key == "\x1b":
menu.filter = "" # Escape key exits filter mode
elif key in ["KEY_BACKSPACE", "\x08"]:
menu.filter = menu.filter[:-1] # Backspace key deletes a character (\x08 is also Backspace)
elif key.isalpha() or key == "-":
menu.filter += key.lower()
else:
return
menu.filtered_row_index = 0
draw_menu(menu)
if menu != MENUS[2]:
update_menu3() # redraw the third menu rows if we redraw the first or second menu rows
def catch_input(menu):
key = SCREEN.getkey()
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
next_menu = MENUS[(MENUS.index(menu) + increment) % 3]
draw_row(menu.win, menu.title, 1, 2, selected=False) # remove selection from the current menu title
# and select the new menu title
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
globals().update(SELECTED_MENU=next_menu)
elif key in ["KEY_UP", "KEY_DOWN"] and len(menu.visible_rows()) > 1:
increment = {"KEY_DOWN": 1, "KEY_UP": -1}[key]
menu.filtered_row_index = (menu.filtered_row_index + increment) % len(menu.filtered_rows())
draw_rows(menu.visible_rows(), menu) # redraw the menu
if menu != MENUS[2]:
update_menu3() # redraw the third menu rows if we redraw the first or second menu rows
elif key in KEY_BINDINGS.keys() and MENUS[2].selected_row():
run_command(key)
elif key == "\x1b" and not menu.filter:
globals().update(SELECTED_MENU=None) # exit
else:
handle_filter_state(key, menu)
def main(screen):
SCREEN.refresh() # I don't know why this is needed but it doesn't work without it
SCREEN.keypad(True) # needed for arrow keys
# in curses, there is a delay on Escape key press, we reduce it to 1 millisecond (can't set it to 0)
curses.set_escdelay(1)
curses.curs_set(0) # make the cursor invisible
curses.use_default_colors() # don't change the terminal color
curses.noecho() # don't output characters at the top
draw_menu(MENUS[0]) # draw the main windows
draw_menu(MENUS[1])
update_menu3()
draw_row(curses.newwin(3, curses.COLS, curses.LINES - 3, 0), HELP_TEXT, 1, 2) # and the help window
while SELECTED_MENU:
catch_input(SELECTED_MENU) # if a menu is selected, catch user input
curses.wrapper(main)