powerk8s

Powerk8s - a Powerline plugin for Kubernetes information.

Powerk8s reads the local $KUBECNOFIG and displays specified information, such as:

  • Current context
  • Namespace
  1"""Powerk8s - a Powerline plugin for Kubernetes information.
  2
  3Powerk8s reads the local $KUBECNOFIG and displays specified information, such
  4as:
  5- Current context
  6- Namespace
  7"""
  8from __future__ import annotations
  9
 10from dataclasses import asdict, dataclass, field
 11from enum import StrEnum
 12from typing import Any, Mapping, Sequence
 13
 14from kubernetes import config  # type: ignore
 15from powerline import PowerlineLogger  # type: ignore
 16
 17KUBERNETES_LOGO: str = "\U00002388 "
 18
 19
 20class SegmentArg(StrEnum):
 21    """All possible Powerline segment argument types for Powerk8s."""
 22
 23    SHOW_KUBERNETES_LOGO = "show_kube_logo"
 24    SHOW_CLUSTER = "show_cluster"
 25    SHOW_NAMESPACE = "show_namespace"
 26    SHOW_DEFAULT_NAMESPACE = "show_default_namespace"
 27
 28
 29class HighlightGroup(StrEnum):
 30    """All possible highlight groups for Powerk8s."""
 31
 32    KUBERNETES_CLUSTER_ALERT = "kubernetes_cluster:alert"
 33    KUBERNETES_CLUSTER = "kubernetes_cluster"
 34    KUBERNETES_DIVIDER = "kubernetes:divider"
 35    KUBERNETES_NAMESPACE_ALERT = "kubernetes_namespace:alert"
 36    KUBERNETES_NAMESPACE = "kubernetes_namespace"
 37
 38
 39@dataclass
 40class SegmentData:
 41    """Encapsulates data for a Powerk8s segment."""
 42
 43    # pylint: disable=unsubscriptable-object
 44    contents: str | None
 45    highlight_groups: Sequence[str]
 46    # pylint: disable=unsubscriptable-object
 47    divider_highlight_group: str | None = field(default="")
 48
 49
 50def get_kubernetes_logo(color: str) -> SegmentData:
 51    """Get the Kubernetes logo (it is hardcoded right now)."""
 52    return SegmentData(
 53        KUBERNETES_LOGO, [color], HighlightGroup.KUBERNETES_DIVIDER.value
 54    )
 55
 56
 57def get_segment_args(**kwargs: Any) -> Mapping[SegmentArg, Any]:
 58    """Get the arguments for a Powerk8s segment."""
 59    return {sa: kwargs.get(sa.value, None) for sa in SegmentArg}
 60
 61
 62def powerk8s(*_: Sequence[Any], **kwargs: Any) -> Sequence[Mapping[str, str]]:
 63    """Entry point to the plugin."""
 64    segment_args: Mapping[SegmentArg, Any] = get_segment_args(**kwargs)
 65
 66    powerline_logger: PowerlineLogger = kwargs.get("pl", None)
 67
 68    _, active_context = config.list_kube_config_contexts()
 69
 70    if powerline_logger is not None:
 71        powerline_logger.debug(f"Context: {active_context}")
 72        powerline_logger.debug(f"Segment arguments: {segment_args}")
 73
 74    segments: list[SegmentData] = []
 75
 76    if segment_args.get(SegmentArg.SHOW_KUBERNETES_LOGO, False):
 77        segments.append(get_kubernetes_logo(HighlightGroup.KUBERNETES_CLUSTER.value))
 78
 79    if segment_args.get(SegmentArg.SHOW_CLUSTER, False):
 80        segments.append(
 81            SegmentData(
 82                contents=active_context["context"]["cluster"],
 83                highlight_groups=[HighlightGroup.KUBERNETES_CLUSTER.value],
 84                divider_highlight_group=HighlightGroup.KUBERNETES_NAMESPACE.value,
 85            )
 86        )
 87
 88    if (
 89        segment_args.get(SegmentArg.SHOW_DEFAULT_NAMESPACE, False)
 90        and "namespace" in active_context["context"]
 91    ):
 92        segments.extend(
 93            [
 94                SegmentData(
 95                    contents=" ",
 96                    highlight_groups=[HighlightGroup.KUBERNETES_DIVIDER.value],
 97                    divider_highlight_group=HighlightGroup.KUBERNETES_DIVIDER.value,
 98                ),
 99                SegmentData(
100                    contents=active_context["context"]["namespace"],
101                    highlight_groups=[HighlightGroup.KUBERNETES_CLUSTER.value],
102                    divider_highlight_group=HighlightGroup.KUBERNETES_NAMESPACE.value,
103                ),
104            ]
105        )
106
107    return [asdict(s) for s in segments]
class SegmentArg(enum.StrEnum):
21class SegmentArg(StrEnum):
22    """All possible Powerline segment argument types for Powerk8s."""
23
24    SHOW_KUBERNETES_LOGO = "show_kube_logo"
25    SHOW_CLUSTER = "show_cluster"
26    SHOW_NAMESPACE = "show_namespace"
27    SHOW_DEFAULT_NAMESPACE = "show_default_namespace"

