Make it async

This commit is contained in:
Digital Studium 2024-12-17 21:52:47 +03:00
parent 7b74528cca
commit dfd8a2edf6
1 changed files with 58 additions and 41 deletions

99
kls
View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import subprocess, curses, time import subprocess
import curses
import curses.ascii import curses.ascii
import asyncio
KEY_BINDINGS = { # can be extended KEY_BINDINGS = { # can be extended
"^Y": { "^Y": {
@ -36,7 +38,8 @@ KEY_BINDINGS = { # can be extended
BATCAT_STYLE = " --paging always --style numbers" BATCAT_STYLE = " --paging always --style numbers"
# which api resources are on the top of menu? # which api resources are on the top of menu?
TOP_API_RESOURCES = ["pods", "services", "configmaps", "secrets", "persistentvolumeclaims", "ingresses", "nodes", TOP_API_RESOURCES = ["pods", "services", "configmaps", "secrets", "persistentvolumeclaims", "ingresses", "nodes",
"deployments", "statefulsets", "daemonsets", "storageclasses"] "deployments", "statefulsets", "daemonsets", "storageclasses", "serviceentries",
"destinationrules", "virtualservices", "gateways"]
# Dynamically generate HELP_TEXT based on KEY_BINDINGS descriptions # Dynamically generate HELP_TEXT based on KEY_BINDINGS descriptions
HELP_TEXT = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items()) HELP_TEXT = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation" HELP_TEXT += ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation"
@ -47,6 +50,7 @@ HEADER_HEIGHT = 4 # in rows
FOOTER_HEIGHT = 3 FOOTER_HEIGHT = 3
ROWS_HEIGHT = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 # maximum number of visible rows indices ROWS_HEIGHT = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 # maximum number of visible rows indices
WIDTH = curses.COLS WIDTH = curses.COLS
QUERY_API_RESOURCES = False
# classes # classes
@ -100,12 +104,12 @@ def draw_menu(menu: Menu):
2) # draw filter row 2) # draw filter row
def refresh_third_menu(namespace, api_resource): async def refresh_third_menu(namespace, api_resource):
menu = menus[2] menu = menus[2]
previous_menu_rows = menu.rows previous_menu_rows = menu.rows
if api_resource and namespace: if api_resource and namespace:
try: try:
menu.rows = kubectl(f"-n {namespace} get {api_resource} --no-headers --ignore-not-found") menu.rows = await kubectl_async(f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
menu.rows = [] # Fallback to an empty list if the command fails menu.rows = [] # Fallback to an empty list if the command fails
else: else:
@ -119,7 +123,7 @@ def refresh_third_menu(namespace, api_resource):
draw_menu(menu) draw_menu(menu)
def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str): async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str):
if not resource: if not resource:
return return
if key in ("l", "x", "n") and api_resource != "pods" and not resource.startswith("pod/"): if key in ("l", "x", "n") and api_resource != "pods" and not resource.startswith("pod/"):
@ -129,13 +133,13 @@ def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: s
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
if "batcat" in command: if "batcat" in command:
command += BATCAT_STYLE command += BATCAT_STYLE
subprocess.call(command, shell=True) await subprocess_call_async(command)
curses.reset_prog_mode() # restore the previous terminal state curses.reset_prog_mode() # restore the previous terminal state
SCREEN.refresh() SCREEN.refresh()
enable_mouse_support() enable_mouse_support()
def handle_filter_state(key: str, menu: Menu): async def handle_filter_state(key: str, menu: Menu):
if key == "/" and not menu.filter_mode: # Enter filter mode if key == "/" and not menu.filter_mode: # Enter filter mode
menu.filter_mode = True menu.filter_mode = True
menu.filter = "" menu.filter = ""
@ -157,7 +161,7 @@ def handle_filter_state(key: str, menu: Menu):
menus[2].visible_row_index = 0 # reset the visible row index of third menu before redrawing menus[2].visible_row_index = 0 # reset the visible row index of third menu before redrawing
def handle_mouse(menu: Menu): async def handle_mouse(menu: Menu):
if not MOUSE_ENABLED: if not MOUSE_ENABLED:
return return
try: try:
@ -192,7 +196,7 @@ def handle_mouse(menu: Menu):
menus[2].visible_row_index = 0 # reset the selected row index of third menu before redrawing menus[2].visible_row_index = 0 # reset the selected row index of third menu before redrawing
def handle_vertical_navigation(key: str, menu: Menu): async def handle_vertical_navigation(key: str, menu: Menu):
if len(menu.visible_rows()) <= 1: if len(menu.visible_rows()) <= 1:
return return
keys_numbers = {"KEY_DOWN": 1, "KEY_UP": -1, "KEY_NPAGE": 1, "KEY_PPAGE": -1, 'KEY_HOME': 0, 'KEY_END': -1} keys_numbers = {"KEY_DOWN": 1, "KEY_UP": -1, "KEY_NPAGE": 1, "KEY_PPAGE": -1, 'KEY_HOME': 0, 'KEY_END': -1}
@ -211,7 +215,7 @@ def handle_vertical_navigation(key: str, menu: Menu):
menus[2].visible_row_index = 0 menus[2].visible_row_index = 0
def handle_horizontal_navigation(key: str, menu: Menu): async def handle_horizontal_navigation(key: str, menu: Menu):
increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key] increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
next_menu = menus[(menus.index(menu) + increment) % 3] next_menu = menus[(menus.index(menu) + increment) % 3]
draw_row(menu.win, menu.title, 1, 2, selected=False) # remove selection from the current menu title draw_row(menu.win, menu.title, 1, 2, selected=False) # remove selection from the current menu title
@ -219,7 +223,7 @@ def handle_horizontal_navigation(key: str, menu: Menu):
globals().update(selected_menu=next_menu) globals().update(selected_menu=next_menu)
def confirm_action(message: str) -> bool: async def confirm_action(message: str) -> bool:
"""Display a confirmation popup and return True if the user confirms.""" """Display a confirmation popup and return True if the user confirms."""
rows, cols = SCREEN.getmaxyx() # Get screen size rows, cols = SCREEN.getmaxyx() # Get screen size
popup_height = 5 popup_height = 5
@ -227,48 +231,59 @@ def confirm_action(message: str) -> bool:
start_y = (rows - popup_height) // 2 start_y = (rows - popup_height) // 2
start_x = (cols - popup_width) // 2 start_x = (cols - popup_width) // 2
# Create a new window for the popup
popup = curses.newwin(popup_height, popup_width, start_y, start_x) popup = curses.newwin(popup_height, popup_width, start_y, start_x)
popup.box() # Draw a border around the popup popup.box() # Draw a border around the popup
popup.addstr(2, 2, message) # Display the message popup.addstr(2, 2, message) # Display the message
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel") popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
# Refresh the popup window and wait for input
popup.refresh() popup.refresh()
while True: while True:
key = popup.getkey() # Wait for a key press key = await get_key_async(popup)
if key.lower() == 'y': if key.lower() == 'y':
return True return True
elif key.lower() == 'n': elif key.lower() == 'n':
return False return False
async def get_key_async(popup: curses.window) -> str:
return await asyncio.to_thread(popup.getkey)
def catch_input(menu: Menu):
async def catch_input(menu: Menu):
while True: # refresh third menu until key pressed while True: # refresh third menu until key pressed
try: try:
key = SCREEN.getkey() key = await get_key_async(SCREEN)
break break
except curses.error: except curses.error:
refresh_third_menu(namespace(), api_resource()) await refresh_third_menu(namespace(), api_resource())
time.sleep(0.1) await asyncio.sleep(0.1)
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
handle_horizontal_navigation(key, menu) await handle_horizontal_navigation(key, menu)
elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]: elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]:
handle_vertical_navigation(key, menu) await handle_vertical_navigation(key, menu)
elif key == "KEY_MOUSE": elif key == "KEY_MOUSE":
handle_mouse(menu) await handle_mouse(menu)
elif key == "KEY_DC" and confirm_action("Are you sure you want to delete this resource?"): elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"):
handle_key_bindings(key, namespace(), api_resource(), resource()) await handle_key_bindings(key, namespace(), api_resource(), resource())
elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS.keys(): elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS.keys():
handle_key_bindings(curses.ascii.unctrl(key), namespace(), api_resource(), resource()) await handle_key_bindings(curses.ascii.unctrl(key), namespace(), api_resource(), resource())
elif key in ["/", "\x1b", "KEY_BACKSPACE", elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-":
"\x08"] or key.isalnum() or key == "-": # \x1b - escape, \x08 - backspace await handle_filter_state(key, menu)
handle_filter_state(key, menu)
def kubectl(command: str) -> list: async def kubectl_async(command: str) -> list:
return subprocess.check_output(f"kubectl {command} 2> /dev/null", shell=True).decode().strip().split("\n") result = await asyncio.create_subprocess_shell(
f"kubectl {command} 2> /dev/null", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
if stderr:
raise subprocess.CalledProcessError(result.returncode, command, stderr=stderr)
return stdout.decode().strip().split("\n")
async def subprocess_call_async(command: str):
process = await asyncio.create_subprocess_shell(command)
await process.communicate()
def enable_mouse_support(): def enable_mouse_support():
@ -277,17 +292,16 @@ def enable_mouse_support():
print('\033[?1003h') # enable mouse tracking with the XTERM API. That's the magic print('\033[?1003h') # enable mouse tracking with the XTERM API. That's the magic
def init_menus(): async def init_menus():
global menus, selected_menu, namespace, api_resource, resource global menus, selected_menu, namespace, api_resource, resource
api_resources_kubectl = [x.split()[0] for x in kubectl("api-resources --no-headers --verbs=get")] api_resources_kubectl = [x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get")]
api_resources = list( api_resources = list(dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)) if QUERY_API_RESOURCES else TOP_API_RESOURCES
dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)) # so top api resources are at the top
width_unit = WIDTH // 8 width_unit = WIDTH // 8
try: try:
namespaces = kubectl("get ns --no-headers -o custom-columns=NAME:.metadata.name") namespaces = await kubectl_async("get ns --no-headers -o custom-columns=NAME:.metadata.name")
except: except:
namespaces = kubectl( namespaces = await kubectl_async(
"config view --minify --output 'jsonpath={..namespace}'") # if it is forbidden to list all namespaces "config view --minify --output 'jsonpath={..namespace}'")
menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT), menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT), Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)] Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)]
@ -305,13 +319,16 @@ def init_menus():
enable_mouse_support() enable_mouse_support()
def main(screen): async def main_async(screen):
init_menus() await init_menus()
for menu in menus: # draw the main windows for menu in menus:
draw_menu(menu) draw_menu(menu)
draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2) # and the help window draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
while selected_menu: while selected_menu:
catch_input(selected_menu) # if a menu is selected, catch user input await catch_input(selected_menu)
def main(screen):
asyncio.run(main_async(screen))
if __name__ == "__main__": if __name__ == "__main__":