Package hardeneks

Expand source code
from pathlib import Path
from pkg_resources import resource_filename
import yaml

from botocore.exceptions import EndpointConnectionError
import boto3
import kubernetes
from rich.console import Console
import typer

from .resources import (
    NamespacedResources,
    Resources,
)
from .harden import harden


app = typer.Typer()


def _config_callback(value: str):

    config = Path(value)

    if config.is_dir():
        raise typer.BadParameter(f"{config} is a directory")
    elif not config.exists():
        raise typer.BadParameter(f"{config} doesn't exist")

    with open(value, "r") as f:
        try:
            yaml.load(f, Loader=yaml.FullLoader)
        except yaml.YAMLError as exc:
            raise typer.BadParameter(exc)

    return value


def _get_current_context(context):
    if context:
        return context
    _, active_context = kubernetes.config.list_kube_config_contexts()
    return active_context["name"]


def _get_namespaces(ignored_ns: list) -> list:
    v1 = kubernetes.client.CoreV1Api()
    namespaces = [i.metadata.name for i in v1.list_namespace().items]
    return list(set(namespaces) - set(ignored_ns))


def _get_cluster_name(context, region):
    try:
        client = boto3.client("eks", region_name=region)
        for name in client.list_clusters()["clusters"]:
            if name in context:
                return name
    except EndpointConnectionError:
        raise ValueError(f"{region} seems like a bad region name")


def _get_region():
    return boto3.session.Session().region_name


@app.command()
def run_hardeneks(
    region: str = typer.Option(
        default=None, help="AWS region of the cluster. Ex: us-east-1"
    ),
    context: str = typer.Option(
        default=None,
        help="K8s context.",
    ),
    cluster: str = typer.Option(default=None, help="Cluster name."),
    namespace: str = typer.Option(
        default=None,
        help="Specific namespace to harden. Default is all namespaces.",
    ),
    config: str = typer.Option(
        default=resource_filename(__name__, "config.yaml"),
        callback=_config_callback,
        help="Path to a hardeneks config file.",
    ),
):
    """
    Main entry point to hardeneks.

    Args:
        region (str): AWS region of the cluster. Ex: us-east-1
        context (str): K8s context
        cluster (str): Cluster name
        namespace (str): Specific namespace to be checked
        config (str): Path to hardeneks config file

    Returns:
        None

    """

    kubernetes.config.load_kube_config(context=context)
    context = _get_current_context(context)
    if not cluster:
        cluster = _get_cluster_name(context, region)

    if not region:
        region = _get_region()

    console = Console()
    console.rule("[b]HARDENEKS", characters="*  ")
    console.print(f"You are operating at {region}")
    console.print(f"You context is {context}")
    console.print(f"Your cluster name is {cluster}")
    console.print(f"You are using {config} as your config file")
    console.print()

    with open(config, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    if not namespace:
        namespaces = _get_namespaces(config["ignore-namespaces"])
    else:
        namespaces = [namespace]

    rules = config["rules"]

    console.rule("[b]Checking cluster wide rules", characters="- ")
    print()

    resources = Resources(region, context, cluster, namespaces)
    resources.set_resources()
    harden(resources, rules, "cluster_wide")

    for ns in namespaces:
        console.rule(
            f"[b]Checking rules against namespace: {ns}", characters=" -"
        )
        console.print()
        resources = NamespacedResources(region, context, cluster, ns)
        resources.set_resources()
        harden(resources, rules, "namespace_based")
        console.print()

Sub-modules

hardeneks.cluster_wide
hardeneks.harden
hardeneks.namespace_based
hardeneks.report
hardeneks.resources

Functions

def run_hardeneks(region: str = <typer.models.OptionInfo object>, context: str = <typer.models.OptionInfo object>, cluster: str = <typer.models.OptionInfo object>, namespace: str = <typer.models.OptionInfo object>, config: str = <typer.models.OptionInfo object>)

Main entry point to hardeneks.

Args

region : str
AWS region of the cluster. Ex: us-east-1
context : str
K8s context
cluster : str
Cluster name
namespace : str
Specific namespace to be checked
config : str
Path to hardeneks config file

Returns

None

Expand source code
@app.command()
def run_hardeneks(
    region: str = typer.Option(
        default=None, help="AWS region of the cluster. Ex: us-east-1"
    ),
    context: str = typer.Option(
        default=None,
        help="K8s context.",
    ),
    cluster: str = typer.Option(default=None, help="Cluster name."),
    namespace: str = typer.Option(
        default=None,
        help="Specific namespace to harden. Default is all namespaces.",
    ),
    config: str = typer.Option(
        default=resource_filename(__name__, "config.yaml"),
        callback=_config_callback,
        help="Path to a hardeneks config file.",
    ),
):
    """
    Main entry point to hardeneks.

    Args:
        region (str): AWS region of the cluster. Ex: us-east-1
        context (str): K8s context
        cluster (str): Cluster name
        namespace (str): Specific namespace to be checked
        config (str): Path to hardeneks config file

    Returns:
        None

    """

    kubernetes.config.load_kube_config(context=context)
    context = _get_current_context(context)
    if not cluster:
        cluster = _get_cluster_name(context, region)

    if not region:
        region = _get_region()

    console = Console()
    console.rule("[b]HARDENEKS", characters="*  ")
    console.print(f"You are operating at {region}")
    console.print(f"You context is {context}")
    console.print(f"Your cluster name is {cluster}")
    console.print(f"You are using {config} as your config file")
    console.print()

    with open(config, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    if not namespace:
        namespaces = _get_namespaces(config["ignore-namespaces"])
    else:
        namespaces = [namespace]

    rules = config["rules"]

    console.rule("[b]Checking cluster wide rules", characters="- ")
    print()

    resources = Resources(region, context, cluster, namespaces)
    resources.set_resources()
    harden(resources, rules, "cluster_wide")

    for ns in namespaces:
        console.rule(
            f"[b]Checking rules against namespace: {ns}", characters=" -"
        )
        console.print()
        resources = NamespacedResources(region, context, cluster, ns)
        resources.set_resources()
        harden(resources, rules, "namespace_based")
        console.print()