Source code for awspice.services.iam

# -*- coding: utf-8 -*-
from .base import AwsBase
from datetime import datetime

[docs]class IamService(AwsBase): ''' Class belonging to the IAM Identity & Access management service. '''
[docs] def get_inactive_users(self): ''' Get users who have not logged in AWS since 1 year. This method returns users who haven't used their password and one of their keys in less than 9 months. Returns: list: List of inactive users ''' results = [] today = datetime.now() min_inactive_days = 270 # 9 months def worker(user): pass_last_use = user.get('PasswordLastUsed', None) pass_inactive_days = (today - pass_last_use.replace(tzinfo=None)).days if pass_last_use else None if not pass_last_use or pass_inactive_days > min_inactive_days: inactive_keys = [] for key in self.client.list_access_keys(UserName=user['UserName'])['AccessKeyMetadata']: key_info = self.client.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])['AccessKeyLastUsed'] key_last_use = key_info.get('LastUsedDate', None) key_inactive_days = (today - key_last_use.replace(tzinfo=None)).days if key_last_use else None if not key_last_use or key_inactive_days > min_inactive_days: key.update({'Inactive': not bool(key_last_use), 'LastUsed': key_last_use}) inactive_keys.append(key) if inactive_keys: inactive_pass = {'Inactive': not bool(pass_last_use), 'LastUsed': pass_last_use} inactive_user = { 'LoginActivity' : { 'Password' : inactive_pass, 'AccessKeys' : inactive_keys } } user.update(inactive_user) results.append(user) for user in self.get_users(): self.pool.add_task(worker, user) self.pool.wait_completion() return results
[docs] def get_users(self): ''' List all users for an AWS account Returns: List of all users ''' results = [] config = self.get_client_vars() paginator = self.client.get_paginator('list_users').paginate() for response in paginator: for user in response['Users']: results.append(user) return self.inject_client_vars(results, config)
[docs] def get_access_keys(self, user): results = [] access_keys = self.client.list_access_keys(UserName=user)['AccessKeyMetadata'] def worker(ak): ak['LastUse'] = self.get_access_key_last_used(ak['AccessKeyId'])['AccessKeyLastUsed'] results.append(ak) [self.pool.add_task(worker, ak) for ak in access_keys] self.pool.wait_completion() return self.inject_client_vars(results)
[docs] def get_access_key_last_used(self, accesskey): last_use = self.client.get_access_key_last_used(AccessKeyId=accesskey) if last_use: last_use = self.inject_client_vars([last_use])[0] return last_use
[docs] def __init__(self): AwsBase.__init__(self, 'iam')