Add new key bindings and types annotations
This commit is contained in:
parent
987c24756b
commit
f1be42ed02
18
README.md
18
README.md
|
@ -1,15 +1,19 @@
|
|||
# KLS
|
||||
|
||||
## Description
|
||||
|
||||
`kls` is a cli tool based on `kubectl` for managing kubernetes cluster resources.
|
||||
Inspired by `lf` and `ranger` file managers, written in python.
|
||||
|
||||
It is lightweight (~400 lines of code) and easy to customize.
|
||||
Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #54).
|
||||
Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #64).
|
||||
|
||||
## Key bindings
|
||||
|
||||
### For kubectl
|
||||
You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #10:
|
||||
|
||||
You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #11:
|
||||
|
||||
- `Ctrl+y` - get yaml of resource
|
||||
- `Ctrl+d` - describe resource
|
||||
- `Ctrl+e` - edit resource
|
||||
|
@ -17,8 +21,13 @@ You can customize these bindings or add extra bindings in `KEY_BINDINGS` variabl
|
|||
- `Ctrl+x` - exec into pod
|
||||
- `Ctrl+n` - network debug of pod (with nicolaka/netshoot container attached)
|
||||
- `delete` - delete resource
|
||||
- `Ctrl+a` - access logs of istio sidecar
|
||||
- `Ctrl+p` - exec into istio sidecar
|
||||
- `Ctrl+r` - reveal base64 secret values
|
||||
- `Ctrl+x` - exec into pod
|
||||
|
||||
### Other:
|
||||
|
||||
- `/` - enter filter mode
|
||||
- `Escape` - exit filter mode or `kls` itself
|
||||
- `Backspace` - remove letter from filter
|
||||
|
@ -27,6 +36,7 @@ You can customize these bindings or add extra bindings in `KEY_BINDINGS` variabl
|
|||
![kls in action](./images/kls.gif)
|
||||
|
||||
## Dependencies
|
||||
|
||||
- `python3`
|
||||
- `kubectl`
|
||||
- `bat` - yaml viewer
|
||||
|
@ -34,11 +44,15 @@ You can customize these bindings or add extra bindings in `KEY_BINDINGS` variabl
|
|||
- `yq` - yaml manipulation
|
||||
|
||||
## Installation
|
||||
|
||||
Install `batcat`:
|
||||
|
||||
```
|
||||
sudo apt install bat lnav yq -y
|
||||
```
|
||||
|
||||
Download and install the latest `kls`:
|
||||
|
||||
```
|
||||
curl -O "https://git.digitalstudium.com/digitalstudium/kls/raw/branch/main/kls" && sudo install ./kls /usr/local/bin/ && rm -f ./kls
|
||||
```
|
||||
|
|
289
kls
289
kls
|
@ -1,4 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
from typing import Optional, Callable
|
||||
import subprocess
|
||||
import curses
|
||||
import curses.ascii
|
||||
|
@ -7,7 +8,7 @@ import asyncio
|
|||
# ****************************** #
|
||||
# START OF CONFIGURATION SECTION #
|
||||
# ****************************** #
|
||||
KEY_BINDINGS = { # can be extended
|
||||
KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended
|
||||
"^Y": { # Ctrl + y
|
||||
"description": "view YAML",
|
||||
"command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml'
|
||||
|
@ -36,6 +37,10 @@ KEY_BINDINGS = { # can be extended
|
|||
"description": "delete",
|
||||
"command": 'kubectl -n {namespace} delete {api_resource} {resource}'
|
||||
},
|
||||
"^A": { # Ctrl + a (a means access! :-))
|
||||
"description": "istio-proxy access logs",
|
||||
"command": 'kubectl -n {namespace} logs {resource} -c istio-proxy | lnav'
|
||||
},
|
||||
"^P": { # Ctrl + p (p means proxy! :-))
|
||||
"description": "exec istio-proxy",
|
||||
"command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy bash'
|
||||
|
@ -45,81 +50,92 @@ KEY_BINDINGS = { # can be extended
|
|||
"command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml"
|
||||
}
|
||||
}
|
||||
|
||||
# which api resources are on the top of menu?
|
||||
TOP_API_RESOURCES = ["pods", "services", "configmaps", "secrets", "persistentvolumeclaims", "ingresses", "nodes",
|
||||
"deployments", "statefulsets", "daemonsets", "storageclasses", "serviceentries",
|
||||
"destinationrules", "virtualservices", "gateways", "telemetry"]
|
||||
QUERY_API_RESOURCES = False # Should we merge TOP_API_RESOURCES with all other api resources from cluster?
|
||||
BATCAT_STYLE = " --paging always --style numbers" # style of batcat
|
||||
MOUSE_ENABLED = False
|
||||
TOP_API_RESOURCES: list[str] = [
|
||||
"pods", "services", "configmaps", "secrets", "persistentvolumeclaims",
|
||||
"ingresses", "nodes", "deployments", "statefulsets", "daemonsets",
|
||||
"storageclasses", "serviceentries", "destinationrules",
|
||||
"virtualservices", "gateways", "telemetry"
|
||||
]
|
||||
|
||||
QUERY_API_RESOURCES: bool = False # Should we merge TOP_API_RESOURCES with all other api resources from cluster?
|
||||
BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat
|
||||
MOUSE_ENABLED: bool = False
|
||||
|
||||
|
||||
# **************************** #
|
||||
# END OF CONFIGURATION SECTION #
|
||||
# **************************** #
|
||||
|
||||
# Dynamically generate HELP_TEXT based on KEY_BINDINGS descriptions
|
||||
HELP_TEXT = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
|
||||
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation"
|
||||
SCREEN = curses.initscr() # screen initialization, needed for ROWS_HEIGHT working
|
||||
HEADER_HEIGHT = 4 # in rows
|
||||
FOOTER_HEIGHT = 3
|
||||
ROWS_HEIGHT = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 # maximum number of visible rows indices
|
||||
WIDTH = curses.COLS
|
||||
THIRD_MENU_LOCK = asyncio.Lock()
|
||||
THIRD_MENU_TASK = None # It needs to be global because we should have ability to cancel it from anywhere
|
||||
|
||||
|
||||
# classes
|
||||
class CircularList:
|
||||
def __init__(self, elements):
|
||||
self.elements = elements
|
||||
self.size = len(elements)
|
||||
self.index = 0
|
||||
def __init__(self, elements: list[str]):
|
||||
self.elements: list[str] = elements
|
||||
self.size: int = len(elements)
|
||||
self.index: int = 0
|
||||
|
||||
def __getitem__(self, index):
|
||||
def __getitem__(self, index: slice) -> list[str]:
|
||||
start, stop, step = index.indices(self.size)
|
||||
return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)]
|
||||
|
||||
def shift(self, steps):
|
||||
def shift(self, steps: int) -> None:
|
||||
self.index = (self.index + steps) % self.size
|
||||
|
||||
|
||||
class Menu:
|
||||
def __init__(self, title: str, rows: list, begin_x: int, width: int, rows_height: int):
|
||||
self.title = title
|
||||
self.rows = rows # all rows
|
||||
self.filter = "" # filter for rows
|
||||
self.filter_mode = False # Tracks whether filter mode is active
|
||||
self.filtered_rows = CircularList([x for x in self.rows if self.filter in x]) # filtered rows
|
||||
self.visible_rows = lambda: self.filtered_rows[:rows_height] # visible rows
|
||||
self.visible_row_index = 0 # index of the selected visible row
|
||||
self.selected_row = lambda: self.visible_rows()[self.visible_row_index] if self.visible_rows() else None
|
||||
self.rows_height = rows_height
|
||||
self.width = width
|
||||
self.begin_x = begin_x
|
||||
self.win = curses.newwin(curses.LINES - FOOTER_HEIGHT, width, 0, begin_x)
|
||||
def __init__(self, title: str, rows: list[str], begin_x: int, width: int, rows_height: int):
|
||||
self.title: str = title
|
||||
self.rows: list[str] = rows
|
||||
self.filter: str = ""
|
||||
self.filter_mode: bool = False
|
||||
self.filtered_rows: Circularlist[str] = CircularList([x for x in self.rows if self.filter in x])
|
||||
self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:rows_height]
|
||||
self.visible_row_index: int = 0
|
||||
self.selected_row: Callable[[], Optional[str]] = lambda: self.visible_rows()[
|
||||
self.visible_row_index] if self.visible_rows() else None
|
||||
self.rows_height: int = rows_height
|
||||
self.width: int = width
|
||||
self.begin_x: int = begin_x
|
||||
self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, width, 0, begin_x)
|
||||
|
||||
|
||||
# helper functions
|
||||
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False):
|
||||
# Generate HELP_TEXT from KEY_BINDINGS
|
||||
HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
|
||||
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or exit kls, arrows/TAB/PgUp/PgDn: navigation"
|
||||
|
||||
# Global variables
|
||||
SCREEN: curses.window = curses.initscr()
|
||||
HEADER_HEIGHT: int = 4
|
||||
FOOTER_HEIGHT: int = 3
|
||||
ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3
|
||||
WIDTH: int = curses.COLS
|
||||
THIRD_MENU_LOCK: asyncio.Lock = asyncio.Lock()
|
||||
THIRD_MENU_TASK: Optional[asyncio.Task] = None
|
||||
menus: list[Menu] = []
|
||||
selected_menu: Optional[Menu] = None
|
||||
|
||||
|
||||
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None:
|
||||
window.addstr(y, x, text, curses.A_REVERSE | curses.A_BOLD if selected else curses.A_NORMAL)
|
||||
window.clrtoeol()
|
||||
window.refresh()
|
||||
|
||||
|
||||
def draw_rows(menu: Menu):
|
||||
def draw_rows(menu: Menu) -> None:
|
||||
for index, row in enumerate(menu.visible_rows()):
|
||||
draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected=True if row == menu.selected_row() else False)
|
||||
|
||||
|
||||
def draw_menu(menu: Menu):
|
||||
menu.win.erase() # clear menu window
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=True if menu == selected_menu else False) # draw title
|
||||
draw_rows(menu) # draw menu rows
|
||||
draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2,
|
||||
2) # draw filter row
|
||||
def draw_menu(menu: Menu) -> None:
|
||||
menu.win.erase()
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=True if menu == selected_menu else False)
|
||||
draw_rows(menu)
|
||||
draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2)
|
||||
|
||||
|
||||
async def refresh_third_menu(namespace, api_resource):
|
||||
async def refresh_third_menu(namespace: Optional[str], api_resource: Optional[str]) -> None:
|
||||
global THIRD_MENU_TASK
|
||||
try:
|
||||
async with THIRD_MENU_LOCK:
|
||||
menu = menus[2]
|
||||
|
@ -129,11 +145,11 @@ async def refresh_third_menu(namespace, api_resource):
|
|||
menu.rows = await kubectl_async(
|
||||
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
|
||||
except subprocess.CalledProcessError:
|
||||
menu.rows = [] # Fallback to an empty list if the command fails
|
||||
menu.rows = []
|
||||
else:
|
||||
menu.rows = []
|
||||
index_before_update = menu.filtered_rows.index
|
||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) # update filtered rows
|
||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
|
||||
menu.filtered_rows.index = index_before_update
|
||||
if menu.visible_row_index >= len(menu.visible_rows()):
|
||||
menu.visible_row_index = 0
|
||||
|
@ -143,7 +159,7 @@ async def refresh_third_menu(namespace, api_resource):
|
|||
raise
|
||||
|
||||
|
||||
async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str):
|
||||
async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str) -> None:
|
||||
if not resource:
|
||||
return
|
||||
if key in ("l", "x", "n") and api_resource != "pods":
|
||||
|
@ -157,49 +173,50 @@ async def handle_key_bindings(key: str, namespace: str, api_resource: str, resou
|
|||
except asyncio.CancelledError:
|
||||
pass
|
||||
async with THIRD_MENU_LOCK:
|
||||
curses.def_prog_mode() # save the previous terminal state
|
||||
curses.endwin() # without this, there are problems after exiting vim
|
||||
curses.def_prog_mode()
|
||||
curses.endwin()
|
||||
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
|
||||
if "batcat" in command:
|
||||
command += BATCAT_STYLE
|
||||
await subprocess_call_async(command)
|
||||
curses.reset_prog_mode() # restore the previous terminal state
|
||||
curses.reset_prog_mode()
|
||||
SCREEN.refresh()
|
||||
enable_mouse_support()
|
||||
|
||||
|
||||
async def handle_filter_state(key: str, menu: Menu):
|
||||
if key == "/" and not menu.filter_mode: # Enter filter mode
|
||||
def handle_filter_state(key: str, menu: Menu) -> None:
|
||||
global selected_menu
|
||||
if key == "/" and not menu.filter_mode:
|
||||
menu.filter_mode = True
|
||||
menu.filter = ""
|
||||
elif key == "\x1b": # Escape key
|
||||
if menu.filter_mode: # Exit filter mode
|
||||
if menu.filter_mode:
|
||||
menu.filter_mode = False
|
||||
menu.filter = ""
|
||||
else:
|
||||
globals().update(selected_menu=None) # Exit program
|
||||
elif menu.filter_mode: # Only process filter input in filter mode
|
||||
selected_menu = None
|
||||
elif menu.filter_mode:
|
||||
if key in ["KEY_BACKSPACE", "\x08"] and menu.filter:
|
||||
menu.filter = menu.filter[:-1] # Remove last character
|
||||
elif key.isalnum() or key == "-": # Allow letters, numbers, and dashes
|
||||
menu.filter += key.lower()
|
||||
menu.visible_row_index = 0
|
||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) # update filtered rows
|
||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
|
||||
draw_menu(menu)
|
||||
if menu != menus[2]:
|
||||
menus[2].visible_row_index = 0 # reset the visible row index of third menu before redrawing
|
||||
menus[2].visible_row_index = 0
|
||||
|
||||
|
||||
async def handle_mouse(menu: Menu):
|
||||
def handle_mouse(menu: Menu) -> None:
|
||||
if not MOUSE_ENABLED:
|
||||
return
|
||||
try:
|
||||
mouse_info = curses.getmouse()
|
||||
mouse_info: tuple[int, ...] = curses.getmouse()
|
||||
except curses.error: # this fixes scrolling error
|
||||
return
|
||||
row_number = mouse_info[2] - HEADER_HEIGHT
|
||||
column_number = mouse_info[1]
|
||||
next_menu = None
|
||||
next_menu: Optional[Menu] = None
|
||||
if column_number > (menu.begin_x + menu.width):
|
||||
next_menu = menus[(menus.index(menu) + 1) % 3]
|
||||
if column_number > (next_menu.begin_x + next_menu.width):
|
||||
|
@ -211,30 +228,33 @@ async def handle_mouse(menu: Menu):
|
|||
next_menu = menus[(menus.index(next_menu) - 1) % 3]
|
||||
globals().update(selected_menu=next_menu)
|
||||
if next_menu:
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=False) # remove selection from the current menu title
|
||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) # and select the new menu title
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
||||
menu = next_menu
|
||||
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1) # get char from current mouse position
|
||||
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1)
|
||||
char_str = chr(char_int & 0xFF)
|
||||
if not char_str or ord(char_str) > 127 or ' ' in char_str:
|
||||
return
|
||||
if 0 <= row_number < len(menu.visible_rows()):
|
||||
menu.visible_row_index = row_number
|
||||
draw_rows(menu) # this will change selected row in menu
|
||||
draw_rows(menu)
|
||||
if menu != menus[2]:
|
||||
menus[2].visible_row_index = 0 # reset the selected row index of third menu before redrawing
|
||||
menus[2].visible_row_index = 0
|
||||
|
||||
|
||||
async def handle_vertical_navigation(key: str, menu: Menu):
|
||||
def handle_vertical_navigation(key: str, menu: Menu) -> None:
|
||||
if len(menu.visible_rows()) <= 1:
|
||||
return
|
||||
keys_numbers = {"KEY_DOWN": 1, "KEY_UP": -1, "KEY_NPAGE": 1, "KEY_PPAGE": -1, 'KEY_HOME': 0, 'KEY_END': -1}
|
||||
keys_numbers: dict[str, int] = {
|
||||
"KEY_DOWN": 1, "KEY_UP": -1,
|
||||
"KEY_NPAGE": 1, "KEY_PPAGE": -1,
|
||||
'KEY_HOME': 0, 'KEY_END': -1
|
||||
}
|
||||
if key in ["KEY_DOWN", "KEY_UP"]:
|
||||
if menu.filtered_rows.size > menu.rows_height:
|
||||
menu.filtered_rows.shift(keys_numbers[key])
|
||||
else:
|
||||
menu.visible_row_index = (menu.visible_row_index + keys_numbers[
|
||||
key]) % menu.filtered_rows.size # index of the selected visible row
|
||||
menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size
|
||||
elif key in ["KEY_NPAGE", "KEY_PPAGE"]:
|
||||
menu.filtered_rows.shift(keys_numbers[key] * len(menu.visible_rows()))
|
||||
elif key in ['KEY_HOME', 'KEY_END']:
|
||||
|
@ -244,25 +264,24 @@ async def handle_vertical_navigation(key: str, menu: Menu):
|
|||
menus[2].visible_row_index = 0
|
||||
|
||||
|
||||
async def handle_horizontal_navigation(key: str, menu: Menu):
|
||||
increment = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
||||
def handle_horizontal_navigation(key: str, menu: Menu) -> None:
|
||||
increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
||||
next_menu = menus[(menus.index(menu) + increment) % 3]
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=False) # remove selection from the current menu title
|
||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) # and select the new menu title
|
||||
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
||||
globals().update(selected_menu=next_menu)
|
||||
|
||||
|
||||
async def confirm_action(message: str) -> bool:
|
||||
"""Display a confirmation popup and return True if the user confirms."""
|
||||
rows, cols = SCREEN.getmaxyx() # Get screen size
|
||||
rows, cols = SCREEN.getmaxyx()
|
||||
popup_height = 5
|
||||
popup_width = len(message) + 10
|
||||
start_y = (rows - popup_height) // 2
|
||||
start_x = (cols - popup_width) // 2
|
||||
|
||||
popup = curses.newwin(popup_height, popup_width, start_y, start_x)
|
||||
popup.box() # Draw a border around the popup
|
||||
popup.addstr(2, 2, message) # Display the message
|
||||
popup.box()
|
||||
popup.addstr(2, 2, message)
|
||||
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
|
||||
|
||||
popup.refresh()
|
||||
|
@ -271,8 +290,8 @@ async def confirm_action(message: str) -> bool:
|
|||
if key.lower() == 'y':
|
||||
return True
|
||||
elif key.lower() == 'n':
|
||||
popup.clear() # Clear the popup window
|
||||
popup.refresh() # Refresh the window to hide it
|
||||
popup.clear()
|
||||
popup.refresh()
|
||||
return False
|
||||
|
||||
|
||||
|
@ -280,9 +299,11 @@ async def get_key_async(popup: curses.window) -> str:
|
|||
return await asyncio.to_thread(popup.getkey)
|
||||
|
||||
|
||||
async def kubectl_async(command: str) -> list:
|
||||
async def kubectl_async(command: str) -> list[str]:
|
||||
process = await asyncio.create_subprocess_shell(
|
||||
f"kubectl {command} 2> /dev/null", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||||
f"kubectl {command} 2> /dev/null",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
stdout, stderr = await process.communicate()
|
||||
if stderr:
|
||||
|
@ -290,59 +311,79 @@ async def kubectl_async(command: str) -> list:
|
|||
return stdout.decode().strip().split("\n")
|
||||
|
||||
|
||||
async def catch_input(menu: Menu):
|
||||
global THIRD_MENU_TASK
|
||||
while True: # refresh third menu until key pressed
|
||||
async def catch_input(menu: Menu) -> None:
|
||||
global THIRD_MENU_TASK, selected_menu
|
||||
while True:
|
||||
try:
|
||||
key = await get_key_async(SCREEN)
|
||||
break
|
||||
except curses.error:
|
||||
if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done() or THIRD_MENU_TASK.cancelled():
|
||||
THIRD_MENU_TASK = asyncio.create_task(refresh_third_menu(namespace(), api_resource()))
|
||||
THIRD_MENU_TASK = asyncio.create_task(
|
||||
refresh_third_menu(
|
||||
menus[0].selected_row(),
|
||||
menus[1].selected_row()
|
||||
)
|
||||
)
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
|
||||
await handle_horizontal_navigation(key, menu)
|
||||
handle_horizontal_navigation(key, menu)
|
||||
elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]:
|
||||
if THIRD_MENU_TASK is not None:
|
||||
THIRD_MENU_TASK.cancel()
|
||||
try:
|
||||
# Wait for the THIRD_MENU_TASK to handle cancellation
|
||||
await THIRD_MENU_TASK
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
await handle_vertical_navigation(key, menu)
|
||||
handle_vertical_navigation(key, menu)
|
||||
elif key == "KEY_MOUSE":
|
||||
await handle_mouse(menu)
|
||||
handle_mouse(menu)
|
||||
elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"):
|
||||
await handle_key_bindings(key, namespace(), api_resource(), resource())
|
||||
await handle_key_bindings(
|
||||
key,
|
||||
menus[0].selected_row(),
|
||||
menus[1].selected_row(),
|
||||
menus[2].selected_row() and menus[2].selected_row().split()[0]
|
||||
)
|
||||
elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-":
|
||||
await handle_filter_state(key, menu)
|
||||
handle_filter_state(key, menu)
|
||||
elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS.keys():
|
||||
await handle_key_bindings(curses.ascii.unctrl(key), namespace(), api_resource(), resource())
|
||||
await handle_key_bindings(
|
||||
curses.ascii.unctrl(key),
|
||||
menus[0].selected_row(),
|
||||
menus[1].selected_row(),
|
||||
menus[2].selected_row() and menus[2].selected_row().split()[0]
|
||||
)
|
||||
|
||||
|
||||
async def subprocess_call_async(command: str):
|
||||
async def subprocess_call_async(command: str) -> None:
|
||||
process = await asyncio.create_subprocess_shell(command)
|
||||
await process.communicate()
|
||||
|
||||
|
||||
def enable_mouse_support():
|
||||
def enable_mouse_support() -> None:
|
||||
if MOUSE_ENABLED:
|
||||
curses.mousemask(curses.REPORT_MOUSE_POSITION) # mouse tracking
|
||||
print('\033[?1003h') # enable mouse tracking with the XTERM API. That's the magic
|
||||
curses.mousemask(curses.REPORT_MOUSE_POSITION)
|
||||
print('\033[?1003h')
|
||||
|
||||
|
||||
async def init_menus():
|
||||
global menus, selected_menu, namespace, api_resource, resource
|
||||
api_resources_kubectl = [x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get")]
|
||||
async def init_menus() -> None:
|
||||
global menus, selected_menu
|
||||
api_resources_kubectl: list[str] = [
|
||||
x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get")
|
||||
]
|
||||
api_resources = list(
|
||||
dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)) if QUERY_API_RESOURCES else TOP_API_RESOURCES
|
||||
dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)
|
||||
) if QUERY_API_RESOURCES else TOP_API_RESOURCES
|
||||
|
||||
width_unit = WIDTH // 8
|
||||
namespaces = []
|
||||
namespaces: list[str] = []
|
||||
try:
|
||||
namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'")
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
all_namespaces = await kubectl_async("get ns --no-headers -o custom-columns=NAME:.metadata.name")
|
||||
if all_namespaces:
|
||||
|
@ -353,34 +394,38 @@ async def init_menus():
|
|||
namespaces = all_namespaces
|
||||
except:
|
||||
pass
|
||||
menus = [Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
|
||||
Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
|
||||
Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)]
|
||||
|
||||
menus = [
|
||||
Menu("Namespaces", namespaces, 0, width_unit, ROWS_HEIGHT),
|
||||
Menu("API resources", api_resources, width_unit, width_unit * 2, ROWS_HEIGHT),
|
||||
Menu("Resources", [], width_unit * 3, WIDTH - width_unit * 3, ROWS_HEIGHT)
|
||||
]
|
||||
selected_menu = menus[0]
|
||||
namespace = menus[0].selected_row # method alias
|
||||
api_resource = menus[1].selected_row
|
||||
resource = lambda: menus[2].selected_row().split()[0] if menus[2].selected_row() else None
|
||||
SCREEN.refresh() # I don't know why this is needed but it doesn't work without it
|
||||
SCREEN.nodelay(True) # don't block while waiting for input
|
||||
SCREEN.keypad(True) # needed for arrow keys
|
||||
curses.set_escdelay(1) # reduce Escape delay to 1 ms (curses can't set it to 0)
|
||||
curses.curs_set(0) # make the cursor invisible
|
||||
curses.use_default_colors() # don't change the terminal color
|
||||
curses.noecho() # don't output characters at the top
|
||||
|
||||
SCREEN.refresh()
|
||||
SCREEN.nodelay(True)
|
||||
SCREEN.keypad(True)
|
||||
curses.set_escdelay(1)
|
||||
curses.curs_set(0)
|
||||
curses.use_default_colors()
|
||||
curses.noecho()
|
||||
enable_mouse_support()
|
||||
|
||||
|
||||
async def main_async(screen):
|
||||
async def main_async() -> None:
|
||||
await init_menus()
|
||||
for menu in menus:
|
||||
draw_menu(menu)
|
||||
draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
|
||||
draw_row(
|
||||
curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0),
|
||||
HELP_TEXT, 1, 2
|
||||
)
|
||||
while selected_menu:
|
||||
await catch_input(selected_menu)
|
||||
|
||||
|
||||
def main(screen):
|
||||
asyncio.run(main_async(screen))
|
||||
def main() -> None:
|
||||
asyncio.run(main_async())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in New Issue