Source code for awspice.services._ec2.security_group

from threading import Lock

# Short filter names accepted by this module, mapped to the filter names that
# EC2 DescribeSecurityGroups expects in its ``Filters`` argument.
# NOTE(review): every ``ip-permission.*`` filter matches inbound rules only, and
# ``ip-permission.cidr`` matches IPv4 ranges only. A lookup by 'range' therefore
# does not see IPv6 ranges (``ip-permission.ipv6-cidr``) or outbound rules
# (``egress.ip-permission.*``); keep that in mind when using it for exposure audits.
secgroup_filters = dict(
    id='group-id',
    name='group-name',
    description='description',
    protocol='ip-permission.protocol',
    fromport='ip-permission.from-port',
    toport='ip-permission.to-port',
    range='ip-permission.cidr',
)

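def _extract_secgroups(self, filters=None, regions=None, return_first=False):
    '''
    Query security groups in every requested region and collect the matches.

    One task per region is queued on ``self.pool``. Each task switches the
    service to its region, calls ``describe_security_groups`` and tags the
    returned groups through ``self.inject_client_vars``.

    Args:
        filters (list): Filters passed unchanged to ``describe_security_groups``.
        regions (list): Regions to query, resolved by ``self.parse_regions``.
        return_first (bool): Return a single group instead of a list.

    Returns:
        list or dict: List of security group dictionaries, or, when
        ``return_first`` is set, the first group that any region reported
        (empty dict if nothing matched). Which region reports first depends
        on task scheduling, so it is not deterministic across regions.
    '''
    # Fresh lists per call instead of shared mutable defaults.
    filters = [] if filters is None else filters
    regions = self.parse_regions([] if regions is None else regions)
    results = dict() if return_first else list()
    lock = Lock()

    def worker(region):
        # change_region() alters shared state on self, so switching region and
        # reading the per-region values must happen as one step. ``with``
        # releases the lock even when change_region() raises; a bare
        # acquire/release pair would leave every other worker blocked.
        with lock:
            self.change_region(region['RegionName'])
            config = self.get_client_vars()
            # Take the client while still holding the lock so this worker does
            # not pick up one that another worker has since re-pointed.
            # NOTE(review): assumes change_region() rebinds self.client rather
            # than mutating it in place - confirm in the service base class.
            client = self.client

        secgroups = client.describe_security_groups(Filters=filters)["SecurityGroups"]
        if not secgroups:
            return

        secgroups = self.inject_client_vars(secgroups, config)

        with lock:
            if not return_first:
                results.extend(secgroups)
            elif not results:
                # Keep one whole group. Calling update() for every region
                # merged the keys of unrelated groups into a single dict.
                results.update(secgroups[0])

    for region in regions:
        self.pool.add_task(worker, region=region)
    self.pool.wait_completion()

    return results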

def get_secgroups(self, regions=[]):
    '''
    Get every security group in the given regions, without any filter

    Args:
        regions (lst): Regions to query. The list is handed on unchanged;
            how an empty list is interpreted is decided by ``self.parse_regions``.

    Returns:
        SecurityGroups (lst): List of dictionaries with the security groups found
    '''
    groups = self._extract_secgroups(regions=regions)
    return groups

def get_secgroup_by(self, filters, regions=[]):
    '''
    Get a single security group that matches the given filters

    Args:
        filters: Filter specification. It is checked and converted by
            ``self.validate_filters`` against ``self.secgroup_filters``.
        regions (lst): Regions to search

    Returns:
        SecurityGroup (dict): Result of ``_extract_secgroups`` called with
        ``return_first=True``
    '''
    aws_filters = self.validate_filters(filters, self.secgroup_filters)
    return self._extract_secgroups(
        filters=aws_filters,
        regions=regions,
        return_first=True,
    )

def get_secgroups_by(self, filters, regions=[]):
    '''
    Get every security group that matches the given filters

    Args:
        filters: Filter specification. It is checked and converted by
            ``self.validate_filters`` against ``self.secgroup_filters``.
        regions (lst): Regions to search

    Returns:
        SecurityGroups (lst): List of dictionaries with the matching security groups
    '''
    aws_filters = self.validate_filters(filters, self.secgroup_filters)
    matches = self._extract_secgroups(filters=aws_filters, regions=regions)
    return matches

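def create_security_group(self, name, allowed_range, vpc_id=None):
    '''
    Create a new Security Group with inbound TCP 80 and TCP 22 rules

    Both rules are opened to ``allowed_range`` and nothing else is checked:
    the value is forwarded to EC2 as given. A broad range such as
    ``0.0.0.0/0`` therefore exposes SSH (22) and HTTP (80) to every address
    in it; callers should pass the narrowest range that works.

    Args:
        name (str): Name of the Security Group
        allowed_range (str): IPv4 network range with permissions (i.e: 10.0.0.0/32)
        vpc_id (str): Id of assigned VPC. When omitted, the VPC returned by
            ``self.get_default_vpc()`` is used.

    Returns:
        str: Identifier of the security group created.

    Raises:
        Whatever the EC2 client raises. If adding the inbound rules fails,
        the newly created group is deleted again before the error is re-raised.
    '''
    vpc = vpc_id or self.get_default_vpc()['VpcId']
    desc = 'Security group created by Awspice'
    default_rules = [
        {'IpProtocol': 'tcp',
         'FromPort': port,
         'ToPort': port,
         'IpRanges': [{'CidrIp': allowed_range}]}
        for port in (80, 22)
    ]

    sg_id = self.client.create_security_group(
        GroupName=name, VpcId=vpc, Description=desc)['GroupId']

    try:
        self.client.authorize_security_group_ingress(
            GroupId=sg_id, IpPermissions=default_rules)
    except Exception:
        # The group exists but its rules were rejected (for example an invalid
        # CIDR). Remove it so no half-configured group stays in the account.
        try:
            self.client.delete_security_group(GroupId=sg_id)
        except Exception:
            # Best-effort cleanup: the original failure is the one to report.
            pass
        raise

    return sg_id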

def delete_security_group(self, identifier):
    '''
    Remove a Security Group through the EC2 client

    Args:
        identifier (str): Id of the Security Group to delete

    Returns:
        none
    '''
    ec2 = self.client
    ec2.delete_security_group(GroupId=identifier)