# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rules engine for checking crypto keys configuration."""
from builtins import object
from collections import namedtuple
import datetime
import threading
from google.cloud.forseti.common.gcp_type import resource_util
from google.cloud.forseti.common.util import logger, date_time, string_formats
from google.cloud.forseti.scanner.audit import base_rules_engine as bre
from google.cloud.forseti.scanner.audit import errors as audit_errors
# Module-level logger used by the classes defined in this file.
LOGGER = logger.get_logger(__name__)

# Violation type stamped on every RuleViolation built in this module.
VIOLATION_TYPE = 'CRYPTO_KEY_VIOLATION'

# Rule Modes.
WHITELIST = 'whitelist'
BLACKLIST = 'blacklist'

# The only values accepted for a rule's 'mode' entry.
RULE_MODES = frozenset((BLACKLIST, WHITELIST))
class KMSRulesEngine(bre.BaseRulesEngine):
    """Rules engine for KMS scanner."""

    def __init__(self, rules_file_path, snapshot_timestamp=None):
        """Initialize.

        Args:
            rules_file_path (str): file location of rules
            snapshot_timestamp (str): snapshot timestamp. Defaults to None.
                Stored on the engine as ``snapshot_timestamp``.
        """
        super(KMSRulesEngine, self).__init__(
            rules_file_path=rules_file_path)
        # The rule book is created lazily by build_rule_book().
        self.rule_book = None
        self.snapshot_timestamp = snapshot_timestamp
        self._lock = threading.Lock()

    def build_rule_book(self, global_configs=None):
        """Build KMSRuleBook from the rules definition file.

        The definitions are loaded and the book is swapped in while holding
        the engine lock.

        Args:
            global_configs (dict): Global configurations. Not read by this
                method.
        """
        with self._lock:
            rule_definitions = self._load_rule_definitions()
            self.rule_book = KMSRuleBook(rule_definitions)

    def find_violations(self, key, force_rebuild=False):
        """Determine whether crypto key configuration violates rules.

        Args:
            key (CryptoKey): A crypto key resource to check.
            force_rebuild (bool): If True, rebuilds the rule book. This will
                reload the rules definition file and add the rules to the
                book. The book is also built when none exists yet.

        Returns:
            list: The rule violations reported by the rule book.
        """
        if force_rebuild or self.rule_book is None:
            self.build_rule_book()
        return self.rule_book.find_violations(key)

    def add_rules(self, rules):
        """Add rules to the rule book.

        Does nothing when no rule book has been built yet.

        Args:
            rules (dict): Rule definitions, forwarded unchanged to the rule
                book's ``add_rules``.
        """
        if self.rule_book is None:
            return
        self.rule_book.add_rules(rules)
