Source code for

# Copyright 2017 The Forseti Security Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

"""Forseti Server model gRPC service."""

from import string_formats
from import model_pb2
from import model_pb2_grpc
from import modeller
from import logger

LOGGER = logger.get_logger(__name__)

[docs]class GrpcModeller(model_pb2_grpc.ModellerServicer): """Modeller gRPC implementation.""" HANDLE_KEY = 'handle'
[docs] def _get_handle(self, context): """Return the handle associated with the gRPC call. Args: context (object): GRPC context Returns: str: handle of the GRPC call """ metadata = context.invocation_metadata() metadata_dict = {} for key, value in metadata: metadata_dict[key] = value return metadata_dict[self.HANDLE_KEY]
def __init__(self, modeller_api): """Initialize Args: modeller_api (object): model library """ super(GrpcModeller, self).__init__() self.modeller = modeller_api
[docs] def Ping(self, request, _): """Provides the capability to check for service availability. Args: request (object): pb2 object of PingRequest _(object): Not used Returns: object: pb2 object of the PingReply """ return model_pb2.PingReply(
[docs] def CreateModel(self, request, context): """Creates a new model from an import source. Args: request (object): pb2 object of CreateModelRequest context (object): gRPC context Returns: object: pb2 object of ModelSimplified """ LOGGER.debug('Received request to create model: %s', request) model = self.modeller.create_model(request.type,,, request.background) created_at_str = self._get_model_created_at_str(model) LOGGER.debug('Model %s created at: %s', model, created_at_str) reply = model_pb2.CreateModelReply(model=model_pb2.ModelSimplified(, handle=model.handle, status=model.state, createdAt=created_at_str, description=model.description)) return reply
[docs] def DeleteModel(self, request, _): """Deletes a model and all associated data. Args: request (object): pb2 object of DeleteModelRequest _ (object): Not used Returns: object: pb2 object of DeleteModelReply """ model_name = request.handle try: self.modeller.delete_model(model_name) # pylint: disable=no-member status = model_pb2.DeleteModelReply.Status.Value('SUCCESS') except Exception: # pylint: disable=broad-except LOGGER.exception('Unable to delete model: %s', model_name) # pylint: disable=no-member status = model_pb2.DeleteModelReply.Status.Value('FAIL') return model_pb2.DeleteModelReply(status=status)
[docs] def ListModel(self, request, _): """List all models. Args: request (object): pb2 object of ListModelRequest _ (object): Not used Yields: object: pb2 object of ModelSimplified """ models = self.modeller.list_model() for model in models: created_at_str = self._get_model_created_at_str(model) yield model_pb2.ModelSimplified(, handle=model.handle, status=model.state, createdAt=created_at_str, description=model.description, message=model.message)
[docs] def GetModel(self, request, _): """Get details of a model. Args: request (object): pb2 object of GetModelRequest _ (object): Not used Returns: object: pb2 object of ModelDetails """ model = self.modeller.get_model(request.identifier) if model: created_at_str = self._get_model_created_at_str(model) return model_pb2.ModelDetails(, handle=model.handle, status=model.state, createdAt=created_at_str, description=model.description, message=model.message, warnings=model.warnings) return model_pb2.ModelDetails()
[docs] @staticmethod def _get_model_created_at_str(model): """Get model created_at datetime in human readable string format. Args: model (Model): Model dao object. Return: str: created_at datetime in string format. """ return model.created_at_datetime.strftime( string_formats.DEFAULT_FORSETI_HUMAN_TIMESTAMP)
[docs]class GrpcModellerFactory(object): """Factory class for model service gRPC interface""" def __init__(self, config): """Initialize Args: config (object): ServiceConfig in server """ self.config = config
[docs] def create_and_register_service(self, server): """Create and register the Model service. Args: server (object): Server to register service to. Returns: object: The instantiated gRPC service for model. """ service = GrpcModeller(modeller_api=modeller.Modeller(self.config)) model_pb2_grpc.add_ModellerServicer_to_server(service, server)'Service %s created and registered', service) return service