248 lines
11 KiB
Python
Executable File
248 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
import curses
|
||
import subprocess
|
||
|
||
screen = None
|
||
# в curses зачем-то сделали задержку на срабатывание Escape, уменьшаем её до 1 милисекунды (до 0 нельзя)
|
||
curses.set_escdelay(1)
|
||
running = True
|
||
|
||
# состояния меню
|
||
SELECTED_WITHOUT_SEARCH = 1
|
||
SELECTED_WITH_SEARCH = 2
|
||
NOT_SELECTED_WITHOUT_SEARCH = 3
|
||
NOT_SELECTED_WITH_SEARCH = 4
|
||
|
||
|
||
# я не знаю, что делается в этой функции.
|
||
def init_screen():
|
||
global screen
|
||
screen = curses.initscr()
|
||
screen.refresh()
|
||
curses.noecho()
|
||
curses.cbreak()
|
||
screen.keypad(True)
|
||
curses.curs_set(0)
|
||
|
||
|
||
init_screen()
|
||
|
||
|
||
class Menu:
|
||
def __init__(self, name, rows, begin_x, state):
|
||
self.state = state
|
||
self.name = name # заголовок окна
|
||
self.rows = rows # строки окна
|
||
self.begin_x = begin_x # где начинается окно по х?
|
||
self.win = curses.newwin(curses.LINES, curses.COLS // 3, 0,
|
||
begin_x) # окно с высотой во весь экран, шириной экран / 3, и началом по х в точке begin_x
|
||
self.selected_row = 0 # выбранная строка
|
||
self.filter = ""
|
||
|
||
|
||
# рисуем первое меню
|
||
## готовим контент
|
||
bytes_list = subprocess.check_output(
|
||
"kubectl get ns --no-headers -o template='{{range .items}}{{.metadata.name}} {{end}}'", shell=True).split()
|
||
namespaces = [bytes_list[i].decode('utf-8') for i in range(len(bytes_list))]
|
||
## отрисовываем меню
|
||
menu1 = Menu("Namespaces", namespaces, 0, SELECTED_WITHOUT_SEARCH)
|
||
|
||
# рисуем второе меню
|
||
## готовим контент
|
||
api_resources = ["pods", "services", "deployments", "statefulsets", "ingresses", "configmaps", "secrets"]
|
||
## отрисовываем меню
|
||
menu2 = Menu("API resources", api_resources, 0 + curses.COLS // 3, NOT_SELECTED_WITHOUT_SEARCH)
|
||
|
||
# рисуем третье меню
|
||
## готовим контент
|
||
bytes_list = subprocess.check_output(
|
||
"kubectl get pods -n kube-system --no-headers -o template='{{range .items}}{{.metadata.name}} {{end}}'",
|
||
shell=True).split()
|
||
pods = [bytes_list[i].decode('utf-8') for i in range(len(bytes_list))]
|
||
## отрисовываем меню
|
||
menu3 = Menu("Resources", pods, 0 + curses.COLS // 3 * 2, NOT_SELECTED_WITHOUT_SEARCH)
|
||
|
||
menus = [menu1, menu2, menu3]
|
||
|
||
|
||
def run_command(command, namespace, api_resource, resource):
|
||
curses.def_shell_mode()
|
||
curses.endwin()
|
||
subprocess.call(eval(command), shell=True)
|
||
curses.reset_shell_mode()
|
||
|
||
|
||
def update_menu3_object():
|
||
menu1_filtered_rows = list(filter(lambda x: (x.startswith(menu1.filter)), menu1.rows)) # фильтруем строки
|
||
menu2_filtered_rows = list(filter(lambda x: (x.startswith(menu2.filter)), menu2.rows)) # фильтруем строки
|
||
if not menu1_filtered_rows or not menu2_filtered_rows:
|
||
resources = [f"No resources matched criteria.", ]
|
||
else:
|
||
namespace = menu1_filtered_rows[menu1.selected_row]
|
||
api_resource = menu2_filtered_rows[menu2.selected_row]
|
||
command = "f'kubectl get {api_resource} -n {namespace} --no-headers -o template=\"{{{{range .items}}}}{{{{.metadata.name}}}} {{{{end}}}}\"'"
|
||
bytes_list = subprocess.check_output(eval(command), shell=True).split()
|
||
resources = [bytes_list[i].decode('utf-8') for i in range(len(bytes_list))]
|
||
if not resources:
|
||
resources = [f"No resources found in {namespace} namespace.", ]
|
||
menu3.rows = resources
|
||
menu3.selected_row = 0
|
||
|
||
|
||
def draw_header(menu):
|
||
if menu.state in [1, 2]:
|
||
menu.win.addstr(1, 2, menu.name, curses.A_REVERSE | curses.A_ITALIC)
|
||
else:
|
||
menu.win.addstr(1, 2, menu.name)
|
||
menu.win.refresh() # обновляем окно
|
||
|
||
|
||
def draw_rows(menu):
|
||
# какие строки сейчас в меню, учитывая фильтр?
|
||
filtered_rows = list(filter(lambda x: (x.startswith(menu.filter)), menu.rows))
|
||
# если строк нет, рисовать их не нужно
|
||
if not filtered_rows:
|
||
return
|
||
for index, row in enumerate(filtered_rows): # рисуем то, что отфильтровали
|
||
menu.win.addstr(index + 3, 2, row)
|
||
# выделяем выбранную строку
|
||
menu.win.addstr(3, 2, filtered_rows[menu.selected_row], curses.A_REVERSE | curses.A_ITALIC)
|
||
menu.win.box()
|
||
menu.win.refresh()
|
||
|
||
|
||
def draw_search_box(menu):
|
||
# рисуем строку поиска
|
||
if menu.state in [SELECTED_WITH_SEARCH, NOT_SELECTED_WITH_SEARCH]:
|
||
content = f"/{menu.filter}"
|
||
else:
|
||
content = "Press / for search"
|
||
menu.win.addstr(curses.LINES - 2, 2, content) # рисуем контент
|
||
menu.win.clrtoeol() # очищаем остальную часть строки
|
||
menu.win.box() # рисуем рамку
|
||
menu.win.refresh() # обновляем окно
|
||
|
||
|
||
def draw_menu(menu):
|
||
menu.win.clear() # очищаем окно меню
|
||
draw_header(menu) # рисуем заголовок
|
||
draw_rows(menu) # рисуем строки меню
|
||
draw_search_box(menu) # рисуем строку поиска
|
||
|
||
|
||
def catch_input(menu):
|
||
global running
|
||
key_pressed = screen.getkey()
|
||
if key_pressed == "/":
|
||
menu.state = SELECTED_WITH_SEARCH
|
||
draw_search_box(menu)
|
||
elif key_pressed == "q" and menu.state == SELECTED_WITHOUT_SEARCH:
|
||
running = False
|
||
elif key_pressed == "\x1b" and menu.state == SELECTED_WITH_SEARCH: # Escape disables search mode
|
||
menu.filter = ""
|
||
menu.state = SELECTED_WITHOUT_SEARCH
|
||
draw_menu(menu)
|
||
elif key_pressed == "KEY_BACKSPACE" and menu.filter:
|
||
menu.filter = menu.filter[:-1] # удаляем символ из строки поиска
|
||
draw_menu(menu)
|
||
elif key_pressed == "KEY_BACKSPACE" and not menu.filter:
|
||
menu.state = SELECTED_WITHOUT_SEARCH
|
||
draw_search_box(menu)
|
||
elif key_pressed == '\t' or key_pressed == "KEY_RIGHT":
|
||
navigate_horizontally("right", menu)
|
||
elif key_pressed == "KEY_BTAB" or key_pressed == "KEY_LEFT":
|
||
navigate_horizontally("left", menu)
|
||
elif key_pressed == "KEY_DOWN":
|
||
navigate_vertically("down", menu)
|
||
elif key_pressed == "KEY_UP":
|
||
navigate_vertically("up", menu)
|
||
elif key_pressed in ["KEY_F(1)", "KEY_F(2)", "KEY_F(3)", "KEY_F(4)"] and menu3.rows and not menu3.rows[menu3.selected_row].startswith("No resources"):
|
||
menu3_filtered_rows = list(filter(lambda x: (x.startswith(menu3.filter)), menu3.rows)) # фильтруем строки меню 3
|
||
if not menu3_filtered_rows:
|
||
return
|
||
menu1_filtered_rows = list(filter(lambda x: (x.startswith(menu1.filter)), menu1.rows)) # фильтруем строки
|
||
menu2_filtered_rows = list(filter(lambda x: (x.startswith(menu2.filter)), menu2.rows)) # фильтруем строки
|
||
namespace = menu1_filtered_rows[menu1.selected_row]
|
||
api_resource = menu2_filtered_rows[menu2.selected_row]
|
||
resource = menu3.rows[menu3.selected_row]
|
||
if key_pressed == "KEY_F(1)": # get
|
||
command = f"f'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml --paging always --style numbers'"
|
||
elif key_pressed == "KEY_F(2)": # describe
|
||
command = f"f'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml --paging always --style numbers'"
|
||
elif key_pressed == "KEY_F(3)": # edit
|
||
command = f"f'kubectl edit {api_resource} -n {namespace} {resource}'"
|
||
elif key_pressed == "KEY_F(4)" and api_resource != "pods":
|
||
return
|
||
elif key_pressed == "KEY_F(4)" and api_resource == "pods": # logs
|
||
command = f"f'kubectl -n {namespace} logs {resource} | batcat -l log --paging always --style numbers'"
|
||
# raise ValueError(str(menu.state) + ' ' + eval(command))
|
||
run_command(command, namespace, api_resource, resource)
|
||
for menu in menus:
|
||
draw_menu(menu)
|
||
elif (key_pressed.isalpha() or key_pressed == "-") and menu.state == SELECTED_WITH_SEARCH: # объекты в кубе не могут иметь иных символов кроме a-z и -
|
||
menu.filter += key_pressed
|
||
menu.selected_row = 0
|
||
draw_menu(menu)
|
||
if menu != menu3 and not (key_pressed in ["KEY_RIGHT", "KEY_LEFT", "\t", "KEY_BTAB", "/"]):
|
||
update_menu3_object()
|
||
draw_menu(menu3)
|
||
|
||
|
||
def navigate_horizontally(direction, menu):
|
||
increment = {"right": 1, "left": -1}
|
||
# чтобы понять, какой порядковый номер у следующего/предыдущего меню, нужно сперва определить номер текущего меню
|
||
menu_index = {menu1: 0, menu2: 1, menu3: 2}
|
||
next_menu = menus[(menu_index[menu] + increment[direction]) % 3]
|
||
if menu.filter:
|
||
menu.state = NOT_SELECTED_WITH_SEARCH
|
||
else:
|
||
menu.state = NOT_SELECTED_WITHOUT_SEARCH
|
||
if next_menu.filter:
|
||
next_menu.state = SELECTED_WITH_SEARCH
|
||
else:
|
||
next_menu.state = SELECTED_WITHOUT_SEARCH
|
||
draw_header(menu) # убираем выделение с текущего меню
|
||
draw_header(next_menu) # выделяем заголовок следующего меню
|
||
|
||
|
||
def navigate_vertically(direction, menu):
|
||
# какие строки сейчас в меню, учитывая фильтр?
|
||
filtered_rows = list(filter(lambda x: (x.startswith(menu.filter)), menu.rows))
|
||
# если строк нет или строка одна, навигация не нужна
|
||
if not filtered_rows or len(filtered_rows) == 1:
|
||
return
|
||
increment = {"down": 1, "up": -1}
|
||
menu.win.addstr(menu.selected_row + 3, 2, filtered_rows[menu.selected_row]) # удаляем выделение с текущей строки
|
||
menu.selected_row = (menu.selected_row + increment[direction]) % len(filtered_rows) # переходим к предыдущей/следующей строке
|
||
menu.win.addstr(menu.selected_row + 3, 2, filtered_rows[menu.selected_row], curses.A_REVERSE | curses.A_ITALIC) # и выделяем её
|
||
menu.win.refresh()
|
||
|
||
|
||
def main(screen):
|
||
global running
|
||
screen.refresh()
|
||
# рисуем начальный экран
|
||
for menu in menus:
|
||
draw_menu(menu)
|
||
while running:
|
||
### выбрано первое меню ###
|
||
if menu1.state in [1, 2]:
|
||
catch_input(menu1) # перехватываем нажатия клавиш первого меню
|
||
### выбрано второе меню ###
|
||
elif menu2.state in [1, 2]:
|
||
catch_input(menu2) # перехватываем нажатия клавиш второго меню
|
||
### выбрано третье меню ###
|
||
elif menu3.state in [1, 2]:
|
||
catch_input(menu3) # перехватываем нажатия клавиш третьего меню
|
||
|
||
|
||
main(screen)
|
||
# curses.wrapper(main)
|
||
|
||
curses.nocbreak()
|
||
screen.keypad(False)
|
||
curses.echo()
|
||
curses.endwin()
|
||
subprocess.call(["clear"])
|