Source code for google.cloud.forseti.common.gcp_api.cloudasset

# Copyright 2018 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Wrapper for Cloud Asset API client."""
from builtins import object
import time

from googleapiclient import errors
from httplib2 import HttpLib2Error

from google.cloud.forseti.common.gcp_api import _base_repository
from google.cloud.forseti.common.gcp_api import api_helpers
from google.cloud.forseti.common.gcp_api import errors as api_errors
from google.cloud.forseti.common.gcp_api import repository_mixins
from google.cloud.forseti.common.util import logger

LOGGER = logger.get_logger(__name__)
API_NAME = 'cloudasset'


class CloudAssetRepositoryClient(_base_repository.BaseRepositoryClient):
    """Cloud Asset API Repository."""

    def __init__(self,
                 quota_max_calls=None,
                 quota_period=60.0,
                 use_rate_limiter=True):
        """Constructor.

        Args:
            quota_max_calls (int): Allowed requests per <quota_period> for the
                API.
            quota_period (float): The time period to track requests over.
            use_rate_limiter (bool): Set to false to disable the use of a rate
                limiter for this service.
        """
        if not quota_max_calls:
            use_rate_limiter = False

        self._top_level = None
        self._operations = None

        super(CloudAssetRepositoryClient, self).__init__(
            API_NAME, versions=['v1'],
            quota_max_calls=quota_max_calls,
            quota_period=quota_period,
            use_rate_limiter=use_rate_limiter)

    # Turn off docstrings for properties.
    # pylint: disable=missing-return-doc, missing-return-type-doc
    @property
    def top_level(self):
        """Returns a _CloudAssetV1Repository instance."""
        if not self._top_level:
            self._top_level = self._init_repository(
                _CloudAssetV1Repository)
        return self._top_level

    @property
    def operations(self):
        """Returns a _CloudAssetOperationsRepository instance."""
        if not self._operations:
            self._operations = self._init_repository(
                _CloudAssetOperationsRepository)
        return self._operations
    # pylint: enable=missing-return-doc, missing-return-type-doc
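

# Illustrative sketch (not part of the original module): the repository client
# lazily builds one repository per API component. The quota values below are
# placeholders; if quota_max_calls is left as None the rate limiter is
# disabled.
#
#   repo_client = CloudAssetRepositoryClient(quota_max_calls=100,
#                                            quota_period=100.0)
#   v1_repo = repo_client.top_level      # _CloudAssetV1Repository
#   ops_repo = repo_client.operations    # _CloudAssetOperationsRepository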
class _CloudAssetV1Repository(
        repository_mixins.ExportAssetsQueryMixin,
        _base_repository.GCPRepository):
    """Implementation of Cloud Asset V1 repository."""

    def __init__(self, **kwargs):
        """Constructor.

        Args:
            **kwargs (dict): The args to pass into GCPRepository.__init__()
        """
        # The top level API methods are rooted under the v1 component in the
        # discovery doc.
        super(_CloudAssetV1Repository, self).__init__(
            component='v1', **kwargs)
class _CloudAssetOperationsRepository(
        repository_mixins.GetQueryMixin,
        _base_repository.GCPRepository):
    """Implementation of Cloud Asset Operations repository."""

    def __init__(self, **kwargs):
        """Constructor.

        Args:
            **kwargs (dict): The args to pass into GCPRepository.__init__()
        """
        super(_CloudAssetOperationsRepository, self).__init__(
            key_field='name', component='operations', **kwargs)
class CloudAssetClient(object):
    """Cloud Asset Client."""

    # Estimation of how long to wait for an async API to complete.
    OPERATION_DELAY_IN_SEC = 5.0

    def __init__(self, global_configs, **kwargs):
        """Initialize.

        Args:
            global_configs (dict): Global configurations.
            **kwargs (dict): The kwargs.
        """
        max_calls, quota_period = api_helpers.get_ratelimiter_config(
            global_configs, API_NAME)

        self.repository = CloudAssetRepositoryClient(
            quota_max_calls=max_calls,
            quota_period=quota_period,
            use_rate_limiter=kwargs.get('use_rate_limiter', True))
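
    # Illustrative sketch (not part of the original module): constructing the
    # client. The global_configs keys shown here are assumptions about what
    # api_helpers.get_ratelimiter_config reads for this API; check your
    # Forseti server configuration for the exact shape.
    #
    #   client = CloudAssetClient({'cloudasset': {'max_calls': 1,
    #                                             'period': 1.0}},
    #                             use_rate_limiter=True)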
    @staticmethod
    def build_gcs_object_output(destination_object):
        """Creates an OutputConfig message for output to a GCS object.

        Args:
            destination_object (str): The GCS path and file name to store the
                results in. The bucket must be in the same project that has
                the Cloud Asset API enabled and the object must not exist.

        Returns:
            dict: An OutputConfig message.
        """
        return {'gcsDestination': {'uri': destination_object}}
    @staticmethod
    def build_gcs_prefix_output(destination_prefix):
        """Creates an OutputConfig message for sharding output to a GCS prefix.

        Args:
            destination_prefix (str): The GCS bucket and folder path to store
                the results in. The bucket must be in the same project that
                has the Cloud Asset API enabled and the folder must not exist.

        Returns:
            dict: An OutputConfig message.
        """
        return {'gcsDestination': {'uriPrefix': destination_prefix}}
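
    # Illustrative sketch (not part of the original module): the OutputConfig
    # messages the two GCS helpers above produce. The bucket and object names
    # below are made up.
    #
    #   CloudAssetClient.build_gcs_object_output(
    #       'gs://example-bucket/assets.json')
    #   # -> {'gcsDestination': {'uri': 'gs://example-bucket/assets.json'}}
    #
    #   CloudAssetClient.build_gcs_prefix_output('gs://example-bucket/exports')
    #   # -> {'gcsDestination': {'uriPrefix': 'gs://example-bucket/exports'}}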
    @staticmethod
    def build_bigquery_table_output(project_id, dataset, table, force=True):
        """Creates an OutputConfig message for output to a BigQuery table.

        Args:
            project_id (str): The project that hosts the dataset.
            dataset (str): The dataset name to output into.
            table (str): The table name to create for the export. The table
                must not exist.
            force (bool): If true, force write rows even on error.

        Raises:
            NotImplementedError: Always raised; the v1 API does not yet
                support a BigQuery destination.
        """
        # TODO: Uncomment the below code and remove the exception when
        # BigQuery support is live in the v1 API.
        #
        # dataset_full_name = 'projects/{}/datasets/{}'.format(project_id,
        #                                                      dataset)
        # return {'bigqueryDestination': {'dataset': dataset_full_name,
        #                                 'table': table,
        #                                 'force': force}}
        raise NotImplementedError('The v1 API does not support this '
                                  'destination, it will be enabled in a '
                                  'future release.')
    def export_assets(self, parent, destination_object=None,
                      output_config=None, content_type=None, asset_types=None,
                      read_time=None, blocking=False, timeout=0):
        """Export assets under a parent resource to the configured destination.

        Args:
            parent (str): The name of the parent resource to export assets
                under.
            destination_object (str): The GCS path and file name to store the
                results in. The bucket must be in the same project that has
                the Cloud Asset API enabled.
            output_config (dict): The full outputConfig message to pass to the
                export assets API. Should be built using one of the
                build_*_output methods.
            content_type (str): The specific content type to export, currently
                supports "RESOURCE" and "IAM_POLICY". If not specified, only
                the CAI metadata for assets is included.
            asset_types (list): The list of asset types to filter the results
                to. If not specified, exports all assets known to CAI.
            read_time (str): A timestamp to take an asset snapshot at, in
                RFC3339 UTC "Zulu" format, accurate to nanoseconds.
                Example: "2014-10-02T15:01:23.045123456Z"
            blocking (bool): If true, don't return until the async operation
                completes on the backend or timeout seconds pass.
            timeout (float): If greater than 0 and blocking is True, then
                raise an exception if timeout seconds pass before the
                operation completes.

        Returns:
            dict: Operation status and info.

        Raises:
            ApiExecutionError: Raised if there is an error in the API
                response.
            OperationTimeoutError: Raised if the operation times out.
            ValueError: Raised on an invalid parent resource name or
                conflicting destination arguments.
        """
        if not (parent.startswith('folders/') or
                parent.startswith('organizations/') or
                parent.startswith('projects/')):
            raise ValueError('parent must start with folders/, projects/, or '
                             'organizations/')

        if not destination_object and not output_config:
            raise ValueError('Either destination_object or output_config '
                             'must be set.')

        if destination_object and output_config:
            raise ValueError('destination_object and output_config must not '
                             'both be specified.')

        if destination_object:
            # Explicitly continue to support the original use of this method,
            # which was output to a specific GCS object. There may be usage of
            # this method from outside of Forseti, and thus backwards
            # compatibility is required.
            output_config = self.build_gcs_object_output(destination_object)

        repository = self.repository.top_level
        try:
            results = repository.export_assets(
                parent, output_config, content_type=content_type,
                asset_types=asset_types, read_time=read_time)
            if blocking:
                results = self.wait_for_completion(parent, results,
                                                   timeout=timeout)
        except (errors.HttpError, HttpLib2Error) as e:
            LOGGER.error('Error exporting assets for parent %s: %s',
                         parent, e)
            raise api_errors.ApiExecutionError(parent, e)
        except api_errors.OperationTimeoutError as e:
            LOGGER.warning('Timeout exporting assets for parent %s: %s',
                           parent, e)
            raise

        LOGGER.info('Exporting assets for parent %s. Result: %s',
                    parent, results)
        return results
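
    # Illustrative sketch (not part of the original module): a blocking export
    # of an organization's IAM policies to a single GCS object, using the
    # legacy destination_object argument. The parent and bucket names are
    # made up.
    #
    #   client.export_assets(
    #       'organizations/1234567890',
    #       destination_object='gs://example-bucket/iam-export.json',
    #       content_type='IAM_POLICY',
    #       blocking=True,
    #       timeout=600.0)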
    def get_operation(self, operation_name):
        """Get the operation status.

        Args:
            operation_name (str): The name of the operation to get.

        Returns:
            dict: Operation status and info.

        Raises:
            ApiExecutionError: Raised if there is an error in the API
                response.
            ValueError: Raised on an invalid operation name.
        """
        if not (operation_name.startswith('folders/') or
                operation_name.startswith('organizations/') or
                operation_name.startswith('projects/')):
            raise ValueError('operation_name must start with folders/, '
                             'projects/, or organizations/')

        repository = self.repository.operations
        try:
            results = repository.get(operation_name)
            LOGGER.debug('Getting the operation status, operation_name = %s, '
                         'results = %s', operation_name, results)
        except (errors.HttpError, HttpLib2Error) as e:
            raise api_errors.ApiExecutionError(operation_name, e)

        return results
    def wait_for_completion(self, parent, operation, timeout=0,
                            initial_delay=None, retry_delay=None):
        """Wait for the operation to complete.

        Args:
            parent (str): The name of the parent resource the assets are
                exported under.
            operation (dict): The operation response from an API call.
            timeout (float): The maximum time to wait for the operation to
                complete.
            initial_delay (float): The time to wait before first checking if
                the API has completed. If None, the default value, configured
                as CloudAssetClient.OPERATION_DELAY_IN_SEC, is used.
            retry_delay (float): The time to wait between checks that the API
                has completed. If None, the default value, configured as
                CloudAssetClient.OPERATION_DELAY_IN_SEC, is used.

        Returns:
            dict: Operation status and info.

        Raises:
            OperationTimeoutError: Raised if the operation times out.
        """
        if operation.get('done', False):
            return operation

        if initial_delay is None:
            initial_delay = self.OPERATION_DELAY_IN_SEC

        if retry_delay is None:
            retry_delay = self.OPERATION_DELAY_IN_SEC

        started_timestamp = time.time()
        time.sleep(initial_delay)

        while True:
            operation_name = operation['name']
            operation = self.get_operation(operation_name)
            if operation.get('done', False):
                return operation

            if timeout and time.time() - started_timestamp > timeout:
                raise api_errors.OperationTimeoutError(parent, operation)

            time.sleep(retry_delay)
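

# Illustrative usage sketch (not part of the original Forseti module): a
# non-blocking export followed by manual polling. The global_configs shape,
# organization id, and bucket name are assumptions for the example.
def _example_manual_poll(global_configs, org_id, bucket):
    """Start an export and poll it to completion, returning the operation."""
    client = CloudAssetClient(global_configs)
    output_config = CloudAssetClient.build_gcs_prefix_output(
        'gs://{}/cai-export'.format(bucket))
    # Kick off the export without blocking; the API returns a long-running
    # operation immediately.
    operation = client.export_assets(
        'organizations/{}'.format(org_id),
        output_config=output_config,
        content_type='RESOURCE',
        blocking=False)
    # Poll until the backend marks the operation done, or raise
    # OperationTimeoutError after 10 minutes.
    return client.wait_for_completion(
        'organizations/{}'.format(org_id), operation, timeout=600.0)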