All possible Powerline segment argument types for Powerk8s.

SHOW_CLUSTER = <SegmentArg.SHOW_CLUSTER: 'show_cluster'>
SHOW_NAMESPACE = <SegmentArg.SHOW_NAMESPACE: 'show_namespace'>
SHOW_DEFAULT_NAMESPACE = <SegmentArg.SHOW_DEFAULT_NAMESPACE: 'show_default_namespace'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class HighlightGroup(enum.StrEnum):
30class HighlightGroup(StrEnum):
31    """All possible highlight groups for Powerk8s."""
32
33    KUBERNETES_CLUSTER_ALERT = "kubernetes_cluster:alert"
34    KUBERNETES_CLUSTER = "kubernetes_cluster"
35    KUBERNETES_DIVIDER = "kubernetes:divider"
36    KUBERNETES_NAMESPACE_ALERT = "kubernetes_namespace:alert"
37    KUBERNETES_NAMESPACE = "kubernetes_namespace"

All possible highlight groups for Powerk8s.

KUBERNETES_CLUSTER_ALERT = <HighlightGroup.KUBERNETES_CLUSTER_ALERT: 'kubernetes_cluster:alert'>
KUBERNETES_CLUSTER = <HighlightGroup.KUBERNETES_CLUSTER: 'kubernetes_cluster'>
KUBERNETES_DIVIDER = <HighlightGroup.KUBERNETES_DIVIDER: 'kubernetes:divider'>
KUBERNETES_NAMESPACE_ALERT = <HighlightGroup.KUBERNETES_NAMESPACE_ALERT: 'kubernetes_namespace:alert'>
KUBERNETES_NAMESPACE = <HighlightGroup.KUBERNETES_NAMESPACE: 'kubernetes_namespace'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
@dataclass
class SegmentData:
40@dataclass
41class SegmentData:
42    """Encapsulates data for a Powerk8s segment."""
43
44    # pylint: disable=unsubscriptable-object
45    contents: str | None
46    highlight_groups: Sequence[str]
47    # pylint: disable=unsubscriptable-object
48    divider_highlight_group: str | None = field(default="")

Encapsulates data for a Powerk8s segment.

SegmentData( contents: str | None, highlight_groups: Sequence[str], divider_highlight_group: str | None = '')
def get_segment_args(**kwargs: Any) -> Mapping[powerk8s.SegmentArg, Any]:
58def get_segment_args(**kwargs: Any) -> Mapping[SegmentArg, Any]:
59    """Get the arguments for a Powerk8s segment."""
60    return {sa: kwargs.get(sa.value, None) for sa in SegmentArg}

Get the arguments for a Powerk8s segment.

def powerk8s(*_: Sequence[Any], **kwargs: Any) -> Sequence[Mapping[str, str]]:
 63def powerk8s(*_: Sequence[Any], **kwargs: Any) -> Sequence[Mapping[str, str]]:
 64    """Entry point to the plugin."""
 65    segment_args: Mapping[SegmentArg, Any] = get_segment_args(**kwargs)
 66
 67    powerline_logger: PowerlineLogger = kwargs.get("pl", None)
 68
 69    _, active_context = config.list_kube_config_contexts()
 70
 71    if powerline_logger is not None:
 72        powerline_logger.debug(f"Context: {active_context}")
 73        powerline_logger.debug(f"Segment arguments: {segment_args}")
 74
 75    segments: list[SegmentData] = []
 76
 77    if segment_args.get(SegmentArg.SHOW_KUBERNETES_LOGO, False):
 78        segments.append(get_kubernetes_logo(HighlightGroup.KUBERNETES_CLUSTER.value))
 79
 80    if segment_args.get(SegmentArg.SHOW_CLUSTER, False):
 81        segments.append(
 82            SegmentData(
 83                contents=active_context["context"]["cluster"],
 84                highlight_groups=[HighlightGroup.KUBERNETES_CLUSTER.value],
 85                divider_highlight_group=HighlightGroup.KUBERNETES_NAMESPACE.value,
 86            )
 87        )
 88
 89    if (
 90        segment_args.get(SegmentArg.SHOW_DEFAULT_NAMESPACE, False)
 91        and "namespace" in active_context["context"]
 92    ):
 93        segments.extend(
 94            [
 95                SegmentData(
 96                    contents=" ",
 97                    highlight_groups=[HighlightGroup.KUBERNETES_DIVIDER.value],
 98                    divider_highlight_group=HighlightGroup.KUBERNETES_DIVIDER.value,
 99                ),
100                SegmentData(
101                    contents=active_context["context"]["namespace"],
102                    highlight_groups=[HighlightGroup.KUBERNETES_CLUSTER.value],
103                    divider_highlight_group=HighlightGroup.KUBERNETES_NAMESPACE.value,
104                ),
105            ]
106        )
107
108    return [asdict(s) for s in segments]

Entry point to the plugin.