# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper for IAM API client."""
from builtins import object
import json
import re
from googleapiclient import errors
from httplib2 import HttpLib2Error
from google.cloud.forseti.common.gcp_api import _base_repository
from google.cloud.forseti.common.gcp_api import api_helpers
from google.cloud.forseti.common.gcp_api import errors as api_errors
from google.cloud.forseti.common.gcp_api import repository_mixins
from google.cloud.forseti.common.util import logger
LOGGER = logger.get_logger(__name__)
API_NAME = 'iam'
[docs]class IamRepositoryClient(_base_repository.BaseRepositoryClient):
"""IAM API Respository."""
def __init__(self,
quota_max_calls=None,
quota_period=1.0,
use_rate_limiter=True):
"""Constructor.
Args:
quota_max_calls (int): Allowed requests per <quota_period> for the
API.
quota_period (float): The time period to limit the requests within.
use_rate_limiter (bool): Set to false to disable the use of a rate
limiter for this service.
"""
if not quota_max_calls:
use_rate_limiter = False
self._organizations_roles = None
self._projects_roles = None
self._projects_serviceaccounts = None
self._projects_serviceaccounts_keys = None
self._roles = None
super(IamRepositoryClient, self).__init__(
API_NAME, versions=['v1'],
quota_max_calls=quota_max_calls,
quota_period=quota_period,
use_rate_limiter=use_rate_limiter)
# Turn off docstrings for properties.
# pylint: disable=missing-return-doc, missing-return-type-doc
@property
def organizations_roles(self):
"""An _IamOrganizationsRolesRepository instance."""
if not self._organizations_roles:
self._organizations_roles = self._init_repository(
_IamOrganizationsRolesRepository)
return self._organizations_roles
@property
def projects_roles(self):
"""An _IamProjectsRolesRepository instance."""
if not self._projects_roles:
self._projects_roles = self._init_repository(
_IamProjectsRolesRepository)
return self._projects_roles
@property
def projects_serviceaccounts(self):
"""An _IamProjectsServiceAccountsRepository instance."""
if not self._projects_serviceaccounts:
self._projects_serviceaccounts = self._init_repository(
_IamProjectsServiceAccountsRepository)
return self._projects_serviceaccounts
@property
def projects_serviceaccounts_keys(self):
"""An _IamProjectsServiceAccountsKeysRepository instance."""
if not self._projects_serviceaccounts_keys:
self._projects_serviceaccounts_keys = self._init_repository(
_IamProjectsServiceAccountsKeysRepository)
return self._projects_serviceaccounts_keys
@property
def roles(self):
"""An _IamRolesRepository instance."""
if not self._roles:
self._roles = self._init_repository(
_IamRolesRepository)
return self._roles
# pylint: enable=missing-return-doc, missing-return-type-doc
[docs]class _IamOrganizationsRolesRepository(
repository_mixins.ListQueryMixin,
_base_repository.GCPRepository):
"""Implementation of Iam Organizations Roles repository."""
def __init__(self, **kwargs):
"""Constructor.
Args:
**kwargs (dict): The args to pass into GCPRepository.__init__()
"""
super(_IamOrganizationsRolesRepository, self).__init__(
key_field='parent', max_results_field='pageSize',
component='organizations.roles', **kwargs)
[docs] @staticmethod
def get_name(org_id):
"""Returns a formatted name field to pass in to the API.
Args:
org_id (str): The id of the organization to query.
Returns:
str: A formatted project name.
"""
if org_id and not org_id.startswith('organizations/'):
org_id = 'organizations/{}'.format(org_id)
return org_id
[docs]class _IamProjectsRolesRepository(
repository_mixins.ListQueryMixin,
_base_repository.GCPRepository):
"""Implementation of Iam Projects Roles repository."""
def __init__(self, **kwargs):
"""Constructor.
Args:
**kwargs (dict): The args to pass into GCPRepository.__init__()
"""
super(_IamProjectsRolesRepository, self).__init__(
key_field='parent', max_results_field='pageSize',
component='projects.roles', **kwargs)
[docs] @staticmethod
def get_name(project_id):
"""Returns a formatted name field to pass in to the API.
Args:
project_id (str): The id of the project to query.
Returns:
str: A formatted project name.
"""
if project_id and not project_id.startswith('projects/'):
project_id = 'projects/{}'.format(project_id)
return project_id
[docs]class _IamProjectsServiceAccountsRepository(
repository_mixins.GetIamPolicyQueryMixin,
repository_mixins.ListQueryMixin,
_base_repository.GCPRepository):
"""Implementation of Iam Projects ServiceAccounts repository."""
def __init__(self, **kwargs):
"""Constructor.
Args:
**kwargs (dict): The args to pass into GCPRepository.__init__()
"""
super(_IamProjectsServiceAccountsRepository, self).__init__(
key_field='name', max_results_field='pageSize',
component='projects.serviceAccounts', **kwargs)
[docs] def get_iam_policy(self, resource, fields=None, verb='getIamPolicy',
include_body=False, resource_field='resource', **kwargs):
"""Get Service Account IAM Policy.
Args:
self (GCPRespository): An instance of a GCPRespository class.
resource (str): The id of the resource to fetch.
fields (str): Fields to include in the response - partial response.
verb (str): The method to call on the API.
include_body (bool): If true, include an empty body parameter in the
method args.
resource_field (str): The parameter name of the resource field to
pass to the method.
**kwargs (dict): Optional additional arguments to pass to the query.
Returns:
dict: GCE response.
"""
# The IAM getIamPolicy does not allow the 'body' argument, so this
# overrides the default behavior by setting include_body to False.
return repository_mixins.GetIamPolicyQueryMixin.get_iam_policy(
self, resource, fields=fields, verb=verb, include_body=include_body,
resource_field=resource_field, **kwargs)
[docs] @staticmethod
def get_name(project_id):
"""Returns a formatted name field to pass in to the API.
Args:
project_id (str): The id of the project to query.
Returns:
str: A formatted project name.
"""
if not project_id.startswith('projects/'):
project_id = 'projects/{}'.format(project_id)
return project_id
[docs]class _IamProjectsServiceAccountsKeysRepository(
repository_mixins.ListQueryMixin,
_base_repository.GCPRepository):
"""Implementation of Iam Projects ServiceAccounts Keys repository."""
def __init__(self, **kwargs):
"""Constructor.
Args:
**kwargs (dict): The args to pass into GCPRepository.__init__()
"""
super(_IamProjectsServiceAccountsKeysRepository, self).__init__(
key_field='name', component='projects.serviceAccounts.keys',
**kwargs)
[docs]class _IamRolesRepository(
repository_mixins.ListQueryMixin,
_base_repository.GCPRepository):
"""Implementation of Iam Roles repository."""
def __init__(self, **kwargs):
"""Constructor.
Args:
**kwargs (dict): The args to pass into GCPRepository.__init__()
"""
super(_IamRolesRepository, self).__init__(
max_results_field='pageSize', component='roles', **kwargs)
[docs]class IAMClient(object):
"""IAM Client."""
USER_MANAGED = 'USER_MANAGED'
SYSTEM_MANAGED = 'SYSTEM_MANAGED'
KEY_TYPES = frozenset([USER_MANAGED, SYSTEM_MANAGED])
def __init__(self, global_configs, **kwargs):
"""Initialize.
Args:
global_configs (dict): Global configurations.
**kwargs (dict): The kwargs.
"""
max_calls, quota_period = api_helpers.get_ratelimiter_config(
global_configs, API_NAME)
self.repository = IamRepositoryClient(
quota_max_calls=max_calls,
quota_period=quota_period,
use_rate_limiter=kwargs.get('use_rate_limiter', True))
[docs] def get_curated_roles(self, parent=None):
"""Get information about organization roles
Args:
parent (str): An optional parent ID to query. If unset, defaults
to returning the list of curated roles in GCP.
Returns:
list: The response of retrieving the curated roles.
Raises:
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
try:
paged_results = self.repository.roles.list(parent=parent,
view='FULL')
flattened_results = api_helpers.flatten_list_results(paged_results,
'roles')
LOGGER.debug('Getting information about organization roles,'
' parent = %s, flattened_results = %s',
parent, flattened_results)
return flattened_results
except (errors.HttpError, HttpLib2Error) as e:
api_exception = api_errors.ApiExecutionError(
'project_roles', e, 'parent', parent)
LOGGER.exception(api_exception)
raise api_exception
[docs] def get_organization_roles(self, org_id):
"""Get information about custom organization roles.
Args:
org_id (str): The id of the organization.
Returns:
list: The response of retrieving the organization roles.
Raises:
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
name = self.repository.organizations_roles.get_name(org_id)
try:
paged_results = self.repository.organizations_roles.list(
name, view='FULL')
flattened_results = api_helpers.flatten_list_results(paged_results,
'roles')
LOGGER.debug('Getting information about custom organization roles,'
' org_id = %s, flattened_results = %s',
org_id, flattened_results)
return flattened_results
except (errors.HttpError, HttpLib2Error) as e:
api_exception = api_errors.ApiExecutionError(
'organizations_roles', e, 'name', name)
LOGGER.exception(api_exception)
raise api_exception
[docs] def get_project_roles(self, project_id):
"""Get information about custom project roles.
Args:
project_id (str): The id of the project.
Returns:
list: The response of retrieving the project roles.
Raises:
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
name = self.repository.projects_roles.get_name(project_id)
try:
paged_results = self.repository.projects_roles.list(name,
view='FULL')
flattened_results = api_helpers.flatten_list_results(paged_results,
'roles')
LOGGER.debug('Getting the information about custom project roles,'
' project_id = %s, flattened_results = %s',
project_id, flattened_results)
return flattened_results
except (errors.HttpError, HttpLib2Error) as e:
api_exception = api_errors.ApiExecutionError(
'projects_roles', e, 'name', name)
LOGGER.exception(api_exception)
raise api_exception
[docs] def get_service_accounts(self, project_id):
"""Get Service Accounts associated with a project.
Args:
project_id (str): The project ID to get Service Accounts for.
Returns:
list: List of service accounts associated with the project.
Raises:
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
name = self.repository.projects_serviceaccounts.get_name(project_id)
try:
paged_results = self.repository.projects_serviceaccounts.list(name)
flattened_results = api_helpers.flatten_list_results(paged_results,
'accounts')
LOGGER.debug('Getting service accounts associated with a project,'
' project_id = %s, flattened_results = %s',
project_id, flattened_results)
return flattened_results
except (errors.HttpError, HttpLib2Error) as e:
api_exception = api_errors.ApiExecutionError(
'serviceAccounts', e, 'name', name)
LOGGER.exception(api_exception)
raise api_exception
[docs] def get_service_account_iam_policy(self, name):
"""Get IAM policy associated with a service account.
Args:
name (str): The service account name to query, must be in the format
projects/{PROJECT_ID}/serviceAccounts/{SERVICE_ACCOUNT_EMAIL}
Returns:
dict: The IAM policies for the service account.
Raises:
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
try:
results = self.repository.projects_serviceaccounts.get_iam_policy(
name)
LOGGER.debug('Getting the IAM Policy associated with the service'
' account, name = %s, results = %s', name, results)
return results
except (errors.HttpError, HttpLib2Error) as e:
api_exception = api_errors.ApiExecutionError(
'serviceAccountIamPolicy', e, 'name', name)
LOGGER.exception(api_exception)
raise api_exception
[docs] def get_service_account_keys(self, name, key_type=None):
"""Get keys associated with the given Service Account.
Args:
name (str): The service account name to query, must be in the format
projects/{PROJECT_ID}/serviceAccounts/{SERVICE_ACCOUNT_EMAIL}
key_type (str): Optional, the key type to include in the results.
Can be None, USER_MANAGED or SYSTEM_MANAGED. Defaults to
returning all key types.
Returns:
list: List with a dict for each key associated with the account.
Raises:
ValueError: Raised if an invalid key_type is specified.
ApiExecutionError: ApiExecutionError is raised if the call to the
GCP API fails.
"""
def _service_account_not_found(error):
"""Checks if the error is due to the SA not found in the project.
Args:
error (Exception): The error to check.
Returns:
bool: If the error is due to SA not found.
"""
sa_not_found_pattern = '^Service account .*? does not exist.$'
if isinstance(error, errors.HttpError):
if (str(error.resp.status) == '404' and
error.resp.get('content-type', '')
.startswith('application/json')):
error_resp = json.loads(error.content.decode('utf-8'))
error_details = error_resp.get('error', {})
error_message = error_details.get('message', '')
LOGGER.debug(error_message)
if re.match(sa_not_found_pattern, error_message):
return True
return False
try:
kwargs = {}
if key_type:
if key_type not in self.KEY_TYPES:
raise ValueError(
'Key type %s is not a valid key type.' % key_type)
kwargs['keyTypes'] = key_type
results = self.repository.projects_serviceaccounts_keys.list(
name, **kwargs)
flattened_results = api_helpers.flatten_list_results(results,
'keys')
LOGGER.debug('Getting the keys associated with the given service'
' account, name = %s, key_type = %s,'
' flattened_results = %s',
name, key_type, flattened_results)
return flattened_results
except (errors.HttpError, HttpLib2Error) as e:
if _service_account_not_found(e):
LOGGER.debug('Service account %s doesn\'t exist', name)
return []
api_exception = api_errors.ApiExecutionError(
'serviceAccountKeys', e, 'name', name)
LOGGER.exception(api_exception)
raise api_exception