Compare commits
4 Commits
main
...
feature/co
Author | SHA1 | Date |
---|---|---|
Digital Studium | 32c2118eca | |
Digital Studium | 29a46b9093 | |
Digital Studium | fad8d1d04b | |
Digital Studium | 4db4959369 |
|
@ -6,13 +6,13 @@
|
||||||
Inspired by `lf` and `ranger` file managers, written in python.
|
Inspired by `lf` and `ranger` file managers, written in python.
|
||||||
|
|
||||||
It is lightweight (~400 lines of code) and easy to customize.
|
It is lightweight (~400 lines of code) and easy to customize.
|
||||||
Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #68).
|
Supports keyboard navigation and mouse navigation could be enabled (set MOUSE_ENABLED=True in a line #69).
|
||||||
|
|
||||||
## Key bindings
|
## Key bindings
|
||||||
|
|
||||||
### For kubectl
|
### For kubectl
|
||||||
|
|
||||||
You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #15:
|
You can customize these bindings or add extra bindings in `KEY_BINDINGS` variable of `kls` in a line #14:
|
||||||
|
|
||||||
- `Ctrl+y` - get **Y**aml of resource
|
- `Ctrl+y` - get **Y**aml of resource
|
||||||
- `Ctrl+d` - **D**escribe resource
|
- `Ctrl+d` - **D**escribe resource
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
digraph state_machine {
|
||||||
|
// Graph styling
|
||||||
|
rankdir=TB;
|
||||||
|
compound=true;
|
||||||
|
node [shape=record, style=filled, fillcolor=lightblue, fontsize=10];
|
||||||
|
edge [fontsize=9];
|
||||||
|
|
||||||
|
// States with entry/exit/activities
|
||||||
|
Normal [label="{Normal State|Entry: None\l\
|
||||||
|
Activities:\l\
|
||||||
|
• Wait for key input\l\
|
||||||
|
• Handle state-independent navigation\l\
|
||||||
|
Exit: None\l}"];
|
||||||
|
|
||||||
|
EmptyFilter [label="{EmptyFilter State|Entry: draw_footer(menu, '/')\l\
|
||||||
|
Activities:\l\
|
||||||
|
• Wait for key input\l\
|
||||||
|
• Handle state-independent navigation\l\
|
||||||
|
• Monitor for filter input\l\
|
||||||
|
Exit: None\l}"];
|
||||||
|
|
||||||
|
FilledFilter [label="{FilledFilter State|Entry:\l\
|
||||||
|
• Update filtered_rows\l\
|
||||||
|
• Draw menu or footer with filter\l\
|
||||||
|
Activities:\l\
|
||||||
|
• Process filter input\l\
|
||||||
|
• Update filtered_rows on changes\l\
|
||||||
|
• Update display if visibility changes\l\
|
||||||
|
• Handle state-independent navigation\l\
|
||||||
|
Exit:\l\
|
||||||
|
• Clear filter (on ESC)\l\
|
||||||
|
• Update filtered_rows\l\
|
||||||
|
• Update display\l}"];
|
||||||
|
|
||||||
|
// Exit node
|
||||||
|
exit [shape=doublecircle, fillcolor=lightpink, label="Exit\n(SELECTED_MENU = None)"];
|
||||||
|
|
||||||
|
// Initial state indicator
|
||||||
|
start [shape=point, fillcolor=black];
|
||||||
|
start -> Normal;
|
||||||
|
|
||||||
|
// Transitions
|
||||||
|
Normal -> EmptyFilter [label="/ (slash)\nAction: Initialize empty filter"];
|
||||||
|
Normal -> exit [label="ESC\nAction: Clear selection"];
|
||||||
|
|
||||||
|
EmptyFilter -> Normal [label="ESC\nAction: draw_footer('')"];
|
||||||
|
EmptyFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Create filtered_rows\n• Update display"];
|
||||||
|
|
||||||
|
FilledFilter -> Normal [label="ESC\nActions:\n• Clear filter\n• Reset filtered_rows\n• Update display"];
|
||||||
|
FilledFilter -> EmptyFilter [label="BACKSPACE\n(when filter empty)\nAction: draw_footer('/')"];
|
||||||
|
FilledFilter -> FilledFilter [label="alnum or -\nActions:\n• Add char to filter\n• Update filtered_rows\n• Update display"];
|
||||||
|
|
||||||
|
// State-independent actions note
|
||||||
|
subgraph cluster_notes {
|
||||||
|
label="Notes";
|
||||||
|
style=filled;
|
||||||
|
fillcolor=lightyellow;
|
||||||
|
node [style=filled, fillcolor=white];
|
||||||
|
note1 [label="State-Independent Actions:\n• Vertical/Horizontal navigation\n• Available in all states\n• Handled by handle_state_independent_input()"];
|
||||||
|
note2 [label="Display Updates:\n• draw_menu_with_filter()\n- Called when visible rows change\n• draw_footer()\n- Called for filter updates"];
|
||||||
|
note3 [label="Filtering:\n• Uses CircularList for row management\n• Filters are case-insensitive\n• Supports alphanumeric and '-' chars"];
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
digraph {
|
||||||
|
newrank=true;
|
||||||
|
subgraph cluster_menu_states {
|
||||||
|
label="Menu States"
|
||||||
|
color=lightgrey;
|
||||||
|
style=filled;
|
||||||
|
node [style=filled,color=white];
|
||||||
|
Normal -> EmptyFilter [label = "S"];
|
||||||
|
EmptyFilter -> FilledFilter [label = "A"];
|
||||||
|
EmptyFilter -> Normal [label = "E"];
|
||||||
|
FilledFilter:ne -> Normal:se [label = "E"];
|
||||||
|
FilledFilter -> EmptyFilter [label = "B"];
|
||||||
|
FilledFilter:s -> FilledFilter:s [label = "B"];
|
||||||
|
FilledFilter:e -> FilledFilter:e [label = "A"];
|
||||||
|
FilledFilter:w -> FilledFilter:w [label = "V"];
|
||||||
|
EmptyFilter:w -> EmptyFilter:w [label = "V"];
|
||||||
|
Normal:w -> Normal:w [label = "V"];
|
||||||
|
}
|
||||||
|
{ rank=same; Normal; Exit; }
|
||||||
|
Normal -> Exit [label = "E"];
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
digraph fsm {
|
||||||
|
|
||||||
|
// Create states cluster for l1, l2, l3
|
||||||
|
subgraph cluster_states {
|
||||||
|
rankdir="LR"
|
||||||
|
label = "States";
|
||||||
|
style=filled;
|
||||||
|
color=lightgrey;
|
||||||
|
node [style=filled,color=white];
|
||||||
|
|
||||||
|
l1 [label = "Normal state"];
|
||||||
|
l2 [label = "Filter state\n(empty filter)"];
|
||||||
|
l3 [label = "Filter state\n(non-empty filter)"];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Other nodes
|
||||||
|
exit [label = "Exit"];
|
||||||
|
i1 [label = "Other menu"];
|
||||||
|
e1 [label = "External program"];
|
||||||
|
|
||||||
|
|
||||||
|
// Transitions
|
||||||
|
l1 -> exit [label = "E"];
|
||||||
|
|
||||||
|
// Mode switches
|
||||||
|
l1 -> l2 [label = "S"];
|
||||||
|
l2 -> l3 [label = "A"];
|
||||||
|
l2 -> l1 [label = "E"];
|
||||||
|
l3 -> l1 [label = "E"];
|
||||||
|
l3 -> l2 [label = "B"];
|
||||||
|
|
||||||
|
// Horizontal navigation
|
||||||
|
l1 -> i1 [label = "H"];
|
||||||
|
l2 -> i1 [label = "H"];
|
||||||
|
l3 -> i1 [label = "H"];
|
||||||
|
|
||||||
|
// External program
|
||||||
|
l1 -> e1 [label = "K"];
|
||||||
|
l2 -> e1 [label = "K"];
|
||||||
|
l3 -> e1 [label = "K"];
|
||||||
|
|
||||||
|
// Self-loops
|
||||||
|
l1 -> l1 [label = "V"];
|
||||||
|
l2 -> l2 [label = "V"];
|
||||||
|
l3 -> l3 [label = "V"];
|
||||||
|
l3 -> l3 [label = "A"];
|
||||||
|
l3 -> l3 [label = "B"];
|
||||||
|
|
||||||
|
label = "Keys:\n\
|
||||||
|
S - Slash (start filter) \l\
|
||||||
|
E - Escape \l\
|
||||||
|
A - Type text \l\
|
||||||
|
B - Backspace \l\
|
||||||
|
H - Horizontal navigation (Right,Left,Tab) \l\
|
||||||
|
V - Vertical navigation (Up, Down) \l\
|
||||||
|
K - Key binding \l"
|
||||||
|
}
|
694
kls
694
kls
|
@ -1,84 +1,85 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
from typing import Optional, Callable
|
from typing import Optional, Callable, Self
|
||||||
import subprocess
|
import subprocess
|
||||||
import curses
|
import curses
|
||||||
import curses.ascii
|
import curses.ascii
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
SCREEN: curses.window = curses.initscr()
|
SCREEN: curses.window = curses.initscr()
|
||||||
|
|
||||||
|
|
||||||
# ****************************** #
|
# ****************************** #
|
||||||
# START OF CONFIGURATION SECTION #
|
# START OF CONFIGURATION SECTION #
|
||||||
# ****************************** #
|
# ****************************** #
|
||||||
KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended
|
KEY_BINDINGS: dict[str, dict[str, str]] = { # can be extended
|
||||||
"^Y": { # Ctrl + y
|
"^Y": { # Ctrl + y
|
||||||
"description": "Yaml",
|
"description": "Yaml",
|
||||||
"command": 'kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml'
|
"command": "kubectl -n {namespace} get {api_resource} {resource} -o yaml | batcat -l yaml",
|
||||||
|
"kind": "all", # this key binding is valid for all api resources
|
||||||
},
|
},
|
||||||
"^D": { # Ctrl + d
|
"^D": { # Ctrl + d
|
||||||
"description": "Describe",
|
"description": "Describe",
|
||||||
"command": 'kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml'
|
"command": "kubectl -n {namespace} describe {api_resource} {resource} | batcat -l yaml",
|
||||||
|
"kind": "all",
|
||||||
},
|
},
|
||||||
"^E": { # Ctrl + e
|
"^E": { # Ctrl + e
|
||||||
"description": "Edit",
|
"description": "Edit",
|
||||||
"command": 'kubectl -n {namespace} edit {api_resource} {resource}'
|
"command": "kubectl -n {namespace} edit {api_resource} {resource}",
|
||||||
|
"kind": "all",
|
||||||
},
|
},
|
||||||
"^L": { # Ctrl + l
|
"^L": { # Ctrl + l
|
||||||
"description": "Logs",
|
"description": "Logs",
|
||||||
"command": 'kubectl -n {namespace} logs {resource} | lnav'
|
"command": "kubectl -n {namespace} logs {resource} | lnav",
|
||||||
|
"kind": "pods", # this key binding is valid for pods only
|
||||||
},
|
},
|
||||||
"^X": { # Ctrl + x
|
"^X": { # Ctrl + x
|
||||||
"description": "eXec",
|
"description": "eXec",
|
||||||
"command": 'kubectl -n {namespace} exec -it {resource} sh'
|
"command": "kubectl -n {namespace} exec -it {resource} sh",
|
||||||
|
"kind": "pods",
|
||||||
},
|
},
|
||||||
"^N": { # Ctrl + n
|
"^N": { # Ctrl + n
|
||||||
"description": "Network debug",
|
"description": "Network debug",
|
||||||
"command": 'kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot'
|
"command": "kubectl -n {namespace} debug {resource} -it --image=nicolaka/netshoot",
|
||||||
|
"kind": "pods",
|
||||||
},
|
},
|
||||||
"^A": { # Ctrl + a (a means Access logs! :-))
|
"^A": { # Ctrl + a (a means Access logs! :-))
|
||||||
"description": "istio-proxy Access logs",
|
"description": "istio-proxy Access logs",
|
||||||
"command": 'kubectl -n {namespace} logs {resource} -c istio-proxy | lnav'
|
"command": "kubectl -n {namespace} logs {resource} -c istio-proxy | lnav",
|
||||||
|
"kind": "pods",
|
||||||
},
|
},
|
||||||
"^P": { # Ctrl + p (p means Proxy! :-))
|
"^P": { # Ctrl + p (p means Proxy! :-))
|
||||||
"description": "exec istio-Proxy",
|
"description": "exec istio-Proxy",
|
||||||
"command": 'kubectl -n {namespace} exec -it {resource} -c istio-proxy bash'
|
"command": "kubectl -n {namespace} exec -it {resource} -c istio-proxy bash",
|
||||||
|
"kind": "pods",
|
||||||
},
|
},
|
||||||
"^R": { # Ctrl + r (r means Reveal! :-))
|
"^R": { # Ctrl + r (r means Reveal! :-))
|
||||||
"description": "Reveal secret",
|
"description": "Reveal secret",
|
||||||
"command": "kubectl get secret {resource} -n {namespace} -o yaml | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml"
|
"command": "kubectl get secret {resource} -n {namespace} -o yaml"
|
||||||
|
" | yq '.data |= with_entries(.value |= @base64d)' -y | batcat -l yaml",
|
||||||
|
"kind": "secrets",
|
||||||
},
|
},
|
||||||
"Delete": { # It is actually KEY_DC
|
"Delete": { # It is actually KEY_DC
|
||||||
"description": "Delete",
|
"description": "Delete",
|
||||||
"command": 'kubectl -n {namespace} delete {api_resource} {resource}'
|
"command": "kubectl -n {namespace} delete {api_resource} {resource}",
|
||||||
}
|
"kind": "all",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# which api resources are on the top of menu?
|
|
||||||
TOP_API_RESOURCES: list[str] = [
|
|
||||||
"pods", "services", "configmaps", "secrets", "persistentvolumeclaims",
|
|
||||||
"ingresses", "nodes", "deployments", "statefulsets", "daemonsets",
|
|
||||||
"storageclasses", "serviceentries", "destinationrules", "authorizationpolicies",
|
|
||||||
"virtualservices", "gateways", "telemetry", "envoyfilters"
|
|
||||||
]
|
|
||||||
|
|
||||||
QUERY_API_RESOURCES: bool = False # Should we merge TOP_API_RESOURCES with all other api resources from cluster?
|
|
||||||
BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat
|
BATCAT_STYLE: str = " --paging always --style numbers" # style of batcat
|
||||||
|
SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD
|
||||||
MOUSE_ENABLED: bool = False
|
MOUSE_ENABLED: bool = False
|
||||||
WIDTH: int = curses.COLS
|
WIDTH: int = curses.COLS
|
||||||
WIDTH_UNIT: int = int(WIDTH / 8)
|
WIDTH_UNIT: int = int(WIDTH / 10)
|
||||||
|
CONTEXTS_WIDTH = int(WIDTH_UNIT * 1.5)
|
||||||
NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5)
|
NAMESPACES_WIDTH = int(WIDTH_UNIT * 1.5)
|
||||||
API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5)
|
API_RESOURCES_WIDTH = int(WIDTH_UNIT * 1.5)
|
||||||
RESOURCES_WIDTH = WIDTH - (API_RESOURCES_WIDTH + NAMESPACES_WIDTH)
|
RESOURCES_WIDTH = WIDTH - (CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH)
|
||||||
HEADER_HEIGHT: int = 4
|
HEADER_HEIGHT: int = 4
|
||||||
FOOTER_HEIGHT: int = 3
|
FOOTER_HEIGHT: int = 3
|
||||||
ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3
|
ROWS_HEIGHT: int = curses.LINES - HEADER_HEIGHT - FOOTER_HEIGHT - 3
|
||||||
# Generate HELP_TEXT from KEY_BINDINGS
|
# Generate HELP_TEXT from KEY_BINDINGS
|
||||||
HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
|
HELP_TEXT: str = ", ".join(f"{key}: {binding['description']}" for key, binding in KEY_BINDINGS.items())
|
||||||
HELP_TEXT += ", /: filter mode, Esc: exit filter mode or kls, arrows/TAB/PgUp/PgDn: navigation"
|
HELP_TEXT += ", /: filter mode, Esc: exit filter mode, arrows/TAB: navigation, q: exit kls"
|
||||||
SELECTED_ROW_STYLE = curses.A_REVERSE | curses.A_BOLD
|
|
||||||
ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_BACKSPACE", "\x08", "KEY_MOUSE", "KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END", "\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]
|
|
||||||
|
|
||||||
# **************************** #
|
# **************************** #
|
||||||
# END OF CONFIGURATION SECTION #
|
# END OF CONFIGURATION SECTION #
|
||||||
|
@ -86,41 +87,110 @@ ALLOWED_SPECIAL_KEYS = list(KEY_BINDINGS.keys()) + ["KEY_DC", "/", "\x1b", "KEY_
|
||||||
|
|
||||||
|
|
||||||
class CircularList:
|
class CircularList:
|
||||||
def __init__(self, elements: list[str]):
|
def __init__(self, items: list[str]):
|
||||||
self.elements: list[str] = elements
|
self.items: list[str] = items
|
||||||
self.size: int = len(elements)
|
self.size: int = len(items)
|
||||||
self.index: int = 0
|
self.index: int = 0
|
||||||
|
|
||||||
def __getitem__(self, index: slice) -> list[str]:
|
def __getitem__(self, index: slice) -> list[str]:
|
||||||
start, stop, step = index.indices(self.size)
|
start, stop, step = index.indices(self.size)
|
||||||
return [self.elements[(self.index + i) % self.size] for i in range(start, stop, step)]
|
return [self.items[(self.index + i) % self.size] for i in range(start, stop, step)]
|
||||||
|
|
||||||
def shift(self, steps: int) -> None:
|
def shift(self, steps: int) -> None:
|
||||||
self.index = (self.index + steps) % self.size
|
self.index = (self.index + steps) % self.size
|
||||||
|
|
||||||
|
|
||||||
class Menu:
|
class Menu:
|
||||||
def __init__(self, title: str, rows: list[str], begin_x: int|float, width: int|float, rows_height: int):
|
selected = None # Class variable to track selected object
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
title: str,
|
||||||
|
rows_function,
|
||||||
|
begin_x: int,
|
||||||
|
width: int,
|
||||||
|
):
|
||||||
self.title: str = title
|
self.title: str = title
|
||||||
self.rows: list[str] = rows
|
self.rows: list[str] = []
|
||||||
|
self.rows_function = rows_function
|
||||||
self.filter: str = ""
|
self.filter: str = ""
|
||||||
self.filter_mode: bool = False
|
self.state: str = "Normal"
|
||||||
self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x])
|
self.filtered_rows: CircularList = CircularList([x for x in self.rows if self.filter in x])
|
||||||
self.visible_rows: Callable[[], list[str]] = lambda: self.filtered_rows[:rows_height]
|
|
||||||
self.visible_row_index: int = 0
|
self.visible_row_index: int = 0
|
||||||
self.selected_row: Callable[[], Optional[str]] = lambda: self.visible_rows()[
|
self.selected_row: Callable[[], Optional[str]] = (
|
||||||
self.visible_row_index] if self.visible_rows() else None
|
lambda: self.visible_rows[self.visible_row_index] if self.visible_rows else None
|
||||||
self.rows_height: int = rows_height
|
)
|
||||||
self.width: int = int(width)
|
self.width: int = int(width)
|
||||||
self.begin_x: int = int(begin_x)
|
self.begin_x: int = int(begin_x)
|
||||||
self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x)
|
self.win: curses.window = curses.newwin(curses.LINES - FOOTER_HEIGHT, self.width, 0, self.begin_x)
|
||||||
|
self.dependent_menus: list[Self] = []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def visible_rows(self) -> list[str]:
|
||||||
|
return self.filtered_rows[:ROWS_HEIGHT]
|
||||||
|
|
||||||
|
async def set_rows(self):
|
||||||
|
self.rows = await self.rows_function()
|
||||||
|
|
||||||
|
def set_filtered_rows(self):
|
||||||
|
self.filtered_rows = CircularList([x for x in self.rows if self.filter in x])
|
||||||
|
|
||||||
|
async def set_state(self, state: str) -> None:
|
||||||
|
self.state = state
|
||||||
|
# entry activities
|
||||||
|
match self.state:
|
||||||
|
case "Normal":
|
||||||
|
self.filter = ""
|
||||||
|
await self.draw_menu_or_footer("")
|
||||||
|
case "EmptyFilter":
|
||||||
|
self.filter = ""
|
||||||
|
await self.draw_menu_or_footer("/")
|
||||||
|
case "FilledFilter":
|
||||||
|
await self.draw_menu_or_footer(f"/{self.filter}") # if redrawing whole menu is not needed
|
||||||
|
await self.refresh_dependent_menus()
|
||||||
|
|
||||||
|
def draw_rows(self) -> None:
|
||||||
|
for index, row in enumerate(self.visible_rows):
|
||||||
|
draw_row(self.win, row, index + HEADER_HEIGHT, 2, selected=row == self.selected_row())
|
||||||
|
|
||||||
|
def draw_menu_with_footer(self) -> None:
|
||||||
|
self.win.erase()
|
||||||
|
draw_row(self.win, self.title, 1, 2, selected=self == Menu.selected)
|
||||||
|
self.draw_rows()
|
||||||
|
draw_row(
|
||||||
|
self.win,
|
||||||
|
f"/{self.filter}" if self.state in ["EmptyFilter", "FilledFilter"] else "",
|
||||||
|
curses.LINES - FOOTER_HEIGHT - 2,
|
||||||
|
2,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def draw_menu_or_footer(self, footer_text: str) -> None:
|
||||||
|
previous_visible_rows = self.visible_rows
|
||||||
|
self.set_filtered_rows()
|
||||||
|
if self.visible_rows != previous_visible_rows: # draw whole menu
|
||||||
|
self.visible_row_index = 0
|
||||||
|
self.draw_menu_with_footer()
|
||||||
|
if self == MENUS[0]:
|
||||||
|
await switch_context(self.selected_row())
|
||||||
|
await self.refresh_dependent_menus()
|
||||||
|
else: # draw footer only
|
||||||
|
draw_row(self.win, footer_text, curses.LINES - FOOTER_HEIGHT - 2, 2)
|
||||||
|
|
||||||
|
async def refresh_dependent_menus(self):
|
||||||
|
for menu in self.dependent_menus:
|
||||||
|
await menu.refresh_menu()
|
||||||
|
|
||||||
|
async def refresh_menu(self) -> None:
|
||||||
|
await self.set_rows()
|
||||||
|
self.set_filtered_rows()
|
||||||
|
if self.visible_row_index >= len(self.visible_rows):
|
||||||
|
self.visible_row_index = 0 # reset selected row only if number of lines changed
|
||||||
|
self.draw_menu_with_footer()
|
||||||
|
|
||||||
|
|
||||||
# Global variables
|
# Global variables
|
||||||
THIRD_MENU_LOCK: asyncio.Lock = asyncio.Lock()
|
FOURTH_MENU_TASK: Optional[asyncio.Task] = None
|
||||||
THIRD_MENU_TASK: Optional[asyncio.Task] = None
|
MENUS: list[Menu] = []
|
||||||
menus: list[Menu] = []
|
|
||||||
selected_menu: Optional[Menu] = None
|
|
||||||
|
|
||||||
|
|
||||||
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None:
|
def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool = False) -> None:
|
||||||
|
@ -129,274 +199,31 @@ def draw_row(window: curses.window, text: str, y: int, x: int, selected: bool =
|
||||||
window.refresh()
|
window.refresh()
|
||||||
|
|
||||||
|
|
||||||
def draw_rows(menu: Menu) -> None:
|
async def switch_context(context: str) -> None:
|
||||||
for index, row in enumerate(menu.visible_rows()):
|
if not context:
|
||||||
draw_row(menu.win, row, index + HEADER_HEIGHT, 2, selected = row == menu.selected_row())
|
return
|
||||||
|
|
||||||
|
|
||||||
def draw_menu(menu: Menu) -> None:
|
|
||||||
menu.win.erase()
|
|
||||||
draw_row(menu.win, menu.title, 1, 2, selected = menu == selected_menu)
|
|
||||||
draw_rows(menu)
|
|
||||||
draw_row(menu.win, f"/{menu.filter}" if menu.filter_mode else "", curses.LINES - FOOTER_HEIGHT - 2, 2)
|
|
||||||
|
|
||||||
|
|
||||||
async def refresh_third_menu(namespace: Optional[str], api_resource: Optional[str]) -> None:
|
|
||||||
global THIRD_MENU_TASK
|
|
||||||
try:
|
try:
|
||||||
async with THIRD_MENU_LOCK:
|
await kubectl_async(f"config use-context {context}")
|
||||||
menu = menus[2]
|
|
||||||
previous_menu_rows = menu.rows
|
|
||||||
if api_resource and namespace:
|
|
||||||
try:
|
|
||||||
menu.rows = await kubectl_async(
|
|
||||||
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found")
|
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
menu.rows = []
|
|
||||||
else:
|
|
||||||
menu.rows = []
|
|
||||||
index_before_update = menu.filtered_rows.index
|
|
||||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
|
|
||||||
menu.filtered_rows.index = index_before_update
|
|
||||||
if menu.visible_row_index >= len(menu.visible_rows()):
|
|
||||||
menu.visible_row_index = 0
|
|
||||||
if previous_menu_rows != menu.rows:
|
|
||||||
draw_menu(menu)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_key_bindings(key: str, namespace: str, api_resource: str, resource: str) -> None:
|
|
||||||
if not resource:
|
|
||||||
return
|
|
||||||
if key in ("l", "x", "n") and api_resource != "pods":
|
|
||||||
return
|
|
||||||
if key == "KEY_DC":
|
|
||||||
key = "Delete"
|
|
||||||
if THIRD_MENU_TASK is not None:
|
|
||||||
THIRD_MENU_TASK.cancel()
|
|
||||||
try:
|
|
||||||
await THIRD_MENU_TASK
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
pass
|
||||||
async with THIRD_MENU_LOCK:
|
|
||||||
curses.def_prog_mode()
|
|
||||||
curses.endwin()
|
|
||||||
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
|
|
||||||
if "batcat" in command:
|
|
||||||
command += BATCAT_STYLE
|
|
||||||
await subprocess_call_async(command)
|
|
||||||
curses.reset_prog_mode()
|
|
||||||
SCREEN.refresh()
|
|
||||||
enable_mouse_support()
|
|
||||||
|
|
||||||
|
|
||||||
def handle_filter_state(key: str, menu: Menu) -> None:
|
async def get_contexts() -> list[str]:
|
||||||
global selected_menu
|
|
||||||
if key == "/" and not menu.filter_mode:
|
|
||||||
menu.filter_mode = True
|
|
||||||
menu.filter = ""
|
|
||||||
elif key == "\x1b": # Escape key
|
|
||||||
if menu.filter_mode:
|
|
||||||
menu.filter_mode = False
|
|
||||||
menu.filter = ""
|
|
||||||
else:
|
|
||||||
selected_menu = None
|
|
||||||
elif menu.filter_mode:
|
|
||||||
if key in ["KEY_BACKSPACE", "\x08"] and menu.filter:
|
|
||||||
menu.filter = menu.filter[:-1] # Remove last character
|
|
||||||
elif key.isalnum() or key == "-": # Allow letters, numbers, and dashes
|
|
||||||
menu.filter += key.lower()
|
|
||||||
menu.visible_row_index = 0
|
|
||||||
menu.filtered_rows = CircularList([x for x in menu.rows if menu.filter in x])
|
|
||||||
draw_menu(menu)
|
|
||||||
if menu != menus[2]:
|
|
||||||
menus[2].visible_row_index = 0
|
|
||||||
|
|
||||||
|
|
||||||
def handle_mouse(menu: Menu) -> None:
|
|
||||||
if not MOUSE_ENABLED:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
mouse_info: tuple[int, ...] = curses.getmouse()
|
current_context = await kubectl_async("config current-context")
|
||||||
except curses.error: # this fixes scrolling error
|
contexts = await kubectl_async("config get-contexts --no-headers -o name")
|
||||||
return
|
contexts.remove(current_context[0])
|
||||||
row_number = mouse_info[2] - HEADER_HEIGHT
|
contexts.insert(0, current_context[0])
|
||||||
column_number = mouse_info[1]
|
return [line.split()[0] for line in contexts if line.strip()]
|
||||||
next_menu: Optional[Menu] = None
|
except subprocess.CalledProcessError:
|
||||||
if column_number > (menu.begin_x + menu.width):
|
return []
|
||||||
next_menu = menus[(menus.index(menu) + 1) % 3]
|
|
||||||
if column_number > (next_menu.begin_x + next_menu.width):
|
|
||||||
next_menu = menus[(menus.index(next_menu) + 1) % 3]
|
|
||||||
globals().update(selected_menu=next_menu)
|
|
||||||
elif column_number < menu.begin_x:
|
|
||||||
next_menu = menus[(menus.index(menu) - 1) % 3]
|
|
||||||
if column_number < next_menu.begin_x:
|
|
||||||
next_menu = menus[(menus.index(next_menu) - 1) % 3]
|
|
||||||
globals().update(selected_menu=next_menu)
|
|
||||||
if next_menu:
|
|
||||||
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
|
||||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
|
||||||
menu = next_menu
|
|
||||||
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1)
|
|
||||||
char_str = chr(char_int & 0xFF)
|
|
||||||
if not char_str or ord(char_str) > 127 or ' ' in char_str:
|
|
||||||
return
|
|
||||||
if 0 <= row_number < len(menu.visible_rows()):
|
|
||||||
menu.visible_row_index = row_number
|
|
||||||
draw_rows(menu)
|
|
||||||
if menu != menus[2]:
|
|
||||||
menus[2].visible_row_index = 0
|
|
||||||
|
|
||||||
|
|
||||||
def handle_vertical_navigation(key: str, menu: Menu) -> None:
|
async def get_namespaces() -> list[str]:
|
||||||
if len(menu.visible_rows()) <= 1:
|
|
||||||
return
|
|
||||||
keys_numbers: dict[str, int] = {
|
|
||||||
"KEY_DOWN": 1, "KEY_UP": -1,
|
|
||||||
"KEY_NPAGE": 1, "KEY_PPAGE": -1,
|
|
||||||
'KEY_HOME': 0, 'KEY_END': -1
|
|
||||||
}
|
|
||||||
if key in ["KEY_DOWN", "KEY_UP"]:
|
|
||||||
if menu.filtered_rows.size > menu.rows_height:
|
|
||||||
menu.filtered_rows.shift(keys_numbers[key])
|
|
||||||
else:
|
|
||||||
menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size
|
|
||||||
elif key in ["KEY_NPAGE", "KEY_PPAGE"]:
|
|
||||||
menu.filtered_rows.shift(keys_numbers[key] * len(menu.visible_rows()))
|
|
||||||
elif key in ['KEY_HOME', 'KEY_END']:
|
|
||||||
menu.visible_row_index = keys_numbers[key]
|
|
||||||
draw_rows(menu)
|
|
||||||
if menu != menus[2]:
|
|
||||||
menus[2].visible_row_index = 0
|
|
||||||
|
|
||||||
|
|
||||||
def handle_horizontal_navigation(key: str, menu: Menu) -> None:
|
|
||||||
increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
|
||||||
next_menu = menus[(menus.index(menu) + increment) % 3]
|
|
||||||
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
|
||||||
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
|
||||||
globals().update(selected_menu=next_menu)
|
|
||||||
|
|
||||||
|
|
||||||
async def confirm_action(message: str) -> bool:
|
|
||||||
rows, cols = SCREEN.getmaxyx()
|
|
||||||
popup_height = 5
|
|
||||||
popup_width = len(message) + 10
|
|
||||||
start_y = (rows - popup_height) // 2
|
|
||||||
start_x = (cols - popup_width) // 2
|
|
||||||
|
|
||||||
popup = curses.newwin(popup_height, popup_width, start_y, start_x)
|
|
||||||
popup.box()
|
|
||||||
popup.addstr(2, 2, message)
|
|
||||||
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
|
|
||||||
|
|
||||||
popup.refresh()
|
|
||||||
while True:
|
|
||||||
key = await get_key_async(popup)
|
|
||||||
if key.lower() == 'y':
|
|
||||||
return True
|
|
||||||
if key.lower() == 'n':
|
|
||||||
popup.clear()
|
|
||||||
popup.refresh()
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
async def get_key_async(popup: curses.window) -> str:
|
|
||||||
return await asyncio.to_thread(popup.getkey)
|
|
||||||
|
|
||||||
|
|
||||||
async def kubectl_async(command: str) -> list[str]:
|
|
||||||
process = await asyncio.create_subprocess_shell(
|
|
||||||
f"kubectl {command} 2> /dev/null",
|
|
||||||
stdout=asyncio.subprocess.PIPE,
|
|
||||||
stderr=asyncio.subprocess.PIPE
|
|
||||||
)
|
|
||||||
stdout, stderr = await process.communicate()
|
|
||||||
if stderr:
|
|
||||||
raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)
|
|
||||||
return stdout.decode().strip().split("\n")
|
|
||||||
|
|
||||||
|
|
||||||
async def catch_input(menu: Menu) -> None:
|
|
||||||
global THIRD_MENU_TASK
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
key = await get_key_async(SCREEN)
|
|
||||||
break
|
|
||||||
except curses.error:
|
|
||||||
if THIRD_MENU_TASK is None or THIRD_MENU_TASK.done() or THIRD_MENU_TASK.cancelled():
|
|
||||||
THIRD_MENU_TASK = asyncio.create_task(
|
|
||||||
refresh_third_menu(
|
|
||||||
menus[0].selected_row(),
|
|
||||||
menus[1].selected_row()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
# Convert control keys to their string representation (e.g., Ctrl+Y -> ^Y)
|
|
||||||
# Handle special keys (e.g., "KEY_UP", "KEY_DC")
|
|
||||||
if key.startswith("KEY_") or key == "\x1b":
|
|
||||||
key_str = key # Use the key as-is for special keys
|
|
||||||
else:
|
|
||||||
# Handle single-character keys (e.g., "a", "^Y")
|
|
||||||
key_str = curses.ascii.unctrl(key) if curses.ascii.iscntrl(ord(key)) else key
|
|
||||||
# Check if the key is allowed
|
|
||||||
if key_str not in ALLOWED_SPECIAL_KEYS and not key.isalnum():
|
|
||||||
return # Ignore the key if it's not in the allowed list or not alphanumeric
|
|
||||||
if key.isalnum() and not menu.filter_mode:
|
|
||||||
return
|
|
||||||
|
|
||||||
if key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]:
|
|
||||||
handle_horizontal_navigation(key, menu)
|
|
||||||
elif key in ["KEY_UP", "KEY_DOWN", "KEY_NPAGE", "KEY_PPAGE", "KEY_HOME", "KEY_END"]:
|
|
||||||
if THIRD_MENU_TASK is not None:
|
|
||||||
THIRD_MENU_TASK.cancel()
|
|
||||||
try:
|
|
||||||
await THIRD_MENU_TASK
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
handle_vertical_navigation(key, menu)
|
|
||||||
elif key == "KEY_MOUSE":
|
|
||||||
handle_mouse(menu)
|
|
||||||
elif key == "KEY_DC" and await confirm_action("Are you sure you want to delete this resource?"):
|
|
||||||
await handle_key_bindings(
|
|
||||||
key,
|
|
||||||
menus[0].selected_row(),
|
|
||||||
menus[1].selected_row(),
|
|
||||||
menus[2].selected_row() and menus[2].selected_row().split()[0]
|
|
||||||
)
|
|
||||||
elif key in ["/", "\x1b", "KEY_BACKSPACE", "\x08"] or key.isalnum() or key == "-":
|
|
||||||
handle_filter_state(key, menu)
|
|
||||||
elif key != "KEY_DC" and curses.ascii.unctrl(key) in KEY_BINDINGS:
|
|
||||||
await handle_key_bindings(
|
|
||||||
curses.ascii.unctrl(key),
|
|
||||||
menus[0].selected_row(),
|
|
||||||
menus[1].selected_row(),
|
|
||||||
menus[2].selected_row() and menus[2].selected_row().split()[0]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def subprocess_call_async(command: str) -> None:
|
|
||||||
process = await asyncio.create_subprocess_shell(command)
|
|
||||||
await process.communicate()
|
|
||||||
|
|
||||||
|
|
||||||
def enable_mouse_support() -> None:
|
|
||||||
if MOUSE_ENABLED:
|
|
||||||
curses.mousemask(curses.REPORT_MOUSE_POSITION)
|
|
||||||
print('\033[?1003h')
|
|
||||||
|
|
||||||
|
|
||||||
async def init_menus() -> None:
|
|
||||||
global menus, selected_menu
|
|
||||||
api_resources_kubectl: list[str] = [
|
|
||||||
x.split()[0] for x in await kubectl_async("api-resources --no-headers --verbs=get")
|
|
||||||
]
|
|
||||||
api_resources = list(
|
|
||||||
dict.fromkeys(TOP_API_RESOURCES + api_resources_kubectl)
|
|
||||||
) if QUERY_API_RESOURCES else TOP_API_RESOURCES
|
|
||||||
|
|
||||||
namespaces: list[str] = []
|
namespaces: list[str] = []
|
||||||
|
context = MENUS[0].selected_row() and MENUS[0].selected_row().split()[0]
|
||||||
|
if not context:
|
||||||
|
return namespaces
|
||||||
try:
|
try:
|
||||||
namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'")
|
namespaces = await kubectl_async("config view --minify --output 'jsonpath={..namespace}'")
|
||||||
except:
|
except:
|
||||||
|
@ -412,14 +239,196 @@ async def init_menus() -> None:
|
||||||
namespaces = all_namespaces
|
namespaces = all_namespaces
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
return namespaces
|
||||||
|
|
||||||
menus = [
|
|
||||||
Menu("Namespaces", namespaces, 0, NAMESPACES_WIDTH, ROWS_HEIGHT),
|
|
||||||
Menu("API resources", api_resources, NAMESPACES_WIDTH, API_RESOURCES_WIDTH, ROWS_HEIGHT),
|
|
||||||
Menu("Resources", [], NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH, ROWS_HEIGHT)
|
|
||||||
]
|
|
||||||
selected_menu = menus[0]
|
|
||||||
|
|
||||||
|
async def get_api_resources() -> list[str]:
|
||||||
|
try:
|
||||||
|
api_resources = await kubectl_async("api-resources --no-headers --verbs=get")
|
||||||
|
return sorted(list(set([x.split()[0] for x in api_resources]))) # dedup
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
async def get_resources() -> list[str]:
|
||||||
|
api_resource = MENUS[2].selected_row()
|
||||||
|
namespace = MENUS[1].selected_row()
|
||||||
|
if not (api_resource and namespace):
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
resources = await kubectl_async(
|
||||||
|
f"-n {namespace} get {api_resource} --no-headers --ignore-not-found --sort-by='{{.metadata.name}}'"
|
||||||
|
)
|
||||||
|
return resources
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_key_bindings(key: str) -> None:
|
||||||
|
api_resource = MENUS[2].selected_row()
|
||||||
|
if key == "KEY_DC":
|
||||||
|
key = "Delete"
|
||||||
|
if KEY_BINDINGS[key]["kind"] != api_resource and KEY_BINDINGS[key]["kind"] != "all":
|
||||||
|
return
|
||||||
|
resource = MENUS[3].selected_row() and MENUS[3].selected_row().split()[0]
|
||||||
|
if not resource:
|
||||||
|
return
|
||||||
|
namespace = MENUS[1].selected_row()
|
||||||
|
await cancel_resources_refreshing()
|
||||||
|
curses.def_prog_mode()
|
||||||
|
curses.endwin()
|
||||||
|
command = KEY_BINDINGS[key]["command"].format(namespace=namespace, api_resource=api_resource, resource=resource)
|
||||||
|
if "batcat" in command:
|
||||||
|
command += BATCAT_STYLE
|
||||||
|
await subprocess_call_async(command)
|
||||||
|
curses.reset_prog_mode()
|
||||||
|
SCREEN.refresh()
|
||||||
|
enable_mouse_support()
|
||||||
|
|
||||||
|
|
||||||
|
def handle_mouse(menu: Menu) -> None:
|
||||||
|
if not MOUSE_ENABLED:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
mouse_info: tuple[int, ...] = curses.getmouse()
|
||||||
|
except curses.error: # this fixes scrolling error
|
||||||
|
return
|
||||||
|
row_number = mouse_info[2] - HEADER_HEIGHT
|
||||||
|
column_number = mouse_info[1]
|
||||||
|
next_menu: Optional[Menu] = None
|
||||||
|
if column_number > (menu.begin_x + menu.width):
|
||||||
|
next_menu = MENUS[(MENUS.index(menu) + 1) % MENUS.__len__()]
|
||||||
|
if column_number > (next_menu.begin_x + next_menu.width):
|
||||||
|
next_menu = MENUS[(MENUS.index(next_menu) + 1) % MENUS.__len__()]
|
||||||
|
Menu.selected = next_menu
|
||||||
|
elif column_number < menu.begin_x:
|
||||||
|
next_menu = MENUS[(MENUS.index(menu) - 1) % MENUS.__len__()]
|
||||||
|
if column_number < next_menu.begin_x:
|
||||||
|
next_menu = MENUS[(MENUS.index(next_menu) - 1) % MENUS.__len__()]
|
||||||
|
Menu.selected = next_menu
|
||||||
|
if next_menu:
|
||||||
|
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
||||||
|
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
||||||
|
menu = next_menu
|
||||||
|
char_int = menu.win.inch(mouse_info[2], column_number - menu.begin_x - 1)
|
||||||
|
char_str = chr(char_int & 0xFF)
|
||||||
|
if not char_str or ord(char_str) > 127 or " " in char_str:
|
||||||
|
return
|
||||||
|
if 0 <= row_number < len(menu.visible_rows):
|
||||||
|
menu.visible_row_index = row_number
|
||||||
|
menu.draw_rows()
|
||||||
|
menu.refresh_dependent_menus()
|
||||||
|
|
||||||
|
|
||||||
|
async def move_selection_vertically(key: str, menu: Menu) -> None:
|
||||||
|
if len(menu.visible_rows) <= 1:
|
||||||
|
return
|
||||||
|
keys_numbers: dict[str, int] = {"KEY_DOWN": 1, "KEY_UP": -1}
|
||||||
|
if menu.filtered_rows.size > ROWS_HEIGHT:
|
||||||
|
menu.filtered_rows.shift(keys_numbers[key])
|
||||||
|
else:
|
||||||
|
menu.visible_row_index = (menu.visible_row_index + keys_numbers[key]) % menu.filtered_rows.size
|
||||||
|
menu.draw_rows()
|
||||||
|
|
||||||
|
|
||||||
|
def move_selection_horizontally(key: str, menu: Menu) -> None:
|
||||||
|
increment: int = {"KEY_RIGHT": 1, "\t": 1, "KEY_LEFT": -1, "KEY_BTAB": -1}[key]
|
||||||
|
next_menu = MENUS[(MENUS.index(menu) + increment) % MENUS.__len__()]
|
||||||
|
draw_row(menu.win, menu.title, 1, 2, selected=False)
|
||||||
|
draw_row(next_menu.win, next_menu.title, 1, 2, selected=True)
|
||||||
|
Menu.selected = next_menu
|
||||||
|
|
||||||
|
|
||||||
|
def confirm_action(message: str) -> bool:
|
||||||
|
rows, cols = SCREEN.getmaxyx()
|
||||||
|
popup_height = 5
|
||||||
|
popup_width = len(message) + 10
|
||||||
|
start_y = (rows - popup_height) // 2
|
||||||
|
start_x = (cols - popup_width) // 2
|
||||||
|
|
||||||
|
popup = curses.newwin(popup_height, popup_width, start_y, start_x)
|
||||||
|
popup.box()
|
||||||
|
popup.addstr(2, 2, message)
|
||||||
|
popup.addstr(3, 2, "Press 'y' to confirm, 'n' to cancel")
|
||||||
|
|
||||||
|
popup.refresh()
|
||||||
|
while True:
|
||||||
|
key = popup.getkey()
|
||||||
|
if key.lower() == "y":
|
||||||
|
confirm = True
|
||||||
|
elif key.lower() == "n":
|
||||||
|
confirm = False
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
popup.clear()
|
||||||
|
popup.refresh()
|
||||||
|
return confirm
|
||||||
|
|
||||||
|
|
||||||
|
async def cancel_resources_refreshing() -> None:
|
||||||
|
if not (FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done()):
|
||||||
|
FOURTH_MENU_TASK.cancel()
|
||||||
|
try:
|
||||||
|
await FOURTH_MENU_TASK
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def kubectl_async(command: str) -> list[str]:
|
||||||
|
process = await asyncio.create_subprocess_shell(
|
||||||
|
f"kubectl {command}",
|
||||||
|
stdout=asyncio.subprocess.PIPE,
|
||||||
|
stderr=asyncio.subprocess.PIPE,
|
||||||
|
)
|
||||||
|
stdout, stderr = await process.communicate()
|
||||||
|
if stderr:
|
||||||
|
raise subprocess.CalledProcessError(process.returncode, command, stderr=stderr)
|
||||||
|
return stdout.decode().strip().split("\n")
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_state_independent_input(menu: Menu, key: str) -> None:
|
||||||
|
if key in ["KEY_UP", "KEY_DOWN"]: # V (Vertical navigation)
|
||||||
|
if len(menu.visible_rows) > 1:
|
||||||
|
await cancel_resources_refreshing()
|
||||||
|
await move_selection_vertically(key, menu)
|
||||||
|
if menu == MENUS[0]:
|
||||||
|
await switch_context(menu.selected_row())
|
||||||
|
await menu.refresh_dependent_menus()
|
||||||
|
elif key in ["\t", "KEY_RIGHT", "KEY_BTAB", "KEY_LEFT"]: # H (Vertical navigation)
|
||||||
|
move_selection_horizontally(key, menu)
|
||||||
|
elif key == "KEY_MOUSE":
|
||||||
|
handle_mouse(menu)
|
||||||
|
elif key == "KEY_DC":
|
||||||
|
if not MENUS[3].selected_row():
|
||||||
|
return
|
||||||
|
if confirm_action("Are you sure you want to delete this resource?"):
|
||||||
|
await handle_key_bindings(key)
|
||||||
|
elif not key.startswith("KEY_") and curses.ascii.unctrl(key) in KEY_BINDINGS: # K (Key Bindings)
|
||||||
|
await handle_key_bindings(curses.ascii.unctrl(key))
|
||||||
|
|
||||||
|
|
||||||
|
async def subprocess_call_async(command: str) -> None:
|
||||||
|
process = await asyncio.create_subprocess_shell(command)
|
||||||
|
await process.communicate()
|
||||||
|
|
||||||
|
|
||||||
|
def enable_mouse_support() -> None:
|
||||||
|
curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
|
||||||
|
if MOUSE_ENABLED:
|
||||||
|
print("\033[?1003h")
|
||||||
|
|
||||||
|
|
||||||
|
async def init_menus() -> list[Menu]:
|
||||||
|
MENUS.append(Menu("Contexts", get_contexts, 0, CONTEXTS_WIDTH))
|
||||||
|
MENUS.append(Menu("Namespaces", get_namespaces, CONTEXTS_WIDTH, NAMESPACES_WIDTH))
|
||||||
|
MENUS.append(Menu("API resources", get_api_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH, API_RESOURCES_WIDTH))
|
||||||
|
MENUS.append(
|
||||||
|
Menu("Resources", get_resources, CONTEXTS_WIDTH + NAMESPACES_WIDTH + API_RESOURCES_WIDTH, RESOURCES_WIDTH)
|
||||||
|
)
|
||||||
|
return MENUS
|
||||||
|
|
||||||
|
|
||||||
|
async def setup_curses() -> None:
|
||||||
SCREEN.refresh()
|
SCREEN.refresh()
|
||||||
SCREEN.nodelay(True)
|
SCREEN.nodelay(True)
|
||||||
SCREEN.keypad(True)
|
SCREEN.keypad(True)
|
||||||
|
@ -430,16 +439,67 @@ async def init_menus() -> None:
|
||||||
enable_mouse_support()
|
enable_mouse_support()
|
||||||
|
|
||||||
|
|
||||||
|
async def initialize_interface() -> None:
|
||||||
|
global MENUS
|
||||||
|
MENUS = await init_menus()
|
||||||
|
Menu.selected = MENUS[0]
|
||||||
|
await setup_curses()
|
||||||
|
|
||||||
|
for index, menu in enumerate(MENUS):
|
||||||
|
await menu.set_rows()
|
||||||
|
menu.set_filtered_rows()
|
||||||
|
menu.draw_menu_with_footer()
|
||||||
|
menu.dependent_menus = MENUS[index + 1 :] # all other menu to the right
|
||||||
|
draw_row(curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0), HELP_TEXT, 1, 2)
|
||||||
|
|
||||||
|
|
||||||
async def main_async() -> None:
|
async def main_async() -> None:
|
||||||
await init_menus()
|
global MENUS, FOURTH_MENU_TASK
|
||||||
for menu in menus:
|
await initialize_interface()
|
||||||
draw_menu(menu)
|
while True:
|
||||||
draw_row(
|
menu = Menu.selected
|
||||||
curses.newwin(3, curses.COLS, curses.LINES - FOOTER_HEIGHT, 0),
|
try:
|
||||||
HELP_TEXT, 1, 2
|
key = SCREEN.getkey()
|
||||||
)
|
except curses.error:
|
||||||
while selected_menu:
|
if FOURTH_MENU_TASK is None or FOURTH_MENU_TASK.done():
|
||||||
await catch_input(selected_menu)
|
FOURTH_MENU_TASK = asyncio.create_task(MENUS[3].refresh_menu())
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# handle state-dependent keys
|
||||||
|
match menu.state:
|
||||||
|
case "Normal":
|
||||||
|
if key == "q": # Q (Quit)
|
||||||
|
break # Exit
|
||||||
|
elif key == "/": # S (Slash)
|
||||||
|
await menu.set_state("EmptyFilter") # Transition to EmptyFilter state
|
||||||
|
continue
|
||||||
|
case "EmptyFilter":
|
||||||
|
if key == "\x1b": # E (Escape)
|
||||||
|
await menu.set_state("Normal") # Transition to Normal state
|
||||||
|
continue
|
||||||
|
elif key.isalnum() or key == "-": # A (Type text)
|
||||||
|
menu.filter += key.lower()
|
||||||
|
await menu.set_state("FilledFilter") # Transition to FilledFilter state
|
||||||
|
continue
|
||||||
|
case "FilledFilter": # FilledFilter state
|
||||||
|
if key == "\x1b": # E (Escape)
|
||||||
|
await menu.set_state("Normal") # Transition to Normal state
|
||||||
|
continue
|
||||||
|
elif key in ["KEY_BACKSPACE", "\x08"]: # B (Backspace)
|
||||||
|
if len(menu.filter) == 1:
|
||||||
|
await menu.set_state("EmptyFilter") # Transition to EmptyFilter state
|
||||||
|
continue
|
||||||
|
menu.filter = menu.filter[:-1]
|
||||||
|
await menu.draw_menu_or_footer(f"/{menu.filter}")
|
||||||
|
continue
|
||||||
|
elif key.isalnum() or key == "-": # A (Type text)
|
||||||
|
menu.filter += key.lower() # Stay in FilledFilter state
|
||||||
|
await menu.draw_menu_or_footer(f"/{menu.filter}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# handle state-independent keys (Vertical/Horizontal navigation etc. available in all states)
|
||||||
|
await handle_state_independent_input(menu, key)
|
||||||
|
|
||||||
|
|
||||||
def main(screen: curses.window) -> None:
|
def main(screen: curses.window) -> None:
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
Key Bindings:
|
||||||
|
1. Test all key bindings on pods, services and secrets
|
||||||
|
Filter:
|
||||||
|
1. filter mode
|
||||||
|
2. unmatched filter
|
||||||
|
3. matched filter
|
||||||
|
4. matched all filter
|
||||||
|
5. vertical navigation with matched filter
|
||||||
|
6. exit filter mode
|
||||||
|
Navigation:
|
||||||
|
1. Navigate contexts
|
Loading…
Reference in New Issue