Reformat again

This commit is contained in:
Digital Studium 2025-01-01 13:04:10 +03:00
parent 4db4959369
commit fad8d1d04b
1 changed files with 89 additions and 94 deletions

183
kls
View File

@ -79,7 +79,7 @@ FOOTER_HEIGHT: int = 3
ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3
# Generate HELP_TEXT from KEY_BINDINGS # Generate HELP_TEXT from KEY_BINDINGS
HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items()) HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB: navigation" HELP_TEXT += ", /: filter mode, Esc: exit filter mode, arrows/TAB: navigation, q: exit kls"
# **************************** # # **************************** #
@ -103,8 +103,8 @@ class CircularList:
class MenuState(Enum): class MenuState(Enum):
NORMAL = auto() NORMAL = auto()
EMPTY_FILTER = auto() FILTER_MODE = auto()
FILLED_FILTER = auto() FILTER_MODE_WITH_FILTER = auto()
class Menu: class Menu:
@ -113,12 +113,13 @@ class Menu:
def __init__( def __init__(
self, self,
title: str, title: str,
rows: list[str], rows_function,
begin_x: int, begin_x: int,
width: int, width: int,
): ):
self.title: str = title self.title: str = title
self.rows: list[str] = rows self.rows: list[str] = []
self.rows_function = rows_function
self.filter: str = "" self.filter: str = ""
self.state: MenuState = MenuState.NORMAL self.state: MenuState = MenuState.NORMAL
self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x]) self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x])
@ -132,6 +133,9 @@ class Menu:
self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x) self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x)
self.dependent_menus: list[Self] = [] self.dependent_menus: list[Self] = []
async def refresh_rows(self):
self.rows = await self.rows_function()
def refresh_filtered_rows(self): def refresh_filtered_rows(self):
self.filtered_rows = CircularList([x for x in self.rows if self.filter in x]) self.filtered_rows = CircularList([x for x in self.rows if self.filter in x])
@ -141,15 +145,13 @@ class Menu:
match self.state: match self.state:
case MenuState.NORMAL: case MenuState.NORMAL:
self.filter = "" self.filter = ""
self.draw_menu_or_footer("") await self.draw_menu_or_footer("")
await self.refresh_dependent_menus() case MenuState.FILTER_MODE:
case MenuState.EMPTY_FILTER:
self.filter = "" self.filter = ""
self.draw_menu_or_footer("/") await self.draw_menu_or_footer("/")
await self.refresh_dependent_menus() case MenuState.FILTER_MODE_WITH_FILTER:
case MenuState.FILLED_FILTER: await self.draw_menu_or_footer(f"/{self.filter}") # if redrawing whole menu is not needed
self.draw_menu_or_footer(f"/{self.filter}") # if redrawing whole menu is not needed await self.refresh_dependent_menus()
await self.refresh_dependent_menus()
def draw_rows(self) -> None: def draw_rows(self) -> None:
for index, row in enumerate(self.visible_rows()): for index, row in enumerate(self.visible_rows()):
@ -161,31 +163,36 @@ class Menu:
self.draw_rows() self.draw_rows()
draw_row( draw_row(
self.win, self.win,
f"/{self.filter}" if self.state in [MenuState.EMPTY_FILTER, MenuState.FILLED_FILTER] else "", f"/{self.filter}" if self.state in [MenuState.FILTER_MODE, MenuState.FILTER_MODE_WITH_FILTER] else "",
curses.LINES - FOOTER_HEIGHT - 2, curses.LINES - FOOTER_HEIGHT - 2,
2, 2,
) )
def draw_menu_or_footer(self, footer_text: str) -> None: async def draw_menu_or_footer(self, footer_text: str) -> None:
previous_visible_rows = self.visible_rows() previous_visible_rows = self.visible_rows()
self.refresh_filtered_rows() self.refresh_filtered_rows()
if self.visible_rows() != previous_visible_rows: # draw whole menu if self.visible_rows() != previous_visible_rows: # draw whole menu
self.visible_row_index = 0 self.visible_row_index = 0
self.draw_menu_with_footer() self.draw_menu_with_footer()
self.refresh_dependent_menus() if self == MENUS[0]:
await switch_context(self.selected_row())
await self.refresh_dependent_menus()
else: # draw footer only else: # draw footer only
draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2) draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2)
def set_dependent_menus(self, menus: list[Self]) -> None:
self.dependent_menus = menus
async def refresh_dependent_menus(self): async def refresh_dependent_menus(self):
for menu in self.dependent_menus: for menu in self.dependent_menus:
await refresh_menu(menu) await menu.refresh_menu()
async def refresh_menu(self) -> None:
await self.refresh_rows()
self.refresh_filtered_rows()
if self.visible_row_index >= len(self.visible_rows()):
self.visible_row_index = 0 # reset selected row only if number of lines changed
self.draw_menu_with_footer()
# Global variables # Global variables
FOURTH_MENU_LOCK: asyncio.Lock = asyncio.Lock()
FOURTH_MENU_TASK: Optional[asyncio.Task] = None FOURTH_MENU_TASK: Optional[asyncio.Task] = None
MENUS: list[Menu] = [] MENUS: list[Menu] = []
@ -196,39 +203,13 @@ def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool =
window.refresh() window.refresh()
async def refresh_menu(menu: Menu) -> None: async def switch_context(context: str) -> None:
if menu == MENUS[1]: if not context:
menu.rows = await get_namespaces() return
elif menu == MENUS[2]:
menu.rows = await get_api_resources()
menu.refresh_filtered_rows()
menu.visible_row_index = 0
menu.draw_menu_with_footer()
async def refresh_resources_menu(namespace: Optional[str], api_resource: Optional[str]) -> None:
try: try:
async with FOURTH_MENU_LOCK: await kubectl_async(f"config use-context {context}")
menu = MENUS[3] except subprocess.CalledProcessError:
previous_menu_rows = menu.rows pass
if api_resource and namespace:
try:
menu.rows = await kubectl_async(
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'"
)
except subprocess.CalledProcessError:
menu.rows = []
else:
menu.rows = []
index_before_update = menu.filtered_rows.index
menu.refresh_filtered_rows()
menu.filtered_rows.index = index_before_update
if menu.visible_row_index >= len(menu.visible_rows()):
menu.visible_row_index = 0
if previous_menu_rows != menu.rows:
menu.draw_menu_with_footer()
except asyncio.CancelledError:
raise
async def get_contexts() -> list[str]: async def get_contexts() -> list[str]:
@ -242,15 +223,6 @@ async def get_contexts() -> list[str]:
return [] return []
async def switch_context(context: str) -> None:
if not context:
return
try:
await kubectl_async(f"config use-context {context}")
except subprocess.CalledProcessError:
pass
async def get_namespaces() -> list[str]: async def get_namespaces() -> list[str]:
namespaces: list[str] = [] namespaces: list[str] = []
context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0] context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0]
@ -282,27 +254,40 @@ async def get_api_resources() -> list[str]:
return [] return []
async def get_resources() -> list[str]:
api_resource = MENUS[2].selected_row()
namespace = MENUS[1].selected_row()
if not (api_resource and namespace):
return []
try:
resources = await kubectl_async(
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'"
)
return resources
except subprocess.CalledProcessError:
return []
async def handle_key_bindings(key: str) -> None: async def handle_key_bindings(key: str) -> None:
api_resource = MENUS[2].selected_row() api_resource = MENUS[2].selected_row()
if key == "KEY_DC":
key = "Delete"
if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all": if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all":
return return
resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0] resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0]
if not resource: if not resource:
return return
namespace = MENUS[1].selected_row() namespace = MENUS[1].selected_row()
if key == "KEY_DC":
key = "Delete"
await cancel_resources_refreshing() await cancel_resources_refreshing()
async with FOURTH_MENU_LOCK: curses.def_prog_mode()
curses.def_prog_mode() curses.endwin()
curses.endwin() command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) if "batcat" in command:
if "batcat" in command: command += BATCAT_STYLE
command += BATCAT_STYLE await subprocess_call_async(command)
await subprocess_call_async(command) curses.reset_prog_mode()
curses.reset_prog_mode() SCREEN.refresh()
SCREEN.refresh() enable_mouse_support()
enable_mouse_support()
def handle_mouse(menu: Menu) -> None: def handle_mouse(menu: Menu) -> None:
@ -438,19 +423,16 @@ def enable_mouse_support() -> None:
async def init_menus() -> list[Menu]: async def init_menus() -> list[Menu]:
MENUS.append(Menu("Contexts", await get_contexts(), 0, CONTEXTS_WIDTH)) MENUS.append(Menu("Contexts", get_contexts, 0, CONTEXTS_WIDTH))
MENUS.append(Menu("Namespaces", await get_namespaces(), CONTEXTS_WIDTH, NAMESPACES_WIDTH)) MENUS.append(Menu("Namespaces", get_namespaces, CONTEXTS_WIDTH, NAMESPACES_WIDTH))
MENUS.append(Menu("API resources", get_api_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH))
MENUS.append( MENUS.append(
Menu("API resources", await get_api_resources(), CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH) Menu("Resources", get_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH)
) )
MENUS.append(Menu("Resources", [], CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH))
return MENUS return MENUS
async def main_async() -> None: async def setup_curses() -> None:
global MENUS, FOURTH_MENU_TASK
MENUS = await init_menus()
Menu.selected = MENUS[0]
SCREEN.refresh() SCREEN.refresh()
SCREEN.nodelay(True) SCREEN.nodelay(True)
SCREEN.keypad(True) SCREEN.keypad(True)
@ -459,52 +441,65 @@ async def main_async() -> None:
curses.use_default_colors() curses.use_default_colors()
curses.noecho() curses.noecho()
enable_mouse_support() enable_mouse_support()
async def initialize_interface() -> None:
global MENUS
MENUS = await init_menus()
Menu.selected = MENUS[0]
await setup_curses()
for index, menu in enumerate(MENUS): for index, menu in enumerate(MENUS):
await menu.refresh_rows()
menu.refresh_filtered_rows()
menu.draw_menu_with_footer() menu.draw_menu_with_footer()
menu.set_dependent_menus(MENUS[index + 1 :]) menu.dependent_menus = MENUS[index + 1 :] # all other menu to the right
draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2) draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
async def main_async() -> None:
global MENUS, FOURTH_MENU_TASK
await initialize_interface()
while True: while True:
menu = Menu.selected menu = Menu.selected
try: try:
key = SCREEN.getkey() key = SCREEN.getkey()
except curses.error: except curses.error:
if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done(): if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done():
FOURTH_MENU_TASK = asyncio.create_task( FOURTH_MENU_TASK = asyncio.create_task(MENUS[3].refresh_menu())
refresh_resources_menu(MENUS[1].selected_row(), MENUS[2].selected_row())
)
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
continue continue
# handle state-dependent keys # handle state-dependent keys
match menu.state: match menu.state:
case MenuState.NORMAL: case MenuState.NORMAL:
if key == "\x1b": # E (Escape) if key == "q": # Q (Quit)
break # Exit break # Exit
elif key == "/": # S (Slash) elif key == "/": # S (Slash)
await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state await menu.set_state(MenuState.FILTER_MODE) # Transition to EmptyFilter state
continue continue
case MenuState.EMPTY_FILTER: case MenuState.FILTER_MODE:
if key == "\x1b": # E (Escape) if key == "\x1b": # E (Escape)
await menu.set_state(MenuState.NORMAL) # Transition to Normal state await menu.set_state(MenuState.NORMAL) # Transition to Normal state
continue continue
elif key.isalnum() or key == "-": # A (Type text) elif key.isalnum() or key == "-": # A (Type text)
menu.filter += key.lower() menu.filter += key.lower()
await menu.set_state(MenuState.FILLED_FILTER) # Transition to FilledFilter state await menu.set_state(MenuState.FILTER_MODE_WITH_FILTER) # Transition to FilledFilter state
continue continue
case MenuState.FILLED_FILTER: # FilledFilter state case MenuState.FILTER_MODE_WITH_FILTER: # FilledFilter state
if key == "\x1b": # E (Escape) if key == "\x1b": # E (Escape)
await menu.set_state(MenuState.NORMAL) # Transition to Normal state await menu.set_state(MenuState.NORMAL) # Transition to Normal state
continue continue
elif key in ["KEY_BACKSPACE", "\x08"]: # B (Backspace) elif key in ["KEY_BACKSPACE", "\x08"]: # B (Backspace)
if len(menu.filter) == 1: if len(menu.filter) == 1:
await menu.set_state(MenuState.EMPTY_FILTER) # Transition to EmptyFilter state await menu.set_state(MenuState.FILTER_MODE) # Transition to EmptyFilter state
continue continue
menu.filter = menu.filter[:-1] menu.filter = menu.filter[:-1]
menu.draw_menu_or_footer(f"/{menu.filter}") await menu.draw_menu_or_footer(f"/{menu.filter}")
continue continue
elif key.isalnum() or key == "-": # A (Type text) elif key.isalnum() or key == "-": # A (Type text)
menu.filter += key.lower() # Stay in FilledFilter state menu.filter += key.lower() # Stay in FilledFilter state
menu.draw_menu_or_footer(f"/{menu.filter}") await menu.draw_menu_or_footer(f"/{menu.filter}")
continue continue
# handle state-independent keys (Vertical/Horizontal navigation etc. available in all states) # handle state-independent keys (Vertical/Horizontal navigation etc. available in all states)