# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Crawler implementation."""
from Queue import Empty
from Queue import Queue
import threading
import time
from google.cloud.forseti.common.util import logger
from google.cloud.forseti.services.inventory.base import cai_gcp_client
from google.cloud.forseti.services.inventory.base import cloudasset
from google.cloud.forseti.services.inventory.base import crawler
from google.cloud.forseti.services.inventory.base import gcp
from google.cloud.forseti.services.inventory.base import resources
LOGGER = logger.get_logger(__name__)


class CrawlerConfig(crawler.CrawlerConfig):
    """Crawler configuration to inject dependencies."""

    def __init__(self, storage, progresser, api_client, variables=None):
        """Initialize.

        Args:
            storage (Storage): The inventory storage.
            progresser (QueueProgresser): The progresser implemented using
                a queue.
            api_client (ApiClientImpl): GCP API client.
            variables (dict): Config variables.
        """
        super(CrawlerConfig, self).__init__()
        self.storage = storage
        self.progresser = progresser
        self.variables = {} if not variables else variables
        self.client = api_client


class ParallelCrawlerConfig(crawler.CrawlerConfig):
    """Multithreaded crawler configuration, to inject dependencies."""

    def __init__(self, storage, progresser, api_client, threads=10,
                 variables=None):
        """Initialize.

        Args:
            storage (Storage): The inventory storage.
            progresser (QueueProgresser): The progresser implemented using
                a queue.
            api_client (ApiClientImpl): GCP API client.
            threads (int): How many threads to use.
            variables (dict): Config variables.
        """
        super(ParallelCrawlerConfig, self).__init__()
        self.storage = storage
        self.progresser = progresser
        self.variables = {} if not variables else variables
        self.threads = threads
        self.client = api_client


class Crawler(crawler.Crawler):
    """Simple single-threaded Crawler implementation."""

    def __init__(self, config):
        """Initialize.

        Args:
            config (CrawlerConfig): The crawler configuration.
        """
        super(Crawler, self).__init__()
        self.config = config

    def run(self, resource):
        """Run the crawler, given a start resource.

        Args:
            resource (Resource): Resource to start with.

        Returns:
            QueueProgresser: The progresser, filled with the crawl results.
        """
        # accept() drives the traversal as a visitor pattern: the resource
        # calls back into visit() for itself and into dispatch() for each of
        # its child subtrees.
        resource.accept(self)
        return self.config.progresser

    def visit(self, resource):
        """Handle a newly found resource.

        Args:
            resource (object): Resource to handle.

        Raises:
            Exception: Reraises any exception.
        """
        progresser = self.config.progresser
        try:
            # Fetch the additional data attached to this resource; getters
            # that do not apply to a resource type are no-ops on the base
            # Resource class.
            resource.get_iam_policy(self.get_client())
            resource.get_gcs_policy(self.get_client())
            resource.get_dataset_policy(self.get_client())
            resource.get_cloudsql_policy(self.get_client())
            resource.get_billing_info(self.get_client())
            resource.get_enabled_apis(self.get_client())
            resource.get_kubernetes_service_config(self.get_client())
            self.write(resource)
        except Exception as e:
            LOGGER.exception(e)
            progresser.on_error(e)
            raise
        else:
            progresser.on_new_object(resource)

    def dispatch(self, callback):
        """Dispatch crawling of a subtree.

        Args:
            callback (function): Callback to dispatch.
        """
        callback()

    def write(self, resource):
        """Save resource to storage.

        Args:
            resource (object): Resource to handle.
        """
        self.config.storage.write(resource)

    def get_client(self):
        """Get the GCP API client.

        Returns:
            object: GCP API client.
        """
        return self.config.client

    def on_child_error(self, error):
        """Process an error generated by a child of a resource.

        Inventory does not stop on child errors; it records a warning and
        continues.

        Args:
            error (str): Error message to handle.
        """
        warning_message = '{}\n'.format(error)
        self.config.storage.warning(warning_message)
        self.config.progresser.on_warning(error)

    def update(self, resource):
        """Update the row of an existing resource.

        Args:
            resource (Resource): Resource to update.

        Raises:
            Exception: Reraises any exception.
        """
        try:
            self.config.storage.update(resource)
        except Exception as e:
            LOGGER.exception(e)
            self.config.progresser.on_error(e)
            raise
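

# Example (illustrative sketch, not from the original source): wiring a
# single-threaded crawl by hand, assuming ``storage``, ``progresser`` and
# ``api_client`` are already-initialized service objects. The factory
# functions at the bottom of this module normally do this wiring:
#
#     config = CrawlerConfig(storage, progresser, api_client)
#     progresser = Crawler(config).run(root_resource)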


class ParallelCrawler(Crawler):
    """Multi-threaded Crawler implementation."""

    def __init__(self, config):
        """Initialize.

        Args:
            config (ParallelCrawlerConfig): The crawler configuration.
        """
        super(ParallelCrawler, self).__init__(config)
        self._write_lock = threading.Lock()
        self._dispatch_queue = Queue()
        self._shutdown_event = threading.Event()

    def _start_workers(self):
        """Start a pool of worker threads for processing the dispatch queue."""
        self._shutdown_event.clear()
        for _ in xrange(self.config.threads):
            # Daemon threads do not block interpreter exit if a worker hangs.
            worker = threading.Thread(target=self._process_queue)
            worker.daemon = True
            worker.start()

    def _process_queue(self):
        """Process items in the queue until the shutdown event is set."""
        while not self._shutdown_event.is_set():
            try:
                callback = self._dispatch_queue.get(timeout=1)
            except Empty:
                continue
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # A failing callback must not kill the worker thread; visit()
                # has already reported the error to the progresser.
                LOGGER.exception(e)
            finally:
                # Always mark the item done, even on failure; otherwise the
                # join() in run() would never return.
                self._dispatch_queue.task_done()

    def run(self, resource):
        """Run the crawler, given a start resource.

        Args:
            resource (Resource): Resource to start with.

        Returns:
            QueueProgresser: The progresser, filled with the crawl results.
        """
        try:
            self._start_workers()
            resource.accept(self)
            # Block until every dispatched callback has been processed.
            self._dispatch_queue.join()
        finally:
            self._shutdown_event.set()
            # Give the daemon workers time to notice the shutdown event and
            # fall out of their 1-second queue polls before returning.
            time.sleep(2)
        return self.config.progresser

    def dispatch(self, callback):
        """Dispatch crawling of a subtree.

        Args:
            callback (function): Callback to dispatch.
        """
        self._dispatch_queue.put(callback)

    def write(self, resource):
        """Save resource to storage.

        Args:
            resource (Resource): Resource to handle.
        """
        with self._write_lock:
            self.config.storage.write(resource)

    def on_child_error(self, error):
        """Process an error generated by a child of a resource.

        Inventory does not stop on child errors; it records a warning and
        continues.

        Args:
            error (str): Error message to handle.
        """
        warning_message = '{}\n'.format(error)
        with self._write_lock:
            self.config.storage.warning(warning_message)
        self.config.progresser.on_warning(error)

    def update(self, resource):
        """Update the row of an existing resource.

        Args:
            resource (Resource): The db row of the Resource to update.

        Raises:
            Exception: Reraises any exception.
        """
        try:
            with self._write_lock:
                self.config.storage.update(resource)
        except Exception as e:
            LOGGER.exception(e)
            self.config.progresser.on_error(e)
            raise
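

# Note: the parallel implementation serializes all storage access through
# ``_write_lock``; the database session underneath is presumably not safe to
# share across threads, so only the API fetches in ``visit`` run concurrently.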


def _api_client_factory(storage, config, parallel):
    """Creates the properly initialized API client based on the configuration.

    Args:
        storage (object): Storage implementation to use.
        config (object): Inventory configuration on server.
        parallel (bool): If true, use the parallel crawler implementation.

    Returns:
        Union[gcp.ApiClientImpl, cai_gcp_client.CaiApiClientImpl]:
            The initialized API client implementation class.
    """
    client_config = config.get_api_quota_configs()
    client_config['domain_super_admin_email'] = config.get_gsuite_admin_email()
    asset_count = 0
    if config.get_cai_enabled():
        asset_count = cloudasset.load_cloudasset_data(storage.session, config)
        LOGGER.info('%s total assets loaded from Cloud Asset data.',
                    asset_count)
    if asset_count:
        engine = config.get_service_config().get_engine()
        return cai_gcp_client.CaiApiClientImpl(client_config,
                                               engine,
                                               parallel,
                                               storage.session)
    # Default to the non-CAI implementation if no assets were loaded.
    return gcp.ApiClientImpl(client_config)


def _crawler_factory(storage, progresser, client, parallel):
    """Creates the properly initialized crawler based on the configuration.

    Args:
        storage (object): Storage implementation to use.
        progresser (object): Progresser to notify status updates.
        client (object): The API client instance.
        parallel (bool): If true, use the parallel crawler implementation.

    Returns:
        Union[Crawler, ParallelCrawler]:
            The initialized crawler implementation class.
    """
    if parallel:
        parallel_config = ParallelCrawlerConfig(storage, progresser, client)
        return ParallelCrawler(parallel_config)
    # Default to the single-threaded crawler.
    crawler_config = CrawlerConfig(storage, progresser, client)
    return Crawler(crawler_config)


def _root_resource_factory(config, client):
    """Creates the initialized root resource based on the configuration.

    Args:
        config (object): Inventory configuration on server.
        client (object): The API client instance.

    Returns:
        Resource: The initialized root resource.
    """
    if config.use_composite_root():
        composite_root_resources = config.get_composite_root_resources()
        return resources.CompositeRootResource.create(composite_root_resources)
    # Default is a single resource as root.
    return resources.from_root_id(client, config.get_root_resource_id())
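

# Root ids are strings such as 'organizations/1234567890' (shown here as an
# assumed example; the exact formats accepted are defined in the ``resources``
# module), and a composite root is a list of such ids:
#
#     resource = resources.from_root_id(client, 'organizations/1234567890')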


def run_crawler(storage,
                progresser,
                config,
                parallel=True):
    """Run the crawler with a determined configuration.

    Args:
        storage (object): Storage implementation to use.
        progresser (object): Progresser to notify status updates.
        config (object): Inventory configuration on server.
        parallel (bool): If true, use the parallel crawler implementation.

    Returns:
        QueueProgresser: The progresser, filled with the crawl results.
    """
    # SQLite does not handle concurrent writers, so fall back to the
    # single-threaded crawler when it is the storage engine.
    if parallel and 'sqlite' in str(config.get_service_config().get_engine()):
        LOGGER.info('SQLite used, disabling parallel threads.')
        parallel = False
    client = _api_client_factory(storage, config, parallel)
    crawler_impl = _crawler_factory(storage, progresser, client, parallel)
    resource = _root_resource_factory(config, client)
    progresser = crawler_impl.run(resource)
    # Flush the storage buffer at the end to make sure nothing stays cached.
    storage.commit()
    return progresser
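

# Example usage (illustrative sketch, not from the original source): the
# inventory service layer constructs ``storage``, ``progresser`` and
# ``config`` before handing them to this module, so a caller typically needs
# only the single entry point:
#
#     progresser = run_crawler(storage, progresser, config, parallel=True)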