Module hardeneks.cluster_wide.security.network_security

Expand source code
import boto3
from kubernetes import client
from rich.console import Console
from rich.panel import Panel
from rich import print


from ...resources import Resources
from ...report import print_namespace_table


console = Console()


def check_vpc_flow_logs(resources: Resources):
    """Check that flow logs are configured for the cluster's VPC.

    Looks up the VPC id of the EKS cluster ``resources.cluster`` in
    ``resources.region`` and asks EC2 for flow logs whose ``resource-id``
    filter matches that VPC. When none are returned, a warning panel with
    a link to the best-practices guide is printed.

    Args:
        resources: Supplies the ``region`` and ``cluster`` attributes that
            are read here.

    Returns:
        ``True`` when at least one flow log is returned for the VPC,
        ``False`` otherwise.
    """
    # Distinct local names keep the module-level kubernetes ``client``
    # import from being shadowed inside this function.
    eks = boto3.client("eks", region_name=resources.region)
    cluster_metadata = eks.describe_cluster(name=resources.cluster)
    vpc_id = cluster_metadata["cluster"]["resourcesVpcConfig"]["vpcId"]

    ec2 = boto3.client("ec2", region_name=resources.region)
    flow_logs = ec2.describe_flow_logs(
        Filters=[{"Name": "resource-id", "Values": [vpc_id]}]
    )["FlowLogs"]

    if not flow_logs:
        print(
            Panel(
                "[red]Enable flow logs for your VPC.",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#log-network-traffic-metadata",
            )
        )
        console.print()
        return False

    # Report success explicitly instead of falling off the end with None,
    # matching the True/False contract of check_awspca_exists.
    return True


def check_awspca_exists(resources: Resources):
    """Report whether an ``aws-privateca-issuer`` service exists in the cluster.

    Lists services across all namespaces through the Kubernetes core API
    and looks for one whose name starts with ``aws-privateca-issuer``.
    When none is found, a warning panel with a link to the best-practices
    guide is printed.

    Args:
        resources: Accepted for a uniform check signature; not read here.

    Returns:
        ``True`` if a matching service is found, ``False`` otherwise.
    """
    service_list = client.CoreV1Api().list_service_for_all_namespaces()
    issuer_present = any(
        svc.metadata.name.startswith("aws-privateca-issuer")
        for svc in service_list.items
    )
    if issuer_present:
        return True

    warning = Panel(
        "[red]Install aws privateca issuer for your certificates.",
        subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#acm-private-ca-with-cert-manager",
    )
    print(warning)
    console.print()
    return False


def check_default_deny_policy_exists(resources: Resources):
    """List namespaces that have no network policy at all.

    A namespace counts as covered when at least one entry of
    ``resources.network_policies`` has it as ``metadata.namespace``.
    NOTE(review): the policy contents are not inspected, so this does not
    verify that the policy is actually a default-deny policy.

    Args:
        resources: Supplies ``namespaces`` (iterable of namespace names)
            and ``network_policies`` (objects exposing
            ``metadata.namespace``).

    Returns:
        A new list of the namespaces without any network policy, in the
        order they appear in ``resources.namespaces``. Empty when every
        namespace is covered.
    """
    # Collect covered namespaces in a set: a namespace with several
    # policies, or a policy in a namespace that is not being checked,
    # must not raise the ValueError that list.remove() would.
    covered = {
        policy.metadata.namespace for policy in resources.network_policies
    }

    # Build a fresh list so resources.namespaces is left untouched for
    # any other check that reads it afterwards.
    offenders = [
        namespace
        for namespace in resources.namespaces
        if namespace not in covered
    ]

    if offenders:
        print_namespace_table(
            offenders,
            "[red]Namespaces that does not have default network deny policies",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#create-a-default-deny-policy",
        )

    return offenders

Functions

def check_awspca_exists(resources: Resources)
Expand source code
def check_awspca_exists(resources: Resources):
    """Report whether an ``aws-privateca-issuer`` service exists in the cluster.

    Lists services across all namespaces through the Kubernetes core API
    and looks for one whose name starts with ``aws-privateca-issuer``.
    When none is found, a warning panel with a link to the best-practices
    guide is printed.

    Args:
        resources: Accepted for a uniform check signature; not read here.

    Returns:
        ``True`` if a matching service is found, ``False`` otherwise.
    """
    service_list = client.CoreV1Api().list_service_for_all_namespaces()
    issuer_present = any(
        svc.metadata.name.startswith("aws-privateca-issuer")
        for svc in service_list.items
    )
    if issuer_present:
        return True

    warning = Panel(
        "[red]Install aws privateca issuer for your certificates.",
        subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#acm-private-ca-with-cert-manager",
    )
    print(warning)
    console.print()
    return False
def check_default_deny_policy_exists(resources: Resources)
Expand source code
def check_default_deny_policy_exists(resources: Resources):
    """List namespaces that have no network policy at all.

    A namespace counts as covered when at least one entry of
    ``resources.network_policies`` has it as ``metadata.namespace``.
    NOTE(review): the policy contents are not inspected, so this does not
    verify that the policy is actually a default-deny policy.

    Args:
        resources: Supplies ``namespaces`` (iterable of namespace names)
            and ``network_policies`` (objects exposing
            ``metadata.namespace``).

    Returns:
        A new list of the namespaces without any network policy, in the
        order they appear in ``resources.namespaces``. Empty when every
        namespace is covered.
    """
    # Collect covered namespaces in a set: a namespace with several
    # policies, or a policy in a namespace that is not being checked,
    # must not raise the ValueError that list.remove() would.
    covered = {
        policy.metadata.namespace for policy in resources.network_policies
    }

    # Build a fresh list so resources.namespaces is left untouched for
    # any other check that reads it afterwards.
    offenders = [
        namespace
        for namespace in resources.namespaces
        if namespace not in covered
    ]

    if offenders:
        print_namespace_table(
            offenders,
            "[red]Namespaces that does not have default network deny policies",
            "Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#create-a-default-deny-policy",
        )

    return offenders
def check_vpc_flow_logs(resources: Resources)
Expand source code
def check_vpc_flow_logs(resources: Resources):
    """Check that flow logs are configured for the cluster's VPC.

    Looks up the VPC id of the EKS cluster ``resources.cluster`` in
    ``resources.region`` and asks EC2 for flow logs whose ``resource-id``
    filter matches that VPC. When none are returned, a warning panel with
    a link to the best-practices guide is printed.

    Args:
        resources: Supplies the ``region`` and ``cluster`` attributes that
            are read here.

    Returns:
        ``True`` when at least one flow log is returned for the VPC,
        ``False`` otherwise.
    """
    # Distinct local names keep the module-level kubernetes ``client``
    # import from being shadowed inside this function.
    eks = boto3.client("eks", region_name=resources.region)
    cluster_metadata = eks.describe_cluster(name=resources.cluster)
    vpc_id = cluster_metadata["cluster"]["resourcesVpcConfig"]["vpcId"]

    ec2 = boto3.client("ec2", region_name=resources.region)
    flow_logs = ec2.describe_flow_logs(
        Filters=[{"Name": "resource-id", "Values": [vpc_id]}]
    )["FlowLogs"]

    if not flow_logs:
        print(
            Panel(
                "[red]Enable flow logs for your VPC.",
                subtitle="Link: https://aws.github.io/aws-eks-best-practices/security/docs/network/#log-network-traffic-metadata",
            )
        )
        console.print()
        return False

    # Report success explicitly instead of falling off the end with None,
    # matching the True/False contract of check_awspca_exists.
    return True