Module hardeneks.cluster_wide.security.iam

Expand source code
import boto3
from kubernetes import client
from rich import print
from rich.panel import Panel
from rich.console import Console

from ...resources import Resources
from ...report import print_role_table, print_instance_metadata_table

console = Console()


def restrict_wildcard_for_cluster_roles(resources: Resources):
    offenders = []

    for role in resources.cluster_roles:
        for rule in role.rules:
            if "*" in rule.verbs:
                offenders.append(role)
            if rule.resources and "*" in rule.resources:
                offenders.append(role)

    if offenders:
        print_role_table(
            offenders,
            "[red]ClusterRoles should not have '*' in Verbs or Resources",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#employ-least-privileged-access-when-creating-rolebindings-and-clusterrolebindings",
            "ClusterRole",
        )
    return offenders


def check_endpoint_public_access(resources: Resources):
    client = boto3.client("eks", region_name=resources.region)
    cluster_metadata = client.describe_cluster(name=resources.cluster)
    endpoint_access = cluster_metadata["cluster"]["resourcesVpcConfig"][
        "endpointPublicAccess"
    ]
    if endpoint_access:
        print(
            Panel(
                "[red]EKS Cluster Endpoint is not Private",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#make-the-eks-cluster-endpoint-private",
            )
        )
        console.print()
        return False

    return True


def check_aws_node_daemonset_service_account(resources: Resources):
    daemonset = client.AppsV1Api().read_namespaced_daemon_set(
        name="aws-node", namespace="kube-system"
    )

    if daemonset.spec.template.spec.service_account_name == "aws-node":
        print(
            Panel(
                "[red]Update the aws-node daemonset to use IRSA",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#update-the-aws-node-daemonset-to-use-irsa",
            )
        )
        console.print()
        return False

    return True


def check_access_to_instance_profile(resources: Resources):
    client = boto3.client("ec2", region_name=resources.region)
    offenders = []

    instance_metadata = client.describe_instances(
        Filters=[
            {
                "Name": "tag:aws:eks:cluster-name",
                "Values": [
                    resources.cluster,
                ],
            },
        ]
    )

    for instance in instance_metadata["Reservations"]:
        if (
            instance["Instances"][0]["MetadataOptions"][
                "HttpPutResponseHopLimit"
            ]
            == 2
        ):
            offenders.append(instance)

    if offenders:
        print_instance_metadata_table(
            offenders,
            "[red]Restrict access to the instance profile assigned to nodes",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#when-your-application-needs-access-to-imds-use-imdsv2-and-increase-the-hop-limit-on-ec2-instances-to-2",
        )
    return offenders


def disable_anonymous_access_for_cluster_roles(resources: Resources):
    offenders = []

    for cluster_role_binding in resources.cluster_role_bindings:
        if cluster_role_binding.subjects:
            for subject in cluster_role_binding.subjects:
                if (
                    subject.name == "system:unauthenticated"
                    or subject.name == "system:anonymous"
                ):
                    offenders.append(cluster_role_binding)

    if offenders:
        print_role_table(
            offenders,
            "[red]Don't bind clusterroles to anonymous/unauthenticated groups",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#review-and-revoke-unnecessary-anonymous-access",
            "ClusterRoleBinding",
        )

    return offenders

Functions

def check_access_to_instance_profile(resources: Resources)
Expand source code
def check_access_to_instance_profile(resources: Resources):
    client = boto3.client("ec2", region_name=resources.region)
    offenders = []

    instance_metadata = client.describe_instances(
        Filters=[
            {
                "Name": "tag:aws:eks:cluster-name",
                "Values": [
                    resources.cluster,
                ],
            },
        ]
    )

    for instance in instance_metadata["Reservations"]:
        if (
            instance["Instances"][0]["MetadataOptions"][
                "HttpPutResponseHopLimit"
            ]
            == 2
        ):
            offenders.append(instance)

    if offenders:
        print_instance_metadata_table(
            offenders,
            "[red]Restrict access to the instance profile assigned to nodes",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#when-your-application-needs-access-to-imds-use-imdsv2-and-increase-the-hop-limit-on-ec2-instances-to-2",
        )
    return offenders
def check_aws_node_daemonset_service_account(resources: Resources)
Expand source code
def check_aws_node_daemonset_service_account(resources: Resources):
    daemonset = client.AppsV1Api().read_namespaced_daemon_set(
        name="aws-node", namespace="kube-system"
    )

    if daemonset.spec.template.spec.service_account_name == "aws-node":
        print(
            Panel(
                "[red]Update the aws-node daemonset to use IRSA",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#update-the-aws-node-daemonset-to-use-irsa",
            )
        )
        console.print()
        return False

    return True
def check_endpoint_public_access(resources: Resources)
Expand source code
def check_endpoint_public_access(resources: Resources):
    client = boto3.client("eks", region_name=resources.region)
    cluster_metadata = client.describe_cluster(name=resources.cluster)
    endpoint_access = cluster_metadata["cluster"]["resourcesVpcConfig"][
        "endpointPublicAccess"
    ]
    if endpoint_access:
        print(
            Panel(
                "[red]EKS Cluster Endpoint is not Private",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#make-the-eks-cluster-endpoint-private",
            )
        )
        console.print()
        return False

    return True
def disable_anonymous_access_for_cluster_roles(resources: Resources)
Expand source code
def disable_anonymous_access_for_cluster_roles(resources: Resources):
    offenders = []

    for cluster_role_binding in resources.cluster_role_bindings:
        if cluster_role_binding.subjects:
            for subject in cluster_role_binding.subjects:
                if (
                    subject.name == "system:unauthenticated"
                    or subject.name == "system:anonymous"
                ):
                    offenders.append(cluster_role_binding)

    if offenders:
        print_role_table(
            offenders,
            "[red]Don't bind clusterroles to anonymous/unauthenticated groups",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#review-and-revoke-unnecessary-anonymous-access",
            "ClusterRoleBinding",
        )

    return offenders
def restrict_wildcard_for_cluster_roles(resources: Resources)
Expand source code
def restrict_wildcard_for_cluster_roles(resources: Resources):
    offenders = []

    for role in resources.cluster_roles:
        for rule in role.rules:
            if "*" in rule.verbs:
                offenders.append(role)
            if rule.resources and "*" in rule.resources:
                offenders.append(role)

    if offenders:
        print_role_table(
            offenders,
            "[red]ClusterRoles should not have '*' in Verbs or Resources",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/iam/#employ-least-privileged-access-when-creating-rolebindings-and-clusterrolebindings",
            "ClusterRole",
        )
    return offenders