Source code for google.cloud.forseti.scanner.audit.lien_rules_engine

# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Rules engine for Liens."""
import collections

from google.cloud.forseti.common.gcp_type import resource_util
from google.cloud.forseti.common.util import logger
from google.cloud.forseti.common.util import relationship
from google.cloud.forseti.scanner.audit import base_rules_engine
from google.cloud.forseti.scanner.audit import errors

LOGGER = logger.get_logger(__name__)


RuleViolation = collections.namedtuple(
    'RuleViolation',
    ['resource_id', 'resource_name', 'resource_type', 'full_name', 'rule_index',
     'rule_name', 'violation_type', 'resource_data']
)


[docs]class LienRulesEngine(base_rules_engine.BaseRulesEngine): """Rules engine for Liens.""" def __init__(self, rules_file_path, snapshot_timestamp=None): """Initialize. Args: rules_file_path (str): file location of rules snapshot_timestamp (str): snapshot timestamp. Defaults to None. If set, this will be the snapshot timestamp used in the engine. """ super(LienRulesEngine, self).__init__(rules_file_path=rules_file_path) self.rule_book = None
[docs] def build_rule_book(self, global_configs=None): """Build LienRuleBook from the rules definition file. Args: global_configs (dict): Global configurations. """ self.rule_book = LienRuleBook(self._load_rule_definitions())
[docs] def find_violations(self, parent_resource, liens, force_rebuild=False): """Determine whether Big Query datasets violate rules. Args: parent_resource (Resource): parent resource the lien belongs to. liens (List[Lien]): liens to find violations for. force_rebuild (bool): If True, rebuilds the rule book. This will reload the rules definition file and add the rules to the book. Returns: generator: A generator of rule violations. """ if self.rule_book is None or force_rebuild: self.build_rule_book() violations = self.rule_book.find_violations(parent_resource, liens) return violations
[docs] def add_rules(self, rule_defs): """Add rules to the rule book. Args: rule_defs (dict): rule definitions dictionary """ if self.rule_book is not None: self.rule_book.add_rules(rule_defs)
[docs]class LienRuleBook(base_rules_engine.BaseRuleBook): """The RuleBook for Lien resources.""" def __init__(self, rule_defs=None): """Initialization. Args: rule_defs (dict): rule definitons dictionary. """ super(LienRuleBook, self).__init__() self.resource_to_rules = collections.defaultdict(list) if not rule_defs: self.rule_defs = {} else: self.rule_defs = rule_defs self.add_rules(rule_defs)
[docs] def add_rules(self, rule_defs): """Add rules to the rule book. Args: rule_defs (dict): rule definitions dictionary. """ for (i, rule) in enumerate(rule_defs.get('rules', [])): self.add_rule(rule, i)
[docs] def add_rule(self, rule_def, rule_index): """Add a rule to the rule book. Args: rule_def (dict): A dictionary containing rule definition properties. rule_index (int): The index of the rule from the rule definitions. Assigned automatically when the rule book is built. """ resources = rule_def.get('resource') if not resources: raise errors.InvalidRulesSchemaError( 'Missing field "resource" in rule {}'.format(rule_index)) for raw_resource in resources: resource_ids = raw_resource.get('resource_ids') if not resource_ids: raise errors.InvalidRulesSchemaError( 'Missing resource ids in rule {}'.format(rule_index)) resource_type = raw_resource.get('type') if resource_type not in ['project', 'folder', 'organization']: raise errors.InvalidRulesSchemaError( 'Invalid resource type "{}" in rule {}'.format( resource_type, rule_index)) for resource_id in resource_ids: resource = resource_util.create_resource( resource_id=resource_id, resource_type=resource_type, ) if not resource: raise errors.InvalidRulesSchemaError( 'Invalid resource in rule {} (id: {}, type: {})'.format( rule_index, resource_id, resource_type)) rule = self._build_rule(rule_def, rule_index) self.resource_to_rules[resource].append(rule)
[docs] @classmethod def _build_rule(cls, rule_def, rule_index): """Build a rule. Args: rule_def (dict): A dictionary containing rule definition properties. rule_index (int): The index of the rule from the rule definitions. Assigned automatically when the rule book is built. Returns: Rule: rule for the given definition. """ for field in ['name', 'restrictions']: if field not in rule_def: raise errors.InvalidRulesSchemaError( 'Missing field "{}" in rule {}'.format(field, rule_index)) return Rule(name=rule_def.get('name'), index=rule_index, restrictions=rule_def.get('restrictions'))
[docs] def find_violations(self, parent_resource, liens): """Find lien violations in the rule book. Args: parent_resource (Resource): The GCP resource associated with the liens. This is where we start looking for rule violations and we move up the resource hierarchy (if permitted by the resource's "inherit_from_parents" property). liens (List[Lien]): The liens to look for violations. Yields: RuleViolation: lien rule violations. """ all_restrictions = set() for lien in liens: for restriction in lien.restrictions: all_restrictions.add(restriction) resource_ancestors = relationship.find_ancestors( parent_resource, parent_resource.full_name) applicable_rules = [] for res in resource_ancestors: applicable_rules.extend(self.resource_to_rules.get(res, [])) for rule in applicable_rules: for violation in rule.find_violations(parent_resource, all_restrictions): yield violation
[docs]class Rule(object): """Rule properties from the rule definition file. Also finds violations. """ def __init__(self, name, index, restrictions): """Initialize. Args: name (str): Name of the loaded rule. index (int): The index of the rule from the rule definitions. restrictions (List[string]): The restrictions this rule enforces on liens. """ self.name = name self.index = index self.restrictions = restrictions
[docs] def find_violations(self, parent_resource, restrictions): """Find violations for this rule against the given resource. Args: parent_resource (Resource): The GCP resource associated with the liens. restrictions (Iterable[str]): The restrictions to check. Yields: RuleViolation: lien rule violation. """ for restriction in self.restrictions: if restriction not in restrictions: yield RuleViolation( resource_id=parent_resource.id, resource_name=parent_resource.display_name, resource_type=parent_resource.type, full_name=parent_resource.full_name, rule_index=self.index, rule_name=self.name, violation_type='LIEN_VIOLATION', resource_data='', ) return