# Source code for google.cloud.forseti.scanner.scanners.config_validator_util.cv_data_converter

# Copyright 2019 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Config Validator Data Converter."""

import json

from google.iam.v1.policy_pb2 import Policy
from google.protobuf import json_format
from google.cloud.asset.v1.assets_pb2 import Resource

from google.cloud.forseti.common.util import logger
from google.cloud.forseti.scanner.scanners.config_validator_util import (
    validator_pb2)

# Module-level logger, named after this module.
LOGGER = logger.get_logger(__name__)

# Identifiers for the two kinds of payload convert_data_to_cv_asset accepts.
_IAM_POLICY = 'iam_policy'
_RESOURCE = 'resource'

# Any data_type outside this set is rejected with a ValueError by
# convert_data_to_cv_asset.
SUPPORTED_DATA_TYPE = frozenset([_IAM_POLICY, _RESOURCE])

# Resource types that are allowed through convert_data_to_cv_asset even when
# the resource has no CAI resource name. It is currently empty, so every
# resource without a CAI resource name is rejected.
CAI_RESOURCE_TYPE_MAPPING = {
    # TODO: Support non cai resource type by creating a fake cai resource type.
    # e.g. 'lien' -> 'google.SOME_RESOURCE_TYPE.Lien'
}


def generate_ancestry_path(full_name):
    """Generate ancestry path from full_name.

    The full name is read as a sequence of '<type>/<id>/' pairs. Only the
    pairs whose type is 'organization', 'folder' or 'project' are kept, in
    their original order.

    Only the type position of each pair is compared against the supported
    ancestor types. This keeps a resource whose id happens to be the word
    'organization', 'folder' or 'project' from being mistaken for an
    ancestor, which would otherwise add a bogus segment to the path.

    Args:
        full_name (str): Full name of the resource, e.g.
            'organization/1/folder/2/project/p1/bucket/b1/'.

    Returns:
        str: Ancestry path, e.g. 'organization/1/folder/2/project/p1/', or
            an empty string when the full name has no supported ancestor.
    """
    supported_ancestors = ('organization', 'folder', 'project')
    segments = full_name.split('/')

    # Even-indexed segments are resource types and odd-indexed segments are
    # the matching ids. zip() drops a trailing type that has no id, such as
    # the empty string produced by the trailing '/'.
    type_id_pairs = zip(segments[0::2], segments[1::2])

    return ''.join(
        '{}/{}/'.format(resource_type, resource_id)
        for resource_type, resource_id in type_id_pairs
        if resource_type in supported_ancestors)
def convert_data_to_cv_asset(resource, data_type):
    """Build a Config Validator Asset from a stored resource.

    Args:
        resource (Resource): Resource from querying the resources table.
        data_type (str): Kind of payload held in resource.data, either
            'resource' or 'iam_policy'.

    Returns:
        Asset: A Config Validator Asset populated with either the resource
            or the IAM policy, depending on data_type.

    Raises:
        ValueError: If data_type is not a supported data type, or if the
            resource has no CAI resource name and its type is not listed
            in CAI_RESOURCE_TYPE_MAPPING.
    """
    if data_type not in SUPPORTED_DATA_TYPE:
        raise ValueError('Data type {} not supported.'.format(data_type))

    if not (resource.cai_resource_name or
            resource.type in CAI_RESOURCE_TYPE_MAPPING):
        raise ValueError('Resource {} not supported to use Config'
                         ' Validator scanner.'.format(resource.type))

    # The ancestry path only keeps organization/folder/project segments, so
    # the project is the lowest level it can end at.
    ancestors = generate_ancestry_path(resource.full_name)

    payload = json.loads(resource.data)
    # Falsy values are replaced with None in place before parsing.
    cleanup_dict(payload)

    if data_type == _IAM_POLICY:
        cv_resource = None
        cv_policy = json_format.ParseDict(payload,
                                          Policy(),
                                          ignore_unknown_fields=True)
    else:
        cv_resource = json_format.ParseDict(resource_wrapper(payload),
                                            Resource())
        cv_policy = None

    return validator_pb2.Asset(name=resource.cai_resource_name,
                               asset_type=resource.cai_resource_type,
                               ancestry_path=ancestors,
                               resource=cv_resource,
                               iam_policy=cv_policy)
def resource_wrapper(data):
    """Wrap the data with the resource wrapper used by CAI.

    The wrapper always reports version 'v1' and leaves every other
    metadata field as None; the given data is stored under 'data' as is
    (not copied).

    Args:
        data (dict): The resource data.

    Returns:
        dict: Resource data wrapped by a resource wrapper.
    """
    unset_fields = ('discovery_document_uri',
                    'discovery_name',
                    'resource_url',
                    'parent')

    wrapped = {'version': 'v1'}
    wrapped.update(dict.fromkeys(unset_fields))
    wrapped['data'] = data
    return wrapped
def cleanup_dict(raw_dict):
    """Replace falsy values with None in a dict, in place.

    Every value that evaluates as false (empty string, empty list, empty
    dict, but also 0 and False) is replaced with None. Non-empty dict
    values are cleaned recursively, as are dicts that sit directly inside
    a non-empty list value. Other list items are left untouched.

    Args:
        raw_dict (dict): Dict to clean up.
    """
    # Only existing keys are reassigned, so the dict never changes size
    # while it is being iterated.
    for field in raw_dict:
        entry = raw_dict[field]
        if not entry:
            raw_dict[field] = None
            continue

        if isinstance(entry, dict):
            cleanup_dict(entry)
        elif isinstance(entry, list):
            for element in entry:
                if isinstance(element, dict):
                    cleanup_dict(element)