class KMSRuleBook(bre.BaseRuleBook):
    """The RuleBook for crypto key rules."""

    # Resource types a rule's 'resource' entries are allowed to name.
    supported_resource_types = frozenset([
        'organization'
    ])

    def __init__(self, rule_defs=None):
        """Initialization.

        Args:
            rule_defs (dict): CryptoKeys rule definitions; the individual
                rules are read from its 'rules' entry. When empty or None
                the book starts without rules.
        """
        super(KMSRuleBook, self).__init__()
        self._lock = threading.Lock()
        # Maps a gcp_type resource to the ResourceRules that apply to it.
        self.resource_rules_map = {}
        if not rule_defs:
            self.rule_defs = {}
        else:
            self.rule_defs = rule_defs
            self.add_rules(rule_defs)

    def __eq__(self, other):
        """Equals.

        Two rule books are equal when their resource => rules maps are equal.

        Args:
            other (object): Object to compare.

        Returns:
            bool: True or False (NotImplemented for unrelated types).
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.resource_rules_map == other.resource_rules_map

    def __ne__(self, other):
        """Not Equals.

        Args:
            other (object): Object to compare.

        Returns:
            bool: True or False.
        """
        return not self == other

    def __repr__(self):
        """Object representation.

        Returns:
            str: The object representation.
        """
        return 'KMSRuleBook <{}>'.format(self.resource_rules_map)

    def add_rules(self, rule_defs):
        """Add rules to the rule book.

        Args:
            rule_defs (dict): rule definitions dictionary; every item of its
                'rules' list is added with its list position as rule index.
        """
        for (i, rule) in enumerate(rule_defs.get('rules', [])):
            self.add_rule(rule, i)

    def add_rule(self, rule_def, rule_index):
        """Add a rule to the rule book.

        Args:
            rule_def (dict): A dictionary containing rule definition
                properties ('name', 'mode', 'key' and 'resource').
            rule_index (int): The index of the rule from the rule definitions.
                Assigned automatically when the rule book is built.

        Raises:
            InvalidRulesSchemaError: If 'resource' is empty, 'key' is missing,
                'mode' is not one of RULE_MODES, a resource type is not
                supported, or a resource entry has no resource ids.
        """
        resources = rule_def.get('resource')
        mode = rule_def.get('mode')
        key = rule_def.get('key')

        if not resources or key is None or mode not in RULE_MODES:
            raise audit_errors.InvalidRulesSchemaError(
                'Faulty rule {}'.format(rule_index))

        # The rule does not depend on the resource it is attached to, so it
        # is built once and shared by every ResourceRules entry below.
        rule = Rule(rule_name=rule_def.get('name'),
                    rule_index=rule_index,
                    rule={'key': key, 'mode': mode})

        for resource in resources:
            resource_type = resource.get('type')
            resource_ids = resource.get('resource_ids')

            if resource_type not in self.supported_resource_types:
                raise audit_errors.InvalidRulesSchemaError(
                    'Invalid resource type in rule {}'.format(rule_index))

            if not resource_ids:
                raise audit_errors.InvalidRulesSchemaError(
                    'Missing resource ids in rule {}'.format(rule_index))

            # For each resource id associated with the rule, create a
            # mapping of resource => rules.
            for resource_id in resource_ids:
                gcp_resource = resource_util.create_resource(
                    resource_id=resource_id,
                    resource_type=resource_type)
                resource_rules = self.resource_rules_map.setdefault(
                    gcp_resource, ResourceRules(resource=gcp_resource))
                # set.add() is a no-op when an equal rule is already present.
                resource_rules.rules.add(rule)

    def get_resource_rules(self, resource):
        """Get all the resource rules for resource.

        Args:
            resource (Resource): The gcp_type Resource find in the map.

        Returns:
            ResourceRules: A ResourceRules object, or None when the resource
                has no entry in the map.
        """
        return self.resource_rules_map.get(resource)

    def find_violations(self, key):
        """Find crypto key violations in the rule book.

        Every ancestor derived from the key's full name is looked up in the
        map, followed by the wildcard ('*') resource of the same type. Each
        wildcard resource is evaluated at most once.

        Args:
            key (CryptoKey): The GCP resource to check for violations.

        Returns:
            list: RuleViolation tuples found for the crypto key.
        """
        LOGGER.debug('Looking for crypto key violations: %s',
                     key.name)
        violations = []
        resource_ancestors = resource_util.get_ancestors_from_full_name(
            key.crypto_key_full_name)

        LOGGER.debug('Ancestors of resource: %r', resource_ancestors)

        checked_wildcards = set()
        for curr_resource in resource_ancestors:
            if not curr_resource:
                # The leaf node in the hierarchy
                continue

            resource_rule = self.get_resource_rules(curr_resource)
            if resource_rule:
                violations.extend(
                    resource_rule.find_violations(key))

            wildcard_resource = resource_util.create_resource(
                resource_id='*', resource_type=curr_resource.type)
            if wildcard_resource in checked_wildcards:
                continue
            checked_wildcards.add(wildcard_resource)

            resource_rule = self.get_resource_rules(wildcard_resource)
            if resource_rule:
                violations.extend(
                    resource_rule.find_violations(key))

        LOGGER.debug('Returning violations: %r', violations)
        return violations
class ResourceRules(object):
    """An association of a resource to rules."""

    def __init__(self, resource=None, rules=None):
        """Initialize.

        Args:
            resource (Resource): The resource to associate with the rule.
            rules (set): rules to associate with the resource. Any value
                that is not a ``set`` instance (including None) is replaced
                by a new empty set.
        """
        self.resource = resource
        self.rules = rules if isinstance(rules, set) else set()

    def find_violations(self, key):
        """Collect the violations every associated rule reports for a key.

        Args:
            key (CryptoKey): crypto key resource.

        Returns:
            list: RuleViolation
        """
        # Rules that report nothing (a falsy result) contribute no entries.
        return [
            violation
            for rule in self.rules
            for violation in (rule.find_violations(key) or ())
        ]

    def __eq__(self, other):
        """Compare == with another object.

        Args:
            other (ResourceRules): object to compare with

        Returns:
            bool: True when both the resource and the rules are equal;
                NotImplemented for objects of an unrelated type.
        """
        if isinstance(other, type(self)):
            return (self.resource == other.resource and
                    self.rules == other.rules)
        return NotImplemented

    def __ne__(self, other):
        """Compare != with another object.

        Args:
            other (object): object to compare with

        Returns:
            bool: negation of the == comparison
        """
        return not self == other

    def __repr__(self):
        """String representation of this node.

        Returns:
            str: debug string
        """
        template = 'KMSResourceRules<resource={}, rules={}>'
        return template.format(self.resource, self.rules)
class Rule(object):
    """Rule properties from the rule definition file, also finds violations."""

    def __init__(self, rule_name, rule_index, rule):
        """Initialize.

        Args:
            rule_name (str): Name of the loaded rule.
            rule_index (int): The index of the rule from the rule definitions.
            rule (dict): The rule definition from the file; its 'mode' and
                'key' entries are read when looking for violations.
        """
        self.rule_name = rule_name
        self.rule_index = rule_index
        self.rule = rule

    @classmethod
    def find_match_rotation_period(cls, key, rotation_period, mode):
        """Check if the key's rotation age matches the rule's rotation period.

        The age is the number of whole days between the current UTC time and
        the 'createTime' of the key's primary version.

        In blacklist mode there is a match when the age is greater than the
        rotation period. In whitelist mode there is a match when the age is
        less than or equal to the rotation period. Any other mode never
        matches.

        Args:
            key (Resource): The resource to check for a match.
            rotation_period (int): The cut off rotation schedule of crypto
                key specified in rule file; compared against a day count.
            mode (string): The mode specified in the rule.

        Returns:
            bool: Returns true if a match is found.
        """
        LOGGER.debug('Formatting rotation time...')
        created_at = key.primary_version.get('createTime')
        now = date_time.get_utc_now_datetime()
        # The last five characters are cut off before parsing.
        # NOTE(review): presumably this trims sub-microsecond digits plus the
        # trailing 'Z' of a nanosecond-precision timestamp -- confirm that
        # createTime always carries nine fractional digits.
        rotated_at = datetime.datetime.strptime(
            created_at[:-5], string_formats.TIMESTAMP_MICROS)
        age_in_days = (now - rotated_at).days

        if mode == BLACKLIST:
            return age_in_days > rotation_period
        if mode == WHITELIST:
            return age_in_days <= rotation_period
        return False

    @classmethod
    def find_match_algorithms(cls, key, rule_algorithms):
        """Check if the key's algorithm is one of the rule's algorithms.

        Args:
            key (Resource): The resource to check for a match.
            rule_algorithms (list): The algorithms of this rule; each item is
                compared for equality with the primary version's 'algorithm'.

        Returns:
            bool: Returns true if a match is found.
        """
        LOGGER.debug('Checking if the algorithm specified matches with that of'
                     ' crypto key.')
        key_algorithm = key.primary_version.get('algorithm')
        return any(key_algorithm == algorithm
                   for algorithm in rule_algorithms)

    @classmethod
    def find_match_protection_level(cls, key, rule_protection_level):
        """Check if the key's protection level equals the rule's.

        Args:
            key (Resource): The resource to check for a match.
            rule_protection_level (string): The protection level of this rule.

        Returns:
            bool: Returns true if a match is found.
        """
        return (key.primary_version.get('protectionLevel') ==
                rule_protection_level)

    @classmethod
    def find_match_purpose(cls, key, rule_purpose):
        """Check if the key's purpose is one of the rule's purposes.

        Args:
            key (Resource): The resource to check for a match.
            rule_purpose (list): The purpose of this rule.

        Returns:
            bool: Returns true if a match is found.
        """
        key_purpose = key.purpose
        return any(key_purpose == purpose for purpose in rule_purpose)

    @classmethod
    def find_match_state(cls, key, rule_state):
        """Check if the key's primary version state is one of the rule's.

        Args:
            key (Resource): The resource to check for a match.
            rule_state (list): The state of this rule.

        Returns:
            bool: Returns true if a match is found.
        """
        key_state = key.primary_version.get('state')
        return any(state == key_state for state in rule_state)

    def _all_conditions_match(self, key, key_data, mode):
        """Tell whether the key matches every condition set in key_data.

        Conditions are checked in a fixed order (rotation period, algorithms,
        protection level, purpose, state) and checking stops at the first
        one that does not match. Conditions that are absent or falsy in
        key_data are skipped.

        Args:
            key (Resource): The resource to check.
            key_data (dict): One entry of the rule's 'key' list.
            mode (string): The mode specified in the rule.

        Returns:
            bool: True if no configured condition failed to match.
        """
        # NOTE(review): a rotation_period of 0 is falsy and therefore
        # skipped -- confirm that this is intended.
        rotation_period = key_data.get('rotation_period')
        if rotation_period and not self.find_match_rotation_period(
                key, rotation_period, mode):
            return False

        algorithms = key_data.get('algorithms')
        if algorithms and not self.find_match_algorithms(key, algorithms):
            return False

        protection_level = key_data.get('protection_level')
        if protection_level and not self.find_match_protection_level(
                key, protection_level):
            return False

        purpose = key_data.get('purpose')
        if purpose and not self.find_match_purpose(key, purpose):
            return False

        state = key_data.get('state')
        if state and not self.find_match_state(key, state):
            return False

        return True

    def _build_violation(self, key):
        """Create the RuleViolation describing key for this rule.

        Args:
            key (Resource): The resource that violates this rule.

        Returns:
            RuleViolation: The populated violation tuple.
        """
        primary_version = key.primary_version
        return RuleViolation(
            resource_id=key.id,
            resource_type=key.type,
            resource_name=key.id,
            full_name=key.crypto_key_full_name,
            rule_index=self.rule_index,
            rule_name=self.rule_name,
            violation_type=VIOLATION_TYPE,
            primary_version=primary_version,
            next_rotation_time=key.next_rotation_time,
            rotation_period=key.rotation_period,
            state=primary_version.get('state'),
            algorithm=primary_version.get('algorithm'),
            protection_level=primary_version.get('protectionLevel'),
            purpose=key.purpose,
            key_creation_time=key.create_time,
            resource_data=key.data)

    def find_violations(self, key):
        """Find violations for this rule against the given resource.

        Keys whose primary version state is not 'ENABLED' never produce
        violations. For every entry of the rule's 'key' list, a blacklist
        rule is violated when all configured conditions match and a
        whitelist rule is violated when at least one does not match.

        Args:
            key (Resource): The resource to check for violations.

        Returns:
            list: Returns a list of RuleViolation named tuples.
        """
        if key.primary_version.get('state') != 'ENABLED':
            return []

        mode = self.rule['mode']
        violations = []
        for key_data in self.rule['key']:
            all_matched = self._all_conditions_match(key, key_data, mode)
            violated = ((mode == BLACKLIST and all_matched) or
                        (mode == WHITELIST and not all_matched))
            if violated:
                violations.append(self._build_violation(key))
        return violations

    def __eq__(self, other):
        """Test whether Rule equals other Rule.

        Only the rule name and rule index take part in the comparison.

        Args:
            other (Rule): object to compare to

        Returns:
            bool: comparison result (NotImplemented for unrelated types)
        """
        if isinstance(other, type(self)):
            return (self.rule_name == other.rule_name and
                    self.rule_index == other.rule_index)
        return NotImplemented

    def __ne__(self, other):
        """Test whether Rule is not equal to another Rule.

        Args:
            other (object): object to compare to

        Returns:
            bool: comparison result
        """
        return not self == other

    def __hash__(self):
        """Make a hash of the rule index.

        Returns:
            int: The hash of the rule index.
        """
        return hash(self.rule_index)
# pylint: enable=inconsistent-return-statements
# Rule violation.
# resource_type: string
# resource_id: string
# resource_name: string
# primary_version: dict
# next_rotation_time: string
# rule_name: string
# rule_index: int
# full_name: string
# violation_type: CRYPTO_KEY_VIOLATION
# state: string
# purpose: string
# algorithm: string
# protection_level: string
# rotation_period: string
# key_creation_time: string
# resource_data: string
# Immutable record describing one crypto key rule violation; the field order
# below is the positional order of the tuple.
RuleViolation = namedtuple(
    'RuleViolation',
    (
        'resource_id',
        'resource_type',
        'resource_name',
        'full_name',
        'rule_index',
        'rule_name',
        'violation_type',
        'state',
        'primary_version',
        'next_rotation_time',
        'rotation_period',
        'key_creation_time',
        'algorithm',
        'protection_level',
        'purpose',
        'resource_data',
    ))