Add refreshing of third menu
This commit is contained in:
parent
dbfe3ed84e
commit
a1e360a801
40
kls
40
kls
|
@ -1,5 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import subprocess, curses
|
import subprocess
|
||||||
|
import curses
|
||||||
|
import asyncio
|
||||||
|
|
||||||
KEY_BINDINGS = { # can be extended
|
KEY_BINDINGS = { # can be extended
|
||||||
"1": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml --paging always --style numbers',
|
"1": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml --paging always --style numbers',
|
||||||
|
@ -9,9 +11,9 @@ KEY_BINDINGS = { # can be extended
|
||||||
"5": 'kubectl -n {namespace} exec -it {resource} sh',
|
"5": 'kubectl -n {namespace} exec -it {resource} sh',
|
||||||
"KEY_DC": 'kubectl -n {namespace} delete {api_resource} {resource}' # KEY_DC is the delete key
|
"KEY_DC": 'kubectl -n {namespace} delete {api_resource} {resource}' # KEY_DC is the delete key
|
||||||
}
|
}
|
||||||
|
# which api resources are on the top of menu?
|
||||||
TOP_API_RESOURCES = ["pods", "services", "ingresses", "secrets", "nodes", "deployments", "statefulsets", "daemonsets",
|
TOP_API_RESOURCES = ["pods", "services", "ingresses", "secrets", "nodes", "deployments", "statefulsets", "daemonsets",
|
||||||
"configmaps", "persistentvolumeclaims", "storageclasses"] # which api resources are on the top of menu?
|
"configmaps", "persistentvolumeclaims", "storageclasses"]
|
||||||
|
|
||||||
HELP_TEXT = "Esc: exit filter mode or exit kls, 1: get yaml, 2: describe, 3: edit, 4: pod logs, arrows/TAB: navigation"
|
HELP_TEXT = "Esc: exit filter mode or exit kls, 1: get yaml, 2: describe, 3: edit, 4: pod logs, arrows/TAB: navigation"
|
||||||
|
|
||||||
|
@ -29,7 +31,8 @@ class Menu:
|
||||||
self.__start_index = lambda: 0 if self.filtered_row_index < HEIGHT else self.filtered_row_index - HEIGHT + 1
|
self.__start_index = lambda: 0 if self.filtered_row_index < HEIGHT else self.filtered_row_index - HEIGHT + 1
|
||||||
self.visible_rows = lambda: self.filtered_rows()[self.__start_index():][:HEIGHT] # visible rows
|
self.visible_rows = lambda: self.filtered_rows()[self.__start_index():][:HEIGHT] # visible rows
|
||||||
self.__visible_row_index = lambda: self.filtered_row_index - self.__start_index() # index of the selected visible row
|
self.__visible_row_index = lambda: self.filtered_row_index - self.__start_index() # index of the selected visible row
|
||||||
self.selected_row = lambda: self.visible_rows()[self.__visible_row_index()] if self.visible_rows() else None # selected row from visible rows
|
# selected row from visible rows
|
||||||
|
self.selected_row = lambda: self.visible_rows()[self.__visible_row_index()] if self.visible_rows() else None
|
||||||
self.win = curses.newwin(curses.LINES - 3, width, 0, begin_x)
|
self.win = curses.newwin(curses.LINES - 3, width, 0, begin_x)
|
||||||
|
|
||||||
|
|
||||||
|
@ -51,15 +54,6 @@ def draw_menu(menu: Menu):
|
||||||
draw_row(menu.win, f"/{menu.filter}" if menu.filter else "", curses.LINES - 5, 2) # draw filter row
|
draw_row(menu.win, f"/{menu.filter}" if menu.filter else "", curses.LINES - 5, 2) # draw filter row
|
||||||
|
|
||||||
|
|
||||||
def update_menu3():
|
|
||||||
MENUS[2].rows = []
|
|
||||||
if namespace() and api_resource():
|
|
||||||
command = f"-n {namespace()} get {api_resource()} --no-headers"
|
|
||||||
MENUS[2].rows = kubectl(command)
|
|
||||||
MENUS[2].filtered_row_index = 0 # reset the selected row index before redrawing
|
|
||||||
draw_menu(MENUS[2])
|
|
||||||
|
|
||||||
|
|
||||||
def run_command(key: str):
|
def run_command(key: str):
|
||||||
if not (key == "4" and api_resource() != "pods") and not (key == "5" and api_resource() != "pods"):
|
if not (key == "4" and api_resource() != "pods") and not (key == "5" and api_resource() != "pods"):
|
||||||
curses.def_prog_mode() # save the previous terminal state
|
curses.def_prog_mode() # save the previous terminal state
|
||||||
|
@ -82,11 +76,21 @@ def handle_filter_state(key: str, menu: Menu):
|
||||||
menu.filtered_row_index = 0
|
menu.filtered_row_index = 0
|
||||||
draw_menu(menu)
|
draw_menu(menu)
|
||||||
if menu != MENUS[2]:
|
if menu != MENUS[2]:
|
||||||
update_menu3() # redraw the third menu rows if we redraw the first or second menu rows
|
MENUS[2].filtered_row_index = 0 # reset the selected row index of third menu before redrawing
|
||||||
|
|
||||||
|
|
||||||
def catch_input(menu: Menu):
|
async def catch_input(menu: Menu):
|
||||||
|
while True: # this while loop is needed for constant refreshing of third menu containing all resources
|
||||||
|
try:
|
||||||
key = SCREEN.getkey()
|
key = SCREEN.getkey()
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
MENUS[2].rows = []
|
||||||
|
if namespace() and api_resource():
|
||||||
|
command = f"-n {namespace()} get {api_resource()} --no-headers"
|
||||||
|
MENUS[2].rows = kubectl(command)
|
||||||
|
draw_menu(MENUS[2])
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
|
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
|
||||||
increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
||||||
next_menu = MENUS[(MENUS.index(menu) + increment) % 3]
|
next_menu = MENUS[(MENUS.index(menu) + increment) % 3]
|
||||||
|
@ -99,7 +103,7 @@ def catch_input(menu: Menu):
|
||||||
menu.filtered_row_index = (menu.filtered_row_index + increment) % len(menu.filtered_rows())
|
menu.filtered_row_index = (menu.filtered_row_index + increment) % len(menu.filtered_rows())
|
||||||
draw_rows(menu.visible_rows(), menu) # redraw the menu
|
draw_rows(menu.visible_rows(), menu) # redraw the menu
|
||||||
if menu != MENUS[2]:
|
if menu != MENUS[2]:
|
||||||
update_menu3() # redraw the third menu rows if we redraw the first or second menu rows
|
MENUS[2].filtered_row_index = 0 # reset the selected row index of third menu before redrawing
|
||||||
elif key in KEY_BINDINGS.keys() and MENUS[2].selected_row():
|
elif key in KEY_BINDINGS.keys() and MENUS[2].selected_row():
|
||||||
run_command(key)
|
run_command(key)
|
||||||
elif key == "\x1b" and not menu.filter:
|
elif key == "\x1b" and not menu.filter:
|
||||||
|
@ -128,6 +132,7 @@ resource = lambda: MENUS[2].selected_row().split()[0]
|
||||||
|
|
||||||
def main(screen):
|
def main(screen):
|
||||||
SCREEN.refresh() # I don't know why this is needed but it doesn't work without it
|
SCREEN.refresh() # I don't know why this is needed but it doesn't work without it
|
||||||
|
SCREEN.nodelay(True) # don't wait for input
|
||||||
SCREEN.keypad(True) # needed for arrow keys
|
SCREEN.keypad(True) # needed for arrow keys
|
||||||
curses.set_escdelay(1) # reduce Escape delay to 1 ms (curses can't set it to 0)
|
curses.set_escdelay(1) # reduce Escape delay to 1 ms (curses can't set it to 0)
|
||||||
curses.curs_set(0) # make the cursor invisible
|
curses.curs_set(0) # make the cursor invisible
|
||||||
|
@ -135,10 +140,9 @@ def main(screen):
|
||||||
curses.noecho() # don't output characters at the top
|
curses.noecho() # don't output characters at the top
|
||||||
draw_menu(MENUS[0]) # draw the main windows
|
draw_menu(MENUS[0]) # draw the main windows
|
||||||
draw_menu(MENUS[1])
|
draw_menu(MENUS[1])
|
||||||
update_menu3()
|
|
||||||
draw_row(curses.newwin(3, curses.COLS, curses.LINES - 3, 0), HELP_TEXT, 1, 2) # and the help window
|
draw_row(curses.newwin(3, curses.COLS, curses.LINES - 3, 0), HELP_TEXT, 1, 2) # and the help window
|
||||||
while SELECTED_MENU:
|
while SELECTED_MENU:
|
||||||
catch_input(SELECTED_MENU) # if a menu is selected, catch user input
|
asyncio.run(catch_input(SELECTED_MENU)) # if a menu is selected, catch user input
|
||||||
|
|
||||||
|
|
||||||
curses.wrapper(main)
|
curses.wrapper(main)
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
import curses
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
def inactive_function():
|
||||||
|
# This function will be executed while the screen is inactive
|
||||||
|
print("Screen is inactive, doing some work...")
|
||||||
|
# Do some work here, e.g., update a database, send a notification, etc.
|
||||||
|
time.sleep(5) # simulate some work being done
|
||||||
|
print("Work done!")
|
||||||
|
|
||||||
|
def main(screen):
|
||||||
|
screen.nodelay(True) # don't wait for input
|
||||||
|
threading.Thread(target=inactive_function).start() # start the inactive function in a separate thread
|
||||||
|
while True:
|
||||||
|
screen.clear()
|
||||||
|
screen.addstr(0, 0, f"{time.time()}")
|
||||||
|
time.sleep(1)
|
||||||
|
screen.refresh()
|
||||||
|
if screen.getch() != -1: # if there's input, exit
|
||||||
|
break
|
||||||
|
|
||||||
|
curses.wrapper(main)
|
Loading…
Reference in New Issue