# -*- coding: utf-8 -*-
from awspice.helpers import extract_region_from_ip
from threading import Lock
[docs]class FinderModule:
'''
This class makes it easy to search for components in AWS.
Attributes:
aws: awspice client
'''
[docs] def find_instance(self, filters, profiles=[], regions=[]):
'''
Get an instance in different accounts and regions, using search filters.
'''
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
# CODE SNIPPET from aws.service.ec2.instances.get_instance_by
if "publicip" in filters.keys():
ip_in_aws, ip_region = extract_region_from_ip(filters['publicip'])
if not ip_in_aws:
return {}
# ['eu-west-1', 'eu-west-2'] / 'eu-west-1'
# [ {'RegionName': 'eu-west-1'}, {...} ]
if isinstance(regions, list):
if ip_region in regions or ip_region in regions[0].values():
region = ip_region
# {'eu-west-1': {...}, 'eu-west-2': {...} }
if isinstance(regions, dict):
if ip_region in regions.keys():
region = ip_region
for account in profiles:
try:
self.aws.ec2.change_profile(account)
instance = self.aws.ec2.get_instance_by(filters, regions=regions)
if instance:
return instance
except:
pass
return {}
[docs] def find_instances(self, filters=None, profiles=[], regions=[]):
'''
Get instances in different accounts and regions, using search filters.
'''
results = list()
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
for account in profiles:
self.aws.ec2.change_profile(account)
if filters:
results.extend(self.aws.ec2.get_instances_by(filters, regions=regions))
else:
results.extend(self.aws.ec2.get_instances(regions=regions))
return results
[docs] def find_volume(self, filters, profiles=[], regions=[]):
'''
Get a volume in different accounts and regions, using search filters.
'''
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
for account in profiles:
try:
self.aws.ec2.change_profile(account)
volume = self.aws.ec2.get_volume_by(filters, regions=regions)
if volume:
return volume
except:
pass
return None
[docs] def find_volumes(self, filters=None, profiles=[], regions=[]):
'''
Get group of volumes in different accounts and regions, using search filters.
'''
results = list()
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
for account in profiles:
self.aws.ec2.change_profile(account)
if filters:
results.extend(self.aws.ec2.get_volumes_by(filters, regions=regions))
else:
results.extend(self.aws.ec2.get_volumes(regions=regions))
return results
[docs] def find_loadbalancer(self, filters, profiles=[], regions=[]):
'''
Get a load balancer in different accounts and regions, using search filters.
'''
profiles = self.aws.elb.parse_profiles(profiles)
regions = self.aws.elb.parse_regions(regions, True)
for account in profiles:
try:
self.aws.elb.change_profile(account)
elb = self.aws.elb.get_loadbalancer_by(filters, regions=regions)
if elb:
return elb
except:
pass
return None
[docs] def find_loadbalancers(self, filter_key=None, filter_value=None, profiles=[], regions=[]):
'''
Get load balancers in different accounts and regions, using search filters.
'''
results = list()
profiles = self.aws.elb.parse_profiles(profiles)
regions = self.aws.elb.parse_regions(regions, True)
for account in profiles:
self.aws.elb.change_profile(account)
if filter_key and filter_value:
results.extend([self.aws.elb.get_loadbalancer_by(filter_key, filter_value, regions=regions)])
else:
results.extend(self.aws.elb.get_loadbalancers(regions=regions))
return results
[docs] def find_users(self, profiles=[]):
'''
Get IAM users in different accounts.
'''
results = list()
profiles = self.aws.iam.parse_profiles(profiles)
lock = Lock()
def worker(profile):
lock.acquire()
self.aws.iam.change_profile(profile)
lock.release()
results.extend(self.aws.iam.get_users())
for profile in profiles:
self.aws.iam.pool.add_task(worker, profile)
self.aws.iam.pool.wait_completion()
return results
[docs] def find_inactive_users(self, profiles=[]):
'''
Get inactive users in different accounts
'''
results = []
profiles = self.aws.iam.parse_profiles(profiles)
for profile in profiles:
self.aws.iam.change_profile(profile)
results.extend(self.aws.iam.get_inactive_users())
return results
[docs] def find_buckets(self, profiles=[]):
'''
Search S3 buckets in different accounts.
'''
results = list()
profiles = self.aws.s3.parse_profiles(profiles)
lock = Lock()
def worker(profile):
lock.acquire()
self.aws.s3.change_profile(profile)
lock.release()
results.extend(self.aws.s3.get_buckets())
for profile in profiles:
self.aws.s3.pool.add_task(worker, profile)
self.aws.s3.pool.wait_completion()
return results
[docs] def find_rds_databases(self, profiles=[], regions=[]):
'''
Get RDS databases in different accounts and regions.
'''
results = list()
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
for account in profiles:
self.aws.rds.change_profile(account)
results.extend(self.aws.rds.get_databases(regions))
return results
[docs] def find_rds_snapshots(self, profiles=[], regions=[]):
'''
Get RDS snapshots in different accounts and regions.
'''
results = list()
profiles = self.aws.ec2.parse_profiles(profiles)
regions = self.aws.ec2.parse_regions(regions, True)
for account in profiles:
self.aws.rds.change_profile(account)
results.extend(self.aws.rds.get_snapshots(regions))
return results
[docs] def __init__(self, aws):
self.aws = aws