# Source code for google.cloud.forseti.services.base.config

# Copyright 2018 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base classes required for handling configuration of the gRPC server."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from builtins import str
from builtins import object
import abc
import re
from multiprocessing.pool import ThreadPool
import threading

from future.utils import with_metaclass
from google.cloud.forseti.common.util import file_loader
from google.cloud.forseti.common.util import http_helpers
from google.cloud.forseti.common.util import logger
from google.cloud.forseti.services import db
from google.cloud.forseti.services.client import ClientComposition
from google.cloud.forseti.services.dao import create_engine
from google.cloud.forseti.services.dao import ModelManager
from google.cloud.forseti.services.inventory.storage import Storage

LOGGER = logger.get_logger(__name__)


def _validate_cai_enabled(cai_configs):
    """Verifies if CloudAsset Inventory can be used for this inventory config.

    A missing configuration (None or an empty dict) and a missing or null
    'gcs_path' are treated as "CAI not enabled" rather than raising.

    Args:
        cai_configs (dict): Settings for the Cloud AssetInventory API

    Returns:
        bool: True if CAI is supported and enabled by configuration, else
            False.
    """
    if not cai_configs or not cai_configs.get('enabled'):
        LOGGER.debug('CloudAsset Inventory disabled by configuration.')
        return False

    # A key that is present with a null value makes .get() return None even
    # when a default is supplied, so normalise to '' before the prefix test.
    gcs_path = cai_configs.get('gcs_path') or ''
    if not gcs_path.startswith('gs://'):
        LOGGER.debug('CloudAsset Inventory not configured with a valid GCS '
                     'bucket.')
        return False

    return True
class AbstractInventoryConfig(with_metaclass(abc.ABCMeta, object)):
    """Abstract base class for inventory configuration.

    This class is used to implement dependency injection for the gRPC
    services. Every method is abstract; concrete subclasses must implement
    all of them before they can be instantiated.
    """

    @abc.abstractmethod
    def use_composite_root(self):
        """Checks if inventory is configured to use a composite root resource.
        """

    @abc.abstractmethod
    def get_root_resource_id(self):
        """Returns the root resource id."""

    @abc.abstractmethod
    def get_composite_root_resources(self):
        """Returns the composite root resource ids."""

    @abc.abstractmethod
    def get_gsuite_admin_email(self):
        """Returns gsuite admin email."""

    @abc.abstractmethod
    def get_api_quota_configs(self):
        """Returns the per API quota configs."""

    @abc.abstractmethod
    def get_retention_days_configs(self):
        """Returns the days of inventory data to retain."""

    @abc.abstractmethod
    def get_cai_asset_types(self):
        """Returns the list of asset types to include in the CAI export."""

    @abc.abstractmethod
    def get_cai_enabled(self):
        """Returns True if the cloudasset API should be used."""

    @abc.abstractmethod
    def get_cai_gcs_path(self):
        """Returns the GCS bucket path to store the CAI data dumps in."""

    @abc.abstractmethod
    def get_cai_timeout(self):
        """Returns the timeout in seconds for calls to the Cloud Asset API."""

    @abc.abstractmethod
    def get_service_config(self):
        """Returns the service config."""

    @abc.abstractmethod
    def set_service_config(self, service_config):
        """Attach a service configuration.

        Args:
            service_config (object): Service configuration.
        """
class AbstractServiceConfig(with_metaclass(abc.ABCMeta, object)):
    """Interface that every service configuration must implement.

    The gRPC services receive an implementation of this class as their
    injected dependency. All methods are abstract.
    """

    @abc.abstractmethod
    def get_engine(self):
        """Get the database engine."""

    @abc.abstractmethod
    def scoped_session(self):
        """Get a scoped session."""

    @abc.abstractmethod
    def client(self):
        """Get an API client."""

    @abc.abstractmethod
    def run_in_background(self, func):
        """Runs a function in a thread pool in the background.

        Args:
            func (Function): Function to be executed.
        """

    @abc.abstractmethod
    def get_storage_class(self):
        """Returns the class used for the inventory storage."""
class InventoryConfig(AbstractInventoryConfig):  # pylint: disable=too-many-instance-attributes
    """Implements composed dependency injection for the inventory."""

    @staticmethod
    def _filter_valid_resources(excluded_resources):
        """Filter valid excluded resources.

        Each entry is converted to the singular internal form
        ('organizations/1' becomes 'organization/1') and kept only if the
        whole converted string matches one of the accepted patterns. Entries
        that do not match are logged with a warning and dropped.

        Args:
            excluded_resources (list): A list of excluded resources to
                validate against.

        Returns:
            list: List of valid resources to exclude.
        """
        if not excluded_resources:
            return []

        # Each pattern ends with \Z so the entire resource name has to match;
        # re.match() on its own only anchors the start of the string and
        # would accept names with arbitrary trailing characters.
        valid_resource_patterns = [
            r'organization/\d+\Z',
            r'folder/\d+\Z',
            r'project/\d+\Z',
            r'project/[a-z][-a-z0-9]{4,28}[a-z0-9]\Z',
        ]

        valid_resources = []
        for resource in excluded_resources:
            # Note: Internally we are referring resource types as
            # organization / folder / project so we need to replace
            # the 's' out of the string.
            resource = resource.replace('s/', '/')
            if any(re.match(pattern, resource)
                   for pattern in valid_resource_patterns):
                valid_resources.append(resource)
            else:
                LOGGER.warning('Resource %s is invalid and will not be '
                               'omitted during the inventory process.',
                               resource)
        return valid_resources

    def __init__(self,
                 root_resource_id,
                 gsuite_admin_email,
                 api_quota_configs,
                 retention_days,
                 cai_configs,
                 composite_root_resources=None,
                 excluded_resources=None):
        """Initialize.

        Args:
            root_resource_id (str): Root resource to start crawling from
            gsuite_admin_email (str): G Suite admin email
            api_quota_configs (dict): API quota configs
            retention_days (int): Days of inventory tables to retain
            cai_configs (dict): Settings for the Cloud AssetInventory API.
                None is treated as an empty configuration.
            composite_root_resources (list): The list of resources to use
                crawl using a composite root.
            excluded_resources (list): The list of resources to exclude.

        Raises:
            ValueError: Raised if neither or both root_resource_id and
                composite_root_resources are set.
        """
        super(InventoryConfig, self).__init__()
        if root_resource_id and composite_root_resources:
            err = ValueError(
                'Both root_resource_id and composite_root_resources defined in '
                'the server inventory configuration. Only one may be set.')
            LOGGER.error(err)
            raise err

        if not root_resource_id and not composite_root_resources:
            err = ValueError(
                'Neither root_resource_id nor composite_root_resources defined '
                'in the server inventory configuration. One must be set.')
            LOGGER.error(err)
            raise err

        self.service_config = None
        self.root_resource_id = root_resource_id
        self.gsuite_admin_email = gsuite_admin_email
        self.api_quota_configs = api_quota_configs
        self.retention_days = retention_days
        # Normalise a missing CAI section to an empty dict so the get_cai_*
        # accessors can call .get() on it unconditionally.
        self.cai_configs = cai_configs or {}
        self.composite_root_resources = composite_root_resources
        self.excluded_resources = self._filter_valid_resources(
            excluded_resources)

    def use_composite_root(self):
        """Checks if inventory is configured to use a composite root resource.

        Returns:
            bool: True if using a composite root, else False.
        """
        return not self.root_resource_id

    def get_excluded_resources(self):
        """Return the list of excluded resources.

        Returns:
            list: List of excluded resources.
        """
        return self.excluded_resources

    def get_root_resource_id(self):
        """Return the configured root resource id.

        Returns:
            str: Root resource id if defined, else None.
        """
        return self.root_resource_id

    def get_composite_root_resources(self):
        """Returns the composite root resource ids.

        Returns:
            list: The list of root resources defined in the configuration or
                None.
        """
        return self.composite_root_resources

    def get_gsuite_admin_email(self):
        """Return the gsuite admin email to use.

        Returns:
            str: Gsuite admin email.
        """
        return self.gsuite_admin_email

    def get_api_quota_configs(self):
        """Returns the per API quota configs.

        Returns:
            dict: The API quota configurations.
        """
        return self.api_quota_configs

    def get_retention_days_configs(self):
        """Returns the days of inventory data to retain.

        Returns:
            int: The days of inventory data to retain.
        """
        return self.retention_days

    def get_cai_asset_types(self):
        """Returns the list of Asset Types to include in the CAI export.

        The full list of supported asset types is at:
        https://cloud.google.com/resource-manager/docs/cloud-asset-inventory/overview

        Returns:
            list: The list of asset types to include, or None if all asset
                types should be included.
        """
        return self.cai_configs.get('asset_types', None)

    def get_cai_enabled(self):
        """Returns True if the cloudasset API should be used for the inventory.

        Returns:
            bool: Whether CAI should be integrated with the inventory or not.
        """
        return _validate_cai_enabled(self.cai_configs)

    def get_cai_gcs_path(self):
        """Returns the GCS bucket path to store the CAI data dumps in.

        Returns:
            str: The GCS bucket path for CAI data.
        """
        return self.cai_configs.get('gcs_path', '')

    def get_cai_timeout(self):
        """Returns the timeout in seconds for calls to the Cloud Asset API.

        Returns:
            int: Timeout in seconds, defaults to 3600 seconds.
        """
        return self.cai_configs.get('api_timeout', 3600)

    def get_service_config(self):
        """Return the attached service configuration.

        Returns:
            object: Service configuration.
        """
        return self.service_config

    def set_service_config(self, service_config):
        """Attach a service configuration.

        Args:
            service_config (object): Service configuration.
        """
        self.service_config = service_config
# pylint: disable=too-many-instance-attributes
class ServiceConfig(AbstractServiceConfig):
    """Implements composed dependency injection to Forseti Server services."""

    def __init__(self,
                 forseti_config_file_path,
                 forseti_db_connect_string,
                 endpoint):
        """Initialize.

        Args:
            forseti_config_file_path (str): Path to Forseti configuration file
            forseti_db_connect_string (str): Forseti database string
            endpoint (str): server endpoint
        """
        super(ServiceConfig, self).__init__()
        self.thread_pool = ThreadPool()

        # Enable pool_pre_ping to ensure that disconnected or errored
        # connections are dropped and recreated before use.
        self.engine = create_engine(forseti_db_connect_string,
                                    pool_recycle=3600,
                                    pool_pre_ping=True)
        self.model_manager = ModelManager(self.engine)
        self.sessionmaker = db.create_scoped_sessionmaker(self.engine)
        self.endpoint = endpoint

        self.forseti_config_file_path = forseti_config_file_path

        # The configuration sections stay unset until update_configuration()
        # has loaded a configuration file successfully.
        self.inventory_config = None
        self.scanner_config = None
        self.notifier_config = None
        self.global_config = None
        self.forseti_config = None
        self.update_lock = threading.RLock()

    def _read_from_config(self, config_file_path=None):
        """Read from the forseti configuration file.

        Args:
            config_file_path (str): Forseti server config file path

        Returns:
            tuple(dict, str): (Forseti server configuration, Error message)
        """
        # if config_file_path is not passed in, we will use the default
        # configuration path that was passed in during the initialization
        # of the server.
        forseti_config_path = config_file_path or self.forseti_config_file_path

        forseti_config = {}
        err_msg = ''

        try:
            forseti_config = file_loader.read_and_parse_file(
                forseti_config_path)
        except (AttributeError, IOError):
            err_msg = ('Unable to open Forseti Security config file. Please '
                       'check your path and filename and try again.')
            LOGGER.exception(err_msg)

        return forseti_config, err_msg

    def update_configuration(self, config_file_path=None):
        """Update the inventory, scanner, global and notifier configurations.

        The stored configuration is replaced only after the new inventory
        section has been validated, so a rejected file leaves every
        previously loaded section, including the raw Forseti config,
        unchanged. Sections that are present but null are treated the same
        as missing sections.

        Args:
            config_file_path (str): Forseti server config file path.

        Returns:
            tuple(bool, str): (Configuration was updated, Error message)
        """
        forseti_config, err_msg = self._read_from_config(config_file_path)

        if not forseti_config:
            # if forseti_config is empty, there is nothing to update.
            return False, err_msg

        with self.update_lock:
            # Lock before performing the update to avoid multiple updates
            # at the same time.

            # Setting up individual configurations
            forseti_inventory_config = forseti_config.get('inventory') or {}
            try:
                inventory_config = InventoryConfig(
                    forseti_inventory_config.get('root_resource_id'),
                    forseti_inventory_config.get(
                        'domain_super_admin_email', ''),
                    forseti_inventory_config.get('api_quota', {}),
                    forseti_inventory_config.get('retention_days', -1),
                    # Default to disable CloudAsset Inventory if not
                    # configured.
                    (forseti_inventory_config.get('cai') or
                     {'enabled': False}),
                    composite_root_resources=(
                        forseti_inventory_config.get(
                            'composite_root_resources')
                    ),
                    excluded_resources=forseti_inventory_config.get(
                        'excluded_resources', [])
                )
            except ValueError as e:
                # Nothing has been assigned to self yet, so the previous
                # configuration remains fully in effect.
                return False, str(e)

            # TODO: Create Config classes to store scanner and notifier
            # configs.
            forseti_scanner_config = forseti_config.get('scanner') or {}

            # The suffix is used to indicate which major feature is enabled
            # for tracking purposes. For now only config validator is
            # supported.
            user_agent_suffix = ''
            for scanner in forseti_scanner_config.get('scanners') or []:
                if (scanner.get('enabled') and
                        scanner.get('name') == 'config_validator'):
                    user_agent_suffix = 'config-validator'
                    break
            http_helpers.set_user_agent_suffix(user_agent_suffix)

            forseti_notifier_config = forseti_config.get('notifier') or {}
            forseti_global_config = forseti_config.get('global') or {}

            # Everything validated: swap in the new state in one go.
            self.forseti_config = forseti_config
            self.inventory_config = inventory_config
            self.inventory_config.set_service_config(self)
            self.scanner_config = forseti_scanner_config
            self.notifier_config = forseti_notifier_config
            self.global_config = forseti_global_config
            return True, err_msg

    def get_forseti_config(self):
        """Get the Forseti config.

        Returns:
            dict: Forseti config.
        """
        return self.forseti_config

    def get_inventory_config(self):
        """Get the inventory config.

        Returns:
            object: Inventory config.
        """
        return self.inventory_config

    def get_scanner_config(self):
        """Get the scanner config.

        Returns:
            dict: Scanner config.
        """
        return self.scanner_config

    def get_notifier_config(self):
        """Get the notifier config.

        Returns:
            dict: Notifier config.
        """
        return self.notifier_config

    def get_global_config(self):
        """Get the global config.

        Returns:
            dict: Global config.
        """
        return self.global_config

    def get_engine(self):
        """Get the database engine.

        Returns:
            object: Database engine object.
        """
        return self.engine

    def scoped_session(self):
        """Get a scoped session.

        Returns:
            object: A scoped session.
        """
        return self.sessionmaker()

    def client(self):
        """Get an API client.

        Returns:
            object: API client to use against services.
        """
        return ClientComposition(self.endpoint)

    def run_in_background(self, func):
        """Runs a function in a thread pool in the background.

        The function is submitted with apply_async and its result is not
        collected.

        Args:
            func (Function): Function to be executed.
        """
        self.thread_pool.apply_async(func)

    def get_storage_class(self):
        """Returns the storage class used to access the inventory.

        Returns:
            class: Type of a storage implementation.
        """
        return Storage
# pylint: enable=too-many-instance-attributes