| 
							
							
							
						 |  |  | @ -1,84 +1,85 @@ | 
		
	
		
			
				|  |  |  |  | #!/usr/bin/env python3 | 
		
	
		
			
				|  |  |  |  | from typing import Optional, Callable | 
		
	
		
			
				|  |  |  |  | from typing import Optional, Callable, Self | 
		
	
		
			
				|  |  |  |  | import subprocess | 
		
	
		
			
				|  |  |  |  | import curses | 
		
	
		
			
				|  |  |  |  | import curses.ascii | 
		
	
		
			
				|  |  |  |  | import asyncio | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | SCREEN: curses.window = curses.initscr() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | # ****************************** # | 
		
	
		
			
				|  |  |  |  | # START OF CONFIGURATION SECTION # | 
		
	
		
			
				|  |  |  |  | # ****************************** # | 
		
	
		
			
				|  |  |  |  | KEY_BINDINGS: dict[str, dict[str, str]] = {  # can be extended | 
		
	
		
			
				|  |  |  |  |     "^Y": {  # Ctrl + y | 
		
	
		
			
				|  |  |  |  |         "description": "Yaml", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml", | 
		
	
		
			
				|  |  |  |  |         "kind": "all",  # this key binding is valid for all api resources | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^D": {  # Ctrl + d | 
		
	
		
			
				|  |  |  |  |         "description": "Describe", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml", | 
		
	
		
			
				|  |  |  |  |         "kind": "all", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^E": {  # Ctrl + e | 
		
	
		
			
				|  |  |  |  |         "description": "Edit", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} edit {api_resource} {resource}' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} edit {api_resource} {resource}", | 
		
	
		
			
				|  |  |  |  |         "kind": "all", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^L": {  # Ctrl + l | 
		
	
		
			
				|  |  |  |  |         "description": "Logs", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} logs {resource} | lnav' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} logs {resource} | lnav", | 
		
	
		
			
				|  |  |  |  |         "kind": "pods",  # this key binding is valid for pods only | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^X": {  # Ctrl + x | 
		
	
		
			
				|  |  |  |  |         "description": "eXec", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} exec -it {resource} sh' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} exec -it {resource} sh", | 
		
	
		
			
				|  |  |  |  |         "kind": "pods", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^N": {  # Ctrl + n | 
		
	
		
			
				|  |  |  |  |         "description": "Network debug", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot", | 
		
	
		
			
				|  |  |  |  |         "kind": "pods", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^A": {  # Ctrl + a  (a means Access logs! :-)) | 
		
	
		
			
				|  |  |  |  |         "description": "istio-proxy Access logs", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} logs {resource} -c istio-proxy | lnav' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} logs {resource} -c istio-proxy | lnav", | 
		
	
		
			
				|  |  |  |  |         "kind": "pods", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^P": {  # Ctrl + p  (p means Proxy! :-)) | 
		
	
		
			
				|  |  |  |  |         "description": "exec istio-Proxy", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy bash' | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} exec -it {resource} -c istio-proxy bash", | 
		
	
		
			
				|  |  |  |  |         "kind": "pods", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "^R": {  # Ctrl + r  (r means Reveal! :-)) | 
		
	
		
			
				|  |  |  |  |         "description": "Reveal secret", | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml" | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl get secret {resource} -n {namespace} -o yaml" | 
		
	
		
			
				|  |  |  |  |         " | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml", | 
		
	
		
			
				|  |  |  |  |         "kind": "secrets", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  |     "Delete": {  # It is actually KEY_DC | 
		
	
		
			
				|  |  |  |  |         "description": "Delete", | 
		
	
		
			
				|  |  |  |  |         "command": 'kubectl -n {namespace} delete {api_resource} {resource}' | 
		
	
		
			
				|  |  |  |  |     } | 
		
	
		
			
				|  |  |  |  |         "command": "kubectl -n {namespace} delete {api_resource} {resource}", | 
		
	
		
			
				|  |  |  |  |         "kind": "all", | 
		
	
		
			
				|  |  |  |  |     }, | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | # which api resources are on the top of menu? | 
		
	
		
			
				|  |  |  |  | TOP_API_RESOURCES: list[str] = [ | 
		
	
		
			
				|  |  |  |  |     "pods", "services", "configmaps", "secrets", "persistentvolumeclaims", | 
		
	
		
			
				|  |  |  |  |     "ingresses", "nodes", "deployments", "statefulsets", "daemonsets", | 
		
	
		
			
				|  |  |  |  |     "storageclasses", "serviceentries", "destinationrules", "authorizationpolicies", | 
		
	
		
			
				|  |  |  |  |     "virtualservices", "gateways", "telemetry", "envoyfilters" | 
		
	
		
			
				|  |  |  |  | ] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | QUERY_API_RESOURCES: bool = False  # Should we merge TOP_API_RESOURCES with all other api resources from cluster? | 
		
	
		
			
				|  |  |  |  | BATCAT_STYLE: str = " --paging always --style numbers"  # style of batcat | 
		
	
		
			
				|  |  |  |  | SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD | 
		
	
		
			
				|  |  |  |  | MOUSE_ENABLED: bool = False | 
		
	
		
			
				|  |  |  |  | WIDTH: int = curses.COLS | 
		
	
		
			
				|  |  |  |  | WIDTH_UNIT: int = int(WIDTH / 8) | 
		
	
		
			
				|  |  |  |  | WIDTH_UNIT: int = int(WIDTH / 10) | 
		
	
		
			
				|  |  |  |  | CONTEXTS_WIDTH = int(WIDTH_UNIT * 1.5) | 
		
	
		
			
				|  |  |  |  | NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5) | 
		
	
		
			
				|  |  |  |  | API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5) | 
		
	
		
			
				|  |  |  |  | RESOURCES_WIDTH = WIDTH - (API_RESOURCES_WIDTH + NAMESPACES_WIDTH) | 
		
	
		
			
				|  |  |  |  | RESOURCES_WIDTH = WIDTH - (CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH) | 
		
	
		
			
				|  |  |  |  | HEADER_HEIGHT: int = 4 | 
		
	
		
			
				|  |  |  |  | FOOTER_HEIGHT: int = 3 | 
		
	
		
			
				|  |  |  |  | ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3 | 
		
	
		
			
				|  |  |  |  | # Generate HELP_TEXT from KEY_BINDINGS | 
		
	
		
			
				|  |  |  |  | HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items()) | 
		
	
		
			
				|  |  |  |  | HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB/PgUp/PgDn: navigation" | 
		
	
		
			
				|  |  |  |  | SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD | 
		
	
		
			
				|  |  |  |  | ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_BACKSPACE", "\x08", "KEY_MOUSE", "KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END", "\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"] | 
		
	
		
			
				|  |  |  |  | HELP_TEXT += ", /: filter mode, Esc: exit filter mode, arrows/TAB: navigation, q: exit kls" | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | # **************************** # | 
		
	
		
			
				|  |  |  |  | # END OF CONFIGURATION SECTION # | 
		
	
	
		
			
				
					|  |  |  | @ -86,41 +87,110 @@ ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_ | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | class CircularList: | 
		
	
		
			
				|  |  |  |  |     def __init__(self, elements: list[str]): | 
		
	
		
			
				|  |  |  |  |         self.elements: list[str] = elements | 
		
	
		
			
				|  |  |  |  |         self.size: int = len(elements) | 
		
	
		
			
				|  |  |  |  |     def __init__(self, items: list[str]): | 
		
	
		
			
				|  |  |  |  |         self.items: list[str] = items | 
		
	
		
			
				|  |  |  |  |         self.size: int = len(items) | 
		
	
		
			
				|  |  |  |  |         self.index: int = 0 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def __getitem__(self, index: slice) -> list[str]: | 
		
	
		
			
				|  |  |  |  |         start, stop, step = index.indices(self.size) | 
		
	
		
			
				|  |  |  |  |         return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)] | 
		
	
		
			
				|  |  |  |  |         return [self.items[(self.index + i) % self.size] for i in range(start, stop, step)] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def shift(self, steps: int) -> None: | 
		
	
		
			
				|  |  |  |  |         self.index = (self.index + steps) % self.size | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | class Menu: | 
		
	
		
			
				|  |  |  |  |     def __init__(self, title: str, rows: list[str], begin_x: int|float, width: int|float, rows_height: int): | 
		
	
		
			
				|  |  |  |  |     selected = None  # Class variable to track selected object | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def __init__( | 
		
	
		
			
				|  |  |  |  |         self, | 
		
	
		
			
				|  |  |  |  |         title: str, | 
		
	
		
			
				|  |  |  |  |         rows_function, | 
		
	
		
			
				|  |  |  |  |         begin_x: int, | 
		
	
		
			
				|  |  |  |  |         width: int, | 
		
	
		
			
				|  |  |  |  |     ): | 
		
	
		
			
				|  |  |  |  |         self.title: str = title | 
		
	
		
			
				|  |  |  |  |         self.rows: list[str] = rows | 
		
	
		
			
				|  |  |  |  |         self.rows: list[str] = [] | 
		
	
		
			
				|  |  |  |  |         self.rows_function = rows_function | 
		
	
		
			
				|  |  |  |  |         self.filter: str = "" | 
		
	
		
			
				|  |  |  |  |         self.filter_mode: bool = False | 
		
	
		
			
				|  |  |  |  |         self.state: str = "Normal" | 
		
	
		
			
				|  |  |  |  |         self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x]) | 
		
	
		
			
				|  |  |  |  |         self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:rows_height] | 
		
	
		
			
				|  |  |  |  |         self.visible_row_index: int = 0 | 
		
	
		
			
				|  |  |  |  |         self.selected_row: Callable[[], Optional[str]] = lambda: self.visible_rows()[ | 
		
	
		
			
				|  |  |  |  |             self.visible_row_index] if self.visible_rows() else None | 
		
	
		
			
				|  |  |  |  |         self.rows_height: int = rows_height | 
		
	
		
			
				|  |  |  |  |         self.selected_row: Callable[[], Optional[str]] = ( | 
		
	
		
			
				|  |  |  |  |             lambda: self.visible_rows[self.visible_row_index] if self.visible_rows else None | 
		
	
		
			
				|  |  |  |  |         ) | 
		
	
		
			
				|  |  |  |  |         self.width: int = int(width) | 
		
	
		
			
				|  |  |  |  |         self.begin_x: int = int(begin_x) | 
		
	
		
			
				|  |  |  |  |         self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x) | 
		
	
		
			
				|  |  |  |  |         self.dependent_menus: list[Self] = [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     @property | 
		
	
		
			
				|  |  |  |  |     def visible_rows(self) -> list[str]: | 
		
	
		
			
				|  |  |  |  |         return self.filtered_rows[:ROWS_HEIGHT] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     async def set_rows(self): | 
		
	
		
			
				|  |  |  |  |         self.rows = await self.rows_function() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def set_filtered_rows(self): | 
		
	
		
			
				|  |  |  |  |         self.filtered_rows = CircularList([x for x in self.rows if self.filter in x]) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     async def set_state(self, state: str) -> None: | 
		
	
		
			
				|  |  |  |  |         self.state = state | 
		
	
		
			
				|  |  |  |  |         # entry activities | 
		
	
		
			
				|  |  |  |  |         match self.state: | 
		
	
		
			
				|  |  |  |  |             case "Normal": | 
		
	
		
			
				|  |  |  |  |                 self.filter = "" | 
		
	
		
			
				|  |  |  |  |                 await self.draw_menu_or_footer("") | 
		
	
		
			
				|  |  |  |  |             case "EmptyFilter": | 
		
	
		
			
				|  |  |  |  |                 self.filter = "" | 
		
	
		
			
				|  |  |  |  |                 await self.draw_menu_or_footer("/") | 
		
	
		
			
				|  |  |  |  |             case "FilledFilter": | 
		
	
		
			
				|  |  |  |  |                 await self.draw_menu_or_footer(f"/{self.filter}")  # if redrawing whole menu is not needed | 
		
	
		
			
				|  |  |  |  |         await self.refresh_dependent_menus() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def draw_rows(self) -> None: | 
		
	
		
			
				|  |  |  |  |         for index, row in enumerate(self.visible_rows): | 
		
	
		
			
				|  |  |  |  |             draw_row(self.win, row, index + HEADER_HEIGHT, 2, selected=row == self.selected_row()) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     def draw_menu_with_footer(self) -> None: | 
		
	
		
			
				|  |  |  |  |         self.win.erase() | 
		
	
		
			
				|  |  |  |  |         draw_row(self.win, self.title, 1, 2, selected=self == Menu.selected) | 
		
	
		
			
				|  |  |  |  |         self.draw_rows() | 
		
	
		
			
				|  |  |  |  |         draw_row( | 
		
	
		
			
				|  |  |  |  |             self.win, | 
		
	
		
			
				|  |  |  |  |             f"/{self.filter}" if self.state in ["EmptyFilter", "FilledFilter"] else "", | 
		
	
		
			
				|  |  |  |  |             curses.LINES - FOOTER_HEIGHT - 2, | 
		
	
		
			
				|  |  |  |  |             2, | 
		
	
		
			
				|  |  |  |  |         ) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     async def draw_menu_or_footer(self, footer_text: str) -> None: | 
		
	
		
			
				|  |  |  |  |         previous_visible_rows = self.visible_rows | 
		
	
		
			
				|  |  |  |  |         self.set_filtered_rows() | 
		
	
		
			
				|  |  |  |  |         if self.visible_rows != previous_visible_rows:  # draw whole menu | 
		
	
		
			
				|  |  |  |  |             self.visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  |             self.draw_menu_with_footer() | 
		
	
		
			
				|  |  |  |  |             if self == MENUS[0]: | 
		
	
		
			
				|  |  |  |  |                 await switch_context(self.selected_row()) | 
		
	
		
			
				|  |  |  |  |             await self.refresh_dependent_menus() | 
		
	
		
			
				|  |  |  |  |         else:  # draw footer only | 
		
	
		
			
				|  |  |  |  |             draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     async def refresh_dependent_menus(self): | 
		
	
		
			
				|  |  |  |  |         for menu in self.dependent_menus: | 
		
	
		
			
				|  |  |  |  |             await menu.refresh_menu() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     async def refresh_menu(self) -> None: | 
		
	
		
			
				|  |  |  |  |         await self.set_rows() | 
		
	
		
			
				|  |  |  |  |         self.set_filtered_rows() | 
		
	
		
			
				|  |  |  |  |         if self.visible_row_index >= len(self.visible_rows): | 
		
	
		
			
				|  |  |  |  |             self.visible_row_index = 0  # reset selected row only if number of lines changed | 
		
	
		
			
				|  |  |  |  |         self.draw_menu_with_footer() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | # Global variables | 
		
	
		
			
				|  |  |  |  | THIRD_MENU_LOCK: asyncio.Lock = asyncio.Lock() | 
		
	
		
			
				|  |  |  |  | THIRD_MENU_TASK: Optional[asyncio.Task] = None | 
		
	
		
			
				|  |  |  |  | menus: list[Menu] = [] | 
		
	
		
			
				|  |  |  |  | selected_menu: Optional[Menu] = None | 
		
	
		
			
				|  |  |  |  | FOURTH_MENU_TASK: Optional[asyncio.Task] = None | 
		
	
		
			
				|  |  |  |  | MENUS: list[Menu] = [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None: | 
		
	
	
		
			
				
					|  |  |  | @ -129,274 +199,31 @@ def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = | 
		
	
		
			
				|  |  |  |  |     window.refresh() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def draw_rows(menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     for index, row in enumerate(menu.visible_rows()): | 
		
	
		
			
				|  |  |  |  |         draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected = row == menu.selected_row()) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def draw_menu(menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     menu.win.erase() | 
		
	
		
			
				|  |  |  |  |     draw_row(menu.win, menu.title, 1, 2, selected = menu == selected_menu) | 
		
	
		
			
				|  |  |  |  |     draw_rows(menu) | 
		
	
		
			
				|  |  |  |  |     draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def refresh_third_menu(namespace: Optional[str], api_resource: Optional[str]) -> None: | 
		
	
		
			
				|  |  |  |  |     global THIRD_MENU_TASK | 
		
	
		
			
				|  |  |  |  | async def switch_context(context: str) -> None: | 
		
	
		
			
				|  |  |  |  |     if not context: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         async with THIRD_MENU_LOCK: | 
		
	
		
			
				|  |  |  |  |             menu = menus[2] | 
		
	
		
			
				|  |  |  |  |             previous_menu_rows = menu.rows | 
		
	
		
			
				|  |  |  |  |             if api_resource and namespace: | 
		
	
		
			
				|  |  |  |  |                 try: | 
		
	
		
			
				|  |  |  |  |                     menu.rows = await kubectl_async( | 
		
	
		
			
				|  |  |  |  |                         f"-n {namespace} get {api_resource} --no-headers --ignore-not-found") | 
		
	
		
			
				|  |  |  |  |         await kubectl_async(f"config use-context {context}") | 
		
	
		
			
				|  |  |  |  |     except subprocess.CalledProcessError: | 
		
	
		
			
				|  |  |  |  |                     menu.rows = [] | 
		
	
		
			
				|  |  |  |  |             else: | 
		
	
		
			
				|  |  |  |  |                 menu.rows = [] | 
		
	
		
			
				|  |  |  |  |             index_before_update = menu.filtered_rows.index | 
		
	
		
			
				|  |  |  |  |             menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) | 
		
	
		
			
				|  |  |  |  |             menu.filtered_rows.index = index_before_update | 
		
	
		
			
				|  |  |  |  |             if menu.visible_row_index >= len(menu.visible_rows()): | 
		
	
		
			
				|  |  |  |  |                 menu.visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  |             if previous_menu_rows != menu.rows: | 
		
	
		
			
				|  |  |  |  |                 draw_menu(menu) | 
		
	
		
			
				|  |  |  |  |     except asyncio.CancelledError: | 
		
	
		
			
				|  |  |  |  |         raise | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str) -> None: | 
		
	
		
			
				|  |  |  |  |     if not resource: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     if key in ("l", "x", "n") and api_resource != "pods": | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     if key == "KEY_DC": | 
		
	
		
			
				|  |  |  |  |         key = "Delete" | 
		
	
		
			
				|  |  |  |  |     if THIRD_MENU_TASK is not None: | 
		
	
		
			
				|  |  |  |  |         THIRD_MENU_TASK.cancel() | 
		
	
		
			
				|  |  |  |  |         try: | 
		
	
		
			
				|  |  |  |  |             await THIRD_MENU_TASK | 
		
	
		
			
				|  |  |  |  |         except asyncio.CancelledError: | 
		
	
		
			
				|  |  |  |  |         pass | 
		
	
		
			
				|  |  |  |  |     async with THIRD_MENU_LOCK: | 
		
	
		
			
				|  |  |  |  |         curses.def_prog_mode() | 
		
	
		
			
				|  |  |  |  |         curses.endwin() | 
		
	
		
			
				|  |  |  |  |         command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) | 
		
	
		
			
				|  |  |  |  |         if "batcat" in command: | 
		
	
		
			
				|  |  |  |  |             command += BATCAT_STYLE | 
		
	
		
			
				|  |  |  |  |         await subprocess_call_async(command) | 
		
	
		
			
				|  |  |  |  |         curses.reset_prog_mode() | 
		
	
		
			
				|  |  |  |  |         SCREEN.refresh() | 
		
	
		
			
				|  |  |  |  |         enable_mouse_support() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def handle_filter_state(key: str, menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     global selected_menu | 
		
	
		
			
				|  |  |  |  |     if key == "/" and not menu.filter_mode: | 
		
	
		
			
				|  |  |  |  |         menu.filter_mode = True | 
		
	
		
			
				|  |  |  |  |         menu.filter = "" | 
		
	
		
			
				|  |  |  |  |     elif key == "\x1b":  # Escape key | 
		
	
		
			
				|  |  |  |  |         if menu.filter_mode: | 
		
	
		
			
				|  |  |  |  |             menu.filter_mode = False | 
		
	
		
			
				|  |  |  |  |             menu.filter = "" | 
		
	
		
			
				|  |  |  |  |         else: | 
		
	
		
			
				|  |  |  |  |             selected_menu = None | 
		
	
		
			
				|  |  |  |  |     elif menu.filter_mode: | 
		
	
		
			
				|  |  |  |  |         if key in ["KEY_BACKSPACE", "\x08"] and menu.filter: | 
		
	
		
			
				|  |  |  |  |             menu.filter = menu.filter[:-1]  # Remove last character | 
		
	
		
			
				|  |  |  |  |         elif key.isalnum() or key == "-":  # Allow letters, numbers, and dashes | 
		
	
		
			
				|  |  |  |  |             menu.filter += key.lower() | 
		
	
		
			
				|  |  |  |  |     menu.visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  |     menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x]) | 
		
	
		
			
				|  |  |  |  |     draw_menu(menu) | 
		
	
		
			
				|  |  |  |  |     if menu != menus[2]: | 
		
	
		
			
				|  |  |  |  |         menus[2].visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def handle_mouse(menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     if not MOUSE_ENABLED: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  | async def get_contexts() -> list[str]: | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         mouse_info: tuple[int, ...] = curses.getmouse() | 
		
	
		
			
				|  |  |  |  |     except curses.error:  # this fixes scrolling error | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     row_number = mouse_info[2] - HEADER_HEIGHT | 
		
	
		
			
				|  |  |  |  |     column_number = mouse_info[1] | 
		
	
		
			
				|  |  |  |  |     next_menu: Optional[Menu] = None | 
		
	
		
			
				|  |  |  |  |     if column_number > (menu.begin_x + menu.width): | 
		
	
		
			
				|  |  |  |  |         next_menu = menus[(menus.index(menu) + 1) % 3] | 
		
	
		
			
				|  |  |  |  |         if column_number > (next_menu.begin_x + next_menu.width): | 
		
	
		
			
				|  |  |  |  |             next_menu = menus[(menus.index(next_menu) + 1) % 3] | 
		
	
		
			
				|  |  |  |  |         globals().update(selected_menu=next_menu) | 
		
	
		
			
				|  |  |  |  |     elif column_number < menu.begin_x: | 
		
	
		
			
				|  |  |  |  |         next_menu = menus[(menus.index(menu) - 1) % 3] | 
		
	
		
			
				|  |  |  |  |         if column_number < next_menu.begin_x: | 
		
	
		
			
				|  |  |  |  |             next_menu = menus[(menus.index(next_menu) - 1) % 3] | 
		
	
		
			
				|  |  |  |  |         globals().update(selected_menu=next_menu) | 
		
	
		
			
				|  |  |  |  |     if next_menu: | 
		
	
		
			
				|  |  |  |  |         draw_row(menu.win, menu.title, 1, 2, selected=False) | 
		
	
		
			
				|  |  |  |  |         draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) | 
		
	
		
			
				|  |  |  |  |         menu = next_menu | 
		
	
		
			
				|  |  |  |  |     char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1) | 
		
	
		
			
				|  |  |  |  |     char_str = chr(char_int & 0xFF) | 
		
	
		
			
				|  |  |  |  |     if not char_str or ord(char_str) > 127 or ' ' in char_str: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     if 0 <= row_number < len(menu.visible_rows()): | 
		
	
		
			
				|  |  |  |  |         menu.visible_row_index = row_number | 
		
	
		
			
				|  |  |  |  |         draw_rows(menu) | 
		
	
		
			
				|  |  |  |  |         if menu != menus[2]: | 
		
	
		
			
				|  |  |  |  |             menus[2].visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  |         current_context = await kubectl_async("config current-context") | 
		
	
		
			
				|  |  |  |  |         contexts = await kubectl_async("config get-contexts --no-headers -o name") | 
		
	
		
			
				|  |  |  |  |         contexts.remove(current_context[0]) | 
		
	
		
			
				|  |  |  |  |         contexts.insert(0, current_context[0]) | 
		
	
		
			
				|  |  |  |  |         return [line.split()[0] for line in contexts if line.strip()] | 
		
	
		
			
				|  |  |  |  |     except subprocess.CalledProcessError: | 
		
	
		
			
				|  |  |  |  |         return [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def handle_vertical_navigation(key: str, menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     if len(menu.visible_rows()) <= 1: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     keys_numbers: dict[str, int] = { | 
		
	
		
			
				|  |  |  |  |         "KEY_DOWN": 1, "KEY_UP": -1, | 
		
	
		
			
				|  |  |  |  |         "KEY_NPAGE": 1, "KEY_PPAGE": -1, | 
		
	
		
			
				|  |  |  |  |         'KEY_HOME': 0, 'KEY_END': -1 | 
		
	
		
			
				|  |  |  |  |     } | 
		
	
		
			
				|  |  |  |  |     if key in ["KEY_DOWN", "KEY_UP"]: | 
		
	
		
			
				|  |  |  |  |         if menu.filtered_rows.size > menu.rows_height: | 
		
	
		
			
				|  |  |  |  |             menu.filtered_rows.shift(keys_numbers[key]) | 
		
	
		
			
				|  |  |  |  |         else: | 
		
	
		
			
				|  |  |  |  |             menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size | 
		
	
		
			
				|  |  |  |  |     elif key in ["KEY_NPAGE", "KEY_PPAGE"]: | 
		
	
		
			
				|  |  |  |  |         menu.filtered_rows.shift(keys_numbers[key] * len(menu.visible_rows())) | 
		
	
		
			
				|  |  |  |  |     elif key in ['KEY_HOME', 'KEY_END']: | 
		
	
		
			
				|  |  |  |  |         menu.visible_row_index = keys_numbers[key] | 
		
	
		
			
				|  |  |  |  |     draw_rows(menu) | 
		
	
		
			
				|  |  |  |  |     if menu != menus[2]: | 
		
	
		
			
				|  |  |  |  |         menus[2].visible_row_index = 0 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def handle_horizontal_navigation(key: str, menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key] | 
		
	
		
			
				|  |  |  |  |     next_menu = menus[(menus.index(menu) + increment) % 3] | 
		
	
		
			
				|  |  |  |  |     draw_row(menu.win, menu.title, 1, 2, selected=False) | 
		
	
		
			
				|  |  |  |  |     draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) | 
		
	
		
			
				|  |  |  |  |     globals().update(selected_menu=next_menu) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def confirm_action(message: str) -> bool: | 
		
	
		
			
				|  |  |  |  |     rows, cols = SCREEN.getmaxyx() | 
		
	
		
			
				|  |  |  |  |     popup_height = 5 | 
		
	
		
			
				|  |  |  |  |     popup_width = len(message) + 10 | 
		
	
		
			
				|  |  |  |  |     start_y = (rows - popup_height) // 2 | 
		
	
		
			
				|  |  |  |  |     start_x = (cols - popup_width) // 2 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     popup = curses.newwin(popup_height, popup_width, start_y, start_x) | 
		
	
		
			
				|  |  |  |  |     popup.box() | 
		
	
		
			
				|  |  |  |  |     popup.addstr(2, 2, message) | 
		
	
		
			
				|  |  |  |  |     popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     popup.refresh() | 
		
	
		
			
				|  |  |  |  |     while True: | 
		
	
		
			
				|  |  |  |  |         key = await get_key_async(popup) | 
		
	
		
			
				|  |  |  |  |         if key.lower() == 'y': | 
		
	
		
			
				|  |  |  |  |             return True | 
		
	
		
			
				|  |  |  |  |         if key.lower() == 'n': | 
		
	
		
			
				|  |  |  |  |             popup.clear() | 
		
	
		
			
				|  |  |  |  |             popup.refresh() | 
		
	
		
			
				|  |  |  |  |             return False | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def get_key_async(popup: curses.window) -> str: | 
		
	
		
			
				|  |  |  |  |     return await asyncio.to_thread(popup.getkey) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def kubectl_async(command: str) -> list[str]: | 
		
	
		
			
				|  |  |  |  |     process = await asyncio.create_subprocess_shell( | 
		
	
		
			
				|  |  |  |  |         f"kubectl {command} 2> /dev/null", | 
		
	
		
			
				|  |  |  |  |         stdout=asyncio.subprocess.PIPE, | 
		
	
		
			
				|  |  |  |  |         stderr=asyncio.subprocess.PIPE | 
		
	
		
			
				|  |  |  |  |     ) | 
		
	
		
			
				|  |  |  |  |     stdout, stderr = await process.communicate() | 
		
	
		
			
				|  |  |  |  |     if stderr: | 
		
	
		
			
				|  |  |  |  |         raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr) | 
		
	
		
			
				|  |  |  |  |     return stdout.decode().strip().split("\n") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def catch_input(menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     global THIRD_MENU_TASK | 
		
	
		
			
				|  |  |  |  |     while True: | 
		
	
		
			
				|  |  |  |  |         try: | 
		
	
		
			
				|  |  |  |  |             key = await get_key_async(SCREEN) | 
		
	
		
			
				|  |  |  |  |             break | 
		
	
		
			
				|  |  |  |  |         except curses.error: | 
		
	
		
			
				|  |  |  |  |             if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done() or THIRD_MENU_TASK.cancelled(): | 
		
	
		
			
				|  |  |  |  |                 THIRD_MENU_TASK = asyncio.create_task( | 
		
	
		
			
				|  |  |  |  |                     refresh_third_menu( | 
		
	
		
			
				|  |  |  |  |                         menus[0].selected_row(), | 
		
	
		
			
				|  |  |  |  |                         menus[1].selected_row() | 
		
	
		
			
				|  |  |  |  |                     ) | 
		
	
		
			
				|  |  |  |  |                 ) | 
		
	
		
			
				|  |  |  |  |             await asyncio.sleep(0.1) | 
		
	
		
			
				|  |  |  |  |     # Convert control keys to their string representation (e.g., Ctrl+Y -> ^Y) | 
		
	
		
			
				|  |  |  |  |     # Handle special keys (e.g., "KEY_UP", "KEY_DC") | 
		
	
		
			
				|  |  |  |  |     if key.startswith("KEY_") or key == "\x1b": | 
		
	
		
			
				|  |  |  |  |         key_str = key  # Use the key as-is for special keys | 
		
	
		
			
				|  |  |  |  |     else: | 
		
	
		
			
				|  |  |  |  |         # Handle single-character keys (e.g., "a", "^Y") | 
		
	
		
			
				|  |  |  |  |         key_str = curses.ascii.unctrl(key) if curses.ascii.iscntrl(ord(key)) else key | 
		
	
		
			
				|  |  |  |  |     # Check if the key is allowed | 
		
	
		
			
				|  |  |  |  |     if key_str not in ALLOWED_SPECIAL_KEYS and not key.isalnum(): | 
		
	
		
			
				|  |  |  |  |         return  # Ignore the key if it's not in the allowed list or not alphanumeric | 
		
	
		
			
				|  |  |  |  |     if key.isalnum() and not menu.filter_mode: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: | 
		
	
		
			
				|  |  |  |  |         handle_horizontal_navigation(key, menu) | 
		
	
		
			
				|  |  |  |  |     elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]: | 
		
	
		
			
				|  |  |  |  |         if THIRD_MENU_TASK is not None: | 
		
	
		
			
				|  |  |  |  |             THIRD_MENU_TASK.cancel() | 
		
	
		
			
				|  |  |  |  |             try: | 
		
	
		
			
				|  |  |  |  |                 await THIRD_MENU_TASK | 
		
	
		
			
				|  |  |  |  |             except asyncio.CancelledError: | 
		
	
		
			
				|  |  |  |  |                 pass | 
		
	
		
			
				|  |  |  |  |         handle_vertical_navigation(key, menu) | 
		
	
		
			
				|  |  |  |  |     elif key == "KEY_MOUSE": | 
		
	
		
			
				|  |  |  |  |         handle_mouse(menu) | 
		
	
		
			
				|  |  |  |  |     elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"): | 
		
	
		
			
				|  |  |  |  |         await handle_key_bindings( | 
		
	
		
			
				|  |  |  |  |             key, | 
		
	
		
			
				|  |  |  |  |             menus[0].selected_row(), | 
		
	
		
			
				|  |  |  |  |             menus[1].selected_row(), | 
		
	
		
			
				|  |  |  |  |             menus[2].selected_row() and menus[2].selected_row().split()[0] | 
		
	
		
			
				|  |  |  |  |         ) | 
		
	
		
			
				|  |  |  |  |     elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-": | 
		
	
		
			
				|  |  |  |  |         handle_filter_state(key, menu) | 
		
	
		
			
				|  |  |  |  |     elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS: | 
		
	
		
			
				|  |  |  |  |         await handle_key_bindings( | 
		
	
		
			
				|  |  |  |  |             curses.ascii.unctrl(key), | 
		
	
		
			
				|  |  |  |  |             menus[0].selected_row(), | 
		
	
		
			
				|  |  |  |  |             menus[1].selected_row(), | 
		
	
		
			
				|  |  |  |  |             menus[2].selected_row() and menus[2].selected_row().split()[0] | 
		
	
		
			
				|  |  |  |  |         ) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def subprocess_call_async(command: str) -> None: | 
		
	
		
			
				|  |  |  |  |     process = await asyncio.create_subprocess_shell(command) | 
		
	
		
			
				|  |  |  |  |     await process.communicate() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def enable_mouse_support() -> None: | 
		
	
		
			
				|  |  |  |  |     if MOUSE_ENABLED: | 
		
	
		
			
				|  |  |  |  |         curses.mousemask(curses.REPORT_MOUSE_POSITION) | 
		
	
		
			
				|  |  |  |  |         print('\033[?1003h') | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def init_menus() -> None: | 
		
	
		
			
				|  |  |  |  |     global menus, selected_menu | 
		
	
		
			
				|  |  |  |  |     api_resources_kubectl: list[str] = [ | 
		
	
		
			
				|  |  |  |  |         x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get") | 
		
	
		
			
				|  |  |  |  |     ] | 
		
	
		
			
				|  |  |  |  |     api_resources = list( | 
		
	
		
			
				|  |  |  |  |         dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl) | 
		
	
		
			
				|  |  |  |  |     ) if QUERY_API_RESOURCES else TOP_API_RESOURCES | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def get_namespaces() -> list[str]: | 
		
	
		
			
				|  |  |  |  |     namespaces: list[str] = [] | 
		
	
		
			
				|  |  |  |  |     context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0] | 
		
	
		
			
				|  |  |  |  |     if not context: | 
		
	
		
			
				|  |  |  |  |         return namespaces | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'") | 
		
	
		
			
				|  |  |  |  |     except: | 
		
	
	
		
			
				
					|  |  |  | @ -412,14 +239,196 @@ async def init_menus() -> None: | 
		
	
		
			
				|  |  |  |  |                 namespaces = all_namespaces | 
		
	
		
			
				|  |  |  |  |     except: | 
		
	
		
			
				|  |  |  |  |         pass | 
		
	
		
			
				|  |  |  |  |     return namespaces | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     menus = [ | 
		
	
		
			
				|  |  |  |  |         Menu("Namespaces", namespaces, 0, NAMESPACES_WIDTH, ROWS_HEIGHT), | 
		
	
		
			
				|  |  |  |  |         Menu("API resources", api_resources, NAMESPACES_WIDTH, API_RESOURCES_WIDTH, ROWS_HEIGHT), | 
		
	
		
			
				|  |  |  |  |         Menu("Resources", [], NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH, ROWS_HEIGHT) | 
		
	
		
			
				|  |  |  |  |     ] | 
		
	
		
			
				|  |  |  |  |     selected_menu = menus[0] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def get_api_resources() -> list[str]: | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         api_resources = await kubectl_async("api-resources --no-headers --verbs=get") | 
		
	
		
			
				|  |  |  |  |         return sorted(list(set([x.split()[0] for x in api_resources])))  # dedup | 
		
	
		
			
				|  |  |  |  |     except subprocess.CalledProcessError: | 
		
	
		
			
				|  |  |  |  |         return [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def get_resources() -> list[str]: | 
		
	
		
			
				|  |  |  |  |     api_resource = MENUS[2].selected_row() | 
		
	
		
			
				|  |  |  |  |     namespace = MENUS[1].selected_row() | 
		
	
		
			
				|  |  |  |  |     if not (api_resource and namespace): | 
		
	
		
			
				|  |  |  |  |         return [] | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         resources = await kubectl_async( | 
		
	
		
			
				|  |  |  |  |             f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'" | 
		
	
		
			
				|  |  |  |  |         ) | 
		
	
		
			
				|  |  |  |  |         return resources | 
		
	
		
			
				|  |  |  |  |     except subprocess.CalledProcessError: | 
		
	
		
			
				|  |  |  |  |         return [] | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def handle_key_bindings(key: str) -> None: | 
		
	
		
			
				|  |  |  |  |     api_resource = MENUS[2].selected_row() | 
		
	
		
			
				|  |  |  |  |     if key == "KEY_DC": | 
		
	
		
			
				|  |  |  |  |         key = "Delete" | 
		
	
		
			
				|  |  |  |  |     if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all": | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0] | 
		
	
		
			
				|  |  |  |  |     if not resource: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     namespace = MENUS[1].selected_row() | 
		
	
		
			
				|  |  |  |  |     await cancel_resources_refreshing() | 
		
	
		
			
				|  |  |  |  |     curses.def_prog_mode() | 
		
	
		
			
				|  |  |  |  |     curses.endwin() | 
		
	
		
			
				|  |  |  |  |     command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource) | 
		
	
		
			
				|  |  |  |  |     if "batcat" in command: | 
		
	
		
			
				|  |  |  |  |         command += BATCAT_STYLE | 
		
	
		
			
				|  |  |  |  |     await subprocess_call_async(command) | 
		
	
		
			
				|  |  |  |  |     curses.reset_prog_mode() | 
		
	
		
			
				|  |  |  |  |     SCREEN.refresh() | 
		
	
		
			
				|  |  |  |  |     enable_mouse_support() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def handle_mouse(menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     if not MOUSE_ENABLED: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     try: | 
		
	
		
			
				|  |  |  |  |         mouse_info: tuple[int, ...] = curses.getmouse() | 
		
	
		
			
				|  |  |  |  |     except curses.error:  # this fixes scrolling error | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     row_number = mouse_info[2] - HEADER_HEIGHT | 
		
	
		
			
				|  |  |  |  |     column_number = mouse_info[1] | 
		
	
		
			
				|  |  |  |  |     next_menu: Optional[Menu] = None | 
		
	
		
			
				|  |  |  |  |     if column_number > (menu.begin_x + menu.width): | 
		
	
		
			
				|  |  |  |  |         next_menu = MENUS[(MENUS.index(menu) + 1) % MENUS.__len__()] | 
		
	
		
			
				|  |  |  |  |         if column_number > (next_menu.begin_x + next_menu.width): | 
		
	
		
			
				|  |  |  |  |             next_menu = MENUS[(MENUS.index(next_menu) + 1) % MENUS.__len__()] | 
		
	
		
			
				|  |  |  |  |         Menu.selected = next_menu | 
		
	
		
			
				|  |  |  |  |     elif column_number < menu.begin_x: | 
		
	
		
			
				|  |  |  |  |         next_menu = MENUS[(MENUS.index(menu) - 1) % MENUS.__len__()] | 
		
	
		
			
				|  |  |  |  |         if column_number < next_menu.begin_x: | 
		
	
		
			
				|  |  |  |  |             next_menu = MENUS[(MENUS.index(next_menu) - 1) % MENUS.__len__()] | 
		
	
		
			
				|  |  |  |  |         Menu.selected = next_menu | 
		
	
		
			
				|  |  |  |  |     if next_menu: | 
		
	
		
			
				|  |  |  |  |         draw_row(menu.win, menu.title, 1, 2, selected=False) | 
		
	
		
			
				|  |  |  |  |         draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) | 
		
	
		
			
				|  |  |  |  |         menu = next_menu | 
		
	
		
			
				|  |  |  |  |     char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1) | 
		
	
		
			
				|  |  |  |  |     char_str = chr(char_int & 0xFF) | 
		
	
		
			
				|  |  |  |  |     if not char_str or ord(char_str) > 127 or " " in char_str: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     if 0 <= row_number < len(menu.visible_rows): | 
		
	
		
			
				|  |  |  |  |         menu.visible_row_index = row_number | 
		
	
		
			
				|  |  |  |  |         menu.draw_rows() | 
		
	
		
			
				|  |  |  |  |         menu.refresh_dependent_menus() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def move_selection_vertically(key: str, menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     if len(menu.visible_rows) <= 1: | 
		
	
		
			
				|  |  |  |  |         return | 
		
	
		
			
				|  |  |  |  |     keys_numbers: dict[str, int] = {"KEY_DOWN": 1, "KEY_UP": -1} | 
		
	
		
			
				|  |  |  |  |     if menu.filtered_rows.size > ROWS_HEIGHT: | 
		
	
		
			
				|  |  |  |  |         menu.filtered_rows.shift(keys_numbers[key]) | 
		
	
		
			
				|  |  |  |  |     else: | 
		
	
		
			
				|  |  |  |  |         menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size | 
		
	
		
			
				|  |  |  |  |     menu.draw_rows() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def move_selection_horizontally(key: str, menu: Menu) -> None: | 
		
	
		
			
				|  |  |  |  |     increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key] | 
		
	
		
			
				|  |  |  |  |     next_menu = MENUS[(MENUS.index(menu) + increment) % MENUS.__len__()] | 
		
	
		
			
				|  |  |  |  |     draw_row(menu.win, menu.title, 1, 2, selected=False) | 
		
	
		
			
				|  |  |  |  |     draw_row(next_menu.win, next_menu.title, 1, 2, selected=True) | 
		
	
		
			
				|  |  |  |  |     Menu.selected = next_menu | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def confirm_action(message: str) -> bool: | 
		
	
		
			
				|  |  |  |  |     rows, cols = SCREEN.getmaxyx() | 
		
	
		
			
				|  |  |  |  |     popup_height = 5 | 
		
	
		
			
				|  |  |  |  |     popup_width = len(message) + 10 | 
		
	
		
			
				|  |  |  |  |     start_y = (rows - popup_height) // 2 | 
		
	
		
			
				|  |  |  |  |     start_x = (cols - popup_width) // 2 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     popup = curses.newwin(popup_height, popup_width, start_y, start_x) | 
		
	
		
			
				|  |  |  |  |     popup.box() | 
		
	
		
			
				|  |  |  |  |     popup.addstr(2, 2, message) | 
		
	
		
			
				|  |  |  |  |     popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     popup.refresh() | 
		
	
		
			
				|  |  |  |  |     while True: | 
		
	
		
			
				|  |  |  |  |         key = popup.getkey() | 
		
	
		
			
				|  |  |  |  |         if key.lower() == "y": | 
		
	
		
			
				|  |  |  |  |             confirm = True | 
		
	
		
			
				|  |  |  |  |         elif key.lower() == "n": | 
		
	
		
			
				|  |  |  |  |             confirm = False | 
		
	
		
			
				|  |  |  |  |         else: | 
		
	
		
			
				|  |  |  |  |             continue | 
		
	
		
			
				|  |  |  |  |         popup.clear() | 
		
	
		
			
				|  |  |  |  |         popup.refresh() | 
		
	
		
			
				|  |  |  |  |         return confirm | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def cancel_resources_refreshing() -> None: | 
		
	
		
			
				|  |  |  |  |     if not (FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done()): | 
		
	
		
			
				|  |  |  |  |         FOURTH_MENU_TASK.cancel() | 
		
	
		
			
				|  |  |  |  |         try: | 
		
	
		
			
				|  |  |  |  |             await FOURTH_MENU_TASK | 
		
	
		
			
				|  |  |  |  |         except asyncio.CancelledError: | 
		
	
		
			
				|  |  |  |  |             pass | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def kubectl_async(command: str) -> list[str]: | 
		
	
		
			
				|  |  |  |  |     process = await asyncio.create_subprocess_shell( | 
		
	
		
			
				|  |  |  |  |         f"kubectl {command}", | 
		
	
		
			
				|  |  |  |  |         stdout=asyncio.subprocess.PIPE, | 
		
	
		
			
				|  |  |  |  |         stderr=asyncio.subprocess.PIPE, | 
		
	
		
			
				|  |  |  |  |     ) | 
		
	
		
			
				|  |  |  |  |     stdout, stderr = await process.communicate() | 
		
	
		
			
				|  |  |  |  |     if stderr: | 
		
	
		
			
				|  |  |  |  |         raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr) | 
		
	
		
			
				|  |  |  |  |     return stdout.decode().strip().split("\n") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def handle_state_independent_input(menu: Menu, key: str) -> None: | 
		
	
		
			
				|  |  |  |  |     if key in ["KEY_UP", "KEY_DOWN"]:  # V (Vertical navigation) | 
		
	
		
			
				|  |  |  |  |         if len(menu.visible_rows) > 1: | 
		
	
		
			
				|  |  |  |  |             await cancel_resources_refreshing() | 
		
	
		
			
				|  |  |  |  |             await move_selection_vertically(key, menu) | 
		
	
		
			
				|  |  |  |  |             if menu == MENUS[0]: | 
		
	
		
			
				|  |  |  |  |                 await switch_context(menu.selected_row()) | 
		
	
		
			
				|  |  |  |  |             await menu.refresh_dependent_menus() | 
		
	
		
			
				|  |  |  |  |     elif key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:  # H (Vertical navigation) | 
		
	
		
			
				|  |  |  |  |         move_selection_horizontally(key, menu) | 
		
	
		
			
				|  |  |  |  |     elif key == "KEY_MOUSE": | 
		
	
		
			
				|  |  |  |  |         handle_mouse(menu) | 
		
	
		
			
				|  |  |  |  |     elif key == "KEY_DC": | 
		
	
		
			
				|  |  |  |  |         if not MENUS[3].selected_row(): | 
		
	
		
			
				|  |  |  |  |             return | 
		
	
		
			
				|  |  |  |  |         if confirm_action("Are you sure you want to delete this resource?"): | 
		
	
		
			
				|  |  |  |  |             await handle_key_bindings(key) | 
		
	
		
			
				|  |  |  |  |     elif not key.startswith("KEY_") and curses.ascii.unctrl(key) in KEY_BINDINGS:  # K (Key Bindings) | 
		
	
		
			
				|  |  |  |  |         await handle_key_bindings(curses.ascii.unctrl(key)) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def subprocess_call_async(command: str) -> None: | 
		
	
		
			
				|  |  |  |  |     process = await asyncio.create_subprocess_shell(command) | 
		
	
		
			
				|  |  |  |  |     await process.communicate() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def enable_mouse_support() -> None: | 
		
	
		
			
				|  |  |  |  |     curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) | 
		
	
		
			
				|  |  |  |  |     if MOUSE_ENABLED: | 
		
	
		
			
				|  |  |  |  |         print("\033[?1003h") | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def init_menus() -> list[Menu]: | 
		
	
		
			
				|  |  |  |  |     MENUS.append(Menu("Contexts", get_contexts, 0, CONTEXTS_WIDTH)) | 
		
	
		
			
				|  |  |  |  |     MENUS.append(Menu("Namespaces", get_namespaces, CONTEXTS_WIDTH, NAMESPACES_WIDTH)) | 
		
	
		
			
				|  |  |  |  |     MENUS.append(Menu("API resources", get_api_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH)) | 
		
	
		
			
				|  |  |  |  |     MENUS.append( | 
		
	
		
			
				|  |  |  |  |         Menu("Resources", get_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH) | 
		
	
		
			
				|  |  |  |  |     ) | 
		
	
		
			
				|  |  |  |  |     return MENUS | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def setup_curses() -> None: | 
		
	
		
			
				|  |  |  |  |     SCREEN.refresh() | 
		
	
		
			
				|  |  |  |  |     SCREEN.nodelay(True) | 
		
	
		
			
				|  |  |  |  |     SCREEN.keypad(True) | 
		
	
	
		
			
				
					|  |  |  | @ -430,16 +439,67 @@ async def init_menus() -> None: | 
		
	
		
			
				|  |  |  |  |     enable_mouse_support() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def initialize_interface() -> None: | 
		
	
		
			
				|  |  |  |  |     global MENUS | 
		
	
		
			
				|  |  |  |  |     MENUS = await init_menus() | 
		
	
		
			
				|  |  |  |  |     Menu.selected = MENUS[0] | 
		
	
		
			
				|  |  |  |  |     await setup_curses() | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |     for index, menu in enumerate(MENUS): | 
		
	
		
			
				|  |  |  |  |         await menu.set_rows() | 
		
	
		
			
				|  |  |  |  |         menu.set_filtered_rows() | 
		
	
		
			
				|  |  |  |  |         menu.draw_menu_with_footer() | 
		
	
		
			
				|  |  |  |  |         menu.dependent_menus = MENUS[index + 1 :]  # all other menu to the right | 
		
	
		
			
				|  |  |  |  |     draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | async def main_async() -> None: | 
		
	
		
			
				|  |  |  |  |     await init_menus() | 
		
	
		
			
				|  |  |  |  |     for menu in menus: | 
		
	
		
			
				|  |  |  |  |         draw_menu(menu) | 
		
	
		
			
				|  |  |  |  |     draw_row( | 
		
	
		
			
				|  |  |  |  |         curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), | 
		
	
		
			
				|  |  |  |  |         HELP_TEXT, 1, 2 | 
		
	
		
			
				|  |  |  |  |     ) | 
		
	
		
			
				|  |  |  |  |     while selected_menu: | 
		
	
		
			
				|  |  |  |  |         await catch_input(selected_menu) | 
		
	
		
			
				|  |  |  |  |     global MENUS, FOURTH_MENU_TASK | 
		
	
		
			
				|  |  |  |  |     await initialize_interface() | 
		
	
		
			
				|  |  |  |  |     while True: | 
		
	
		
			
				|  |  |  |  |         menu = Menu.selected | 
		
	
		
			
				|  |  |  |  |         try: | 
		
	
		
			
				|  |  |  |  |             key = SCREEN.getkey() | 
		
	
		
			
				|  |  |  |  |         except curses.error: | 
		
	
		
			
				|  |  |  |  |             if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done(): | 
		
	
		
			
				|  |  |  |  |                 FOURTH_MENU_TASK = asyncio.create_task(MENUS[3].refresh_menu()) | 
		
	
		
			
				|  |  |  |  |             await asyncio.sleep(0.01) | 
		
	
		
			
				|  |  |  |  |             continue | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         # handle state-dependent keys | 
		
	
		
			
				|  |  |  |  |         match menu.state: | 
		
	
		
			
				|  |  |  |  |             case "Normal": | 
		
	
		
			
				|  |  |  |  |                 if key == "q":  # Q (Quit) | 
		
	
		
			
				|  |  |  |  |                     break  # Exit | 
		
	
		
			
				|  |  |  |  |                 elif key == "/":  # S (Slash) | 
		
	
		
			
				|  |  |  |  |                     await menu.set_state("EmptyFilter")  # Transition to EmptyFilter state | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  |             case "EmptyFilter": | 
		
	
		
			
				|  |  |  |  |                 if key == "\x1b":  # E (Escape) | 
		
	
		
			
				|  |  |  |  |                     await menu.set_state("Normal")  # Transition to Normal state | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  |                 elif key.isalnum() or key == "-":  # A (Type text) | 
		
	
		
			
				|  |  |  |  |                     menu.filter += key.lower() | 
		
	
		
			
				|  |  |  |  |                     await menu.set_state("FilledFilter")  # Transition to FilledFilter state | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  |             case "FilledFilter":  # FilledFilter state | 
		
	
		
			
				|  |  |  |  |                 if key == "\x1b":  # E (Escape) | 
		
	
		
			
				|  |  |  |  |                     await menu.set_state("Normal")  # Transition to Normal state | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  |                 elif key in ["KEY_BACKSPACE", "\x08"]:  # B (Backspace) | 
		
	
		
			
				|  |  |  |  |                     if len(menu.filter) == 1: | 
		
	
		
			
				|  |  |  |  |                         await menu.set_state("EmptyFilter")  # Transition to EmptyFilter state | 
		
	
		
			
				|  |  |  |  |                         continue | 
		
	
		
			
				|  |  |  |  |                     menu.filter = menu.filter[:-1] | 
		
	
		
			
				|  |  |  |  |                     await menu.draw_menu_or_footer(f"/{menu.filter}") | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  |                 elif key.isalnum() or key == "-":  # A (Type text) | 
		
	
		
			
				|  |  |  |  |                     menu.filter += key.lower()  # Stay in FilledFilter state | 
		
	
		
			
				|  |  |  |  |                     await menu.draw_menu_or_footer(f"/{menu.filter}") | 
		
	
		
			
				|  |  |  |  |                     continue | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  |         # handle state-independent keys (Vertical/Horizontal navigation etc. available in all states) | 
		
	
		
			
				|  |  |  |  |         await handle_state_independent_input(menu, key) | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | 
 | 
		
	
		
			
				|  |  |  |  | def main(screen: curses.window) -> None: | 
		
	
	
		
			
				
					|  |  |  | 
 |