Source code for google.cloud.forseti.scanner.audit.role_rules_engine

# Copyright 2019 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Rules engine for Roles."""
from builtins import object
import collections
import itertools
import threading
import json

from google.cloud.forseti.common.util import logger
from google.cloud.forseti.common.util import relationship
from google.cloud.forseti.scanner.audit import base_rules_engine as bre
from google.cloud.forseti.scanner.audit import errors as audit_errors


LOGGER = logger.get_logger(__name__)

VIOLATION_TYPE = 'CUSTOM_ROLE_VIOLATION'

RuleViolation = collections.namedtuple(
    'RuleViolation',
    ['resource_name', 'resource_type', 'full_name', 'rule_name',
     'rule_index', 'violation_type', 'violation_data', 'resource_data',
     'resource_id'])


[docs]class RoleRulesEngine(bre.BaseRulesEngine): """Rules engine for roles.""" def __init__(self, rules_file_path, snapshot_timestamp=None): """Initialize. Args: rules_file_path (str): file location of rules snapshot_timestamp (str): snapshot timestamp. Defaults to None. If set, this will be the snapshot timestamp used in the engine. """ super(RoleRulesEngine, self).__init__(rules_file_path=rules_file_path) self.rule_book = None
[docs] def build_rule_book(self, global_configs=None): """Build RoleRuleBook from the rules definition file. Args: global_configs (dict): Global configurations. """ self.rule_book = RoleRuleBook(self._load_rule_definitions())
[docs] def find_violations(self, role, force_rebuild=False): """Determine whether the role violates rules. Args: role (Role): Role to be tested. force_rebuild (bool): If True, rebuilds the rule book. This will reload the rules definition file and add the rules to the book. Returns: generator: A generator of rule violations. """ if self.rule_book is None or force_rebuild: self.build_rule_book() violations = itertools.chain() rules = self.rule_book.get_rule_by_role_name(role.id) for rule in rules: violations = itertools.chain( violations, rule.find_violations(role)) return set(violations)
[docs]class RoleRuleBook(bre.BaseRuleBook): """The RuleBook for Role resources.""" def __init__(self, rule_defs=None): """Initialization. Args: rule_defs (dict): rule definitons """ super(RoleRuleBook, self).__init__() self._rules_sema = threading.BoundedSemaphore(value=1) self.rules_map = {} if not rule_defs: self.rule_defs = {} else: self.rule_defs = rule_defs self.add_rules(rule_defs)
[docs] def add_rules(self, rule_defs): """Add rules to the rule book. Args: rule_defs (dict): rule definitions dictionary. """ for (i, rule) in enumerate(rule_defs.get('rules', [])): self.add_rule(rule, i)
[docs] def add_rule(self, rule_def, rule_index): """Add a rule to the rule book. Args: rule_def (dict): A dictionary containing rule definition properties. rule_index (int): The index of the rule from the rule definitions. Assigned automatically when the rule book is built. """ if 'name' not in rule_def: raise audit_errors.InvalidRulesSchemaError( 'Lack of role_name in rule {}'.format(rule_index)) if 'role_name' not in rule_def: raise audit_errors.InvalidRulesSchemaError( 'Lack of role_name in rule {}'.format(rule_index)) role_name = rule_def['role_name'] if 'permissions' not in rule_def: raise audit_errors.InvalidRulesSchemaError( 'Lack of permissions in rule {}'.format(rule_index)) if 'resource' not in rule_def: raise audit_errors.InvalidRulesSchemaError( 'Lack of resource in rule {}'.format(rule_index)) res = rule_def['resource'] rule = Rule(rule_index=rule_index, rule_name=rule_def['name'], permissions=rule_def['permissions'], res=res) if role_name not in self.rules_map: self.rules_map[role_name] = [rule] else: self.rules_map[role_name].append(rule)
[docs] def get_rule_by_role_name(self, role_name): """Get the rule of a given role. Args: role_name (str): Name of a role. Returns: list: Rule list of the given role. """ return self.rules_map.get(role_name, [])
[docs]class Rule(object): """Rule properties from the rule definition file. Also finds violations.""" def __init__(self, rule_index, rule_name, permissions, res): """Initialize. Args: rule_index (int): The index of the rule. rule_name (str): Name of the rule. permissions (int): Expected permissions of the role. res (dict): Parent resource of the role that should obey the rule. """ self.rule_name = rule_name self.rule_index = rule_index self.permissions = permissions[:] self.res_types = res[:] for index, res_item in enumerate(self.res_types): if 'type' not in res_item: raise audit_errors.InvalidRulesSchemaError( 'Lack of resource:type in rule {}'.format(rule_index)) if res_item['type'] not in [ 'organization', 'folder', 'project', 'role']: raise audit_errors.InvalidRulesSchemaError( 'Wrong resource:type {} in rule {}'.format( res_item['type'], rule_index)) if 'resource_ids' not in res_item: raise audit_errors.InvalidRulesSchemaError( 'Lack of resource:resource_ids in rule {}'.format( rule_index)) if '*' in res_item['resource_ids']: self.res_types[index]['resource_ids'] = ['*']
[docs] def generate_violation(self, role): """Generate a violation. Args: role (Role): The role that triggers the violation. Returns: RuleViolation: The violation. """ permissions = role.get_permissions() permissions_str = json.dumps(permissions) return RuleViolation( resource_name=role.name, resource_id=role.id, resource_type=role.type, full_name=role.full_name, rule_name=self.rule_name, rule_index=self.rule_index, violation_type=VIOLATION_TYPE, violation_data=permissions_str, resource_data=role.data, )
[docs] def find_violations(self, res): """Get a generator for violations. Args: res (Resource): A class derived from Resource. Returns: Generator: All violations of the resource breaking the rule. Raises: ValueError: Raised if the resource type is bucket. """ def find_violations_in_role(role): """Get a generator for violations. Args: role (role): Find violation from the role. Returns: RuleViolation: All violations of the role breaking the rule. """ resource_ancestors = (relationship.find_ancestors( role, role.full_name)) violations = itertools.chain() for related_resources in resource_ancestors: violations = itertools.chain( violations, self.find_violations_by_ancestor(related_resources, role)) return violations if res.type == 'role': return find_violations_in_role(res) raise ValueError( 'only role is supported.' )
[docs] def find_violations_by_ancestor(self, ancestor, role): """Get a generator on a given ancestor of the role. Args: role (role): Role to find violation from. ancestor (Resource): Ancestor of the role or the role itself. Yields: RuleViolation: All violations of the role breaking the rule. """ for res in self.res_types: if ancestor.type != res['type']: continue if '*' in res['resource_ids']: if set(role.get_permissions()) != set(self.permissions): yield self.generate_violation(role) else: for res_id in res['resource_ids']: if res_id == ancestor.id: if set(role.get_permissions()) != set(self.permissions): yield self.generate_violation(role)