Source code for

# Copyright 2017 The Forseti Security Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

""" Inventory gRPC service. """

from builtins import object
import google.protobuf.timestamp_pb2 as timestamp

from import inventory_pb2
from import inventory_pb2_grpc
from import inventory
from import autoclose_stream

# pylint: disable=no-member
    'Your inventory contains {message_type} message(s), please run command '
    '`forseti inventory get {inventory_index_id}` for more information.')

[docs]def inventory_pb_from_object(inventory_index, warning_messages): """Convert internal inventory data structure to protobuf. Args: inventory_index (InventoryIndex): InventoryIndex class in inventory storage. warning_messages (list): Warning message(s) from the inventory Returns: object: proto message of InventoryIndex """ # complete_timestamp is None before the inventory finished. complete_timestamp = None if inventory_index.completed_at_datetime: complete_timestamp = timestamp.Timestamp() complete_timestamp.FromDatetime(inventory_index.completed_at_datetime) start_timestamp = timestamp.Timestamp() start_timestamp.FromDatetime(inventory_index.created_at_datetime) errors = inventory_index.inventory_index_errors return inventory_pb2.InventoryIndex(, start_timestamp=start_timestamp, complete_timestamp=complete_timestamp, schema_version=inventory_index.schema_version, count_objects=inventory_index.counter, status=inventory_index.inventory_status, warnings='\n'.join(warning_messages), errors=errors)
[docs]class GrpcInventory(inventory_pb2_grpc.InventoryServicer): """Inventory gRPC handler.""" def __init__(self, inventory_api): """Initialize Args: inventory_api (object): inventory library """ super(GrpcInventory, self).__init__() self.inventory = inventory_api
[docs] def Ping(self, request, _): """Ping implemented to check service availability. Args: request (object): gRPC request object. _ (object): Unused. Returns: object: PingReply containing echo of data. """ return inventory_pb2.PingReply(
@autoclose_stream def Create(self, request, _): """Creates a new inventory. Args: request (object): gRPC request object. _ (object): Unused. Yields: object: Inventory progress updates. """ for progress in self.inventory.create(request.background, request.model_name): if request.enable_debug: last_warning = repr(progress.last_warning) last_error = repr(progress.last_error) else: last_warning = None last_error = None yield inventory_pb2.Progress( id=progress.inventory_index_id, final_message=progress.final_message, step=progress.step, warnings=progress.warnings, errors=progress.errors, last_warning=last_warning, last_error=last_error) @autoclose_stream def List(self, request, _): """Lists existing inventory. Args: request (object): gRPC request object. _ (object): Unused. Yields: object: Each Inventory API object. """ for inventory_index in self.inventory.list(): if (inventory_index.warning_count or inventory_index.inventory_index_warnings): inventory_warnings = [SUPPRESS_MESSAGE_TEMPLATE.format( message_type='warning',] else: inventory_warnings = [] yield inventory_pb_from_object(inventory_index, inventory_warnings)
[docs] def Get(self, request, _): """Gets existing inventory. Args: request (object): gRPC request object. _ (object): Unused. Returns: object: Inventory API object that is requested. """ inventory_index = self.inventory.get( inventory_warnings = [] if inventory_index.warning_count: inventory_warnings.extend( '{}: {}'.format(row.resource_full_name, row.warning_message) for row in inventory_index.warning_messages) if inventory_index.inventory_index_warnings: inventory_warnings.append(inventory_index.inventory_index_warnings) return inventory_pb2.GetReply( inventory=inventory_pb_from_object(inventory_index, inventory_warnings))
[docs] def Delete(self, request, _): """Deletes existing inventory. Args: request (object): gRPC request object. _ (object): Unused Returns: object: Inventory API object that is deleted. """ inventory_index = self.inventory.delete( return inventory_pb2.DeleteReply( inventory=inventory_pb_from_object(inventory_index, []))
[docs] def Purge(self, request, _): """Purge desired inventory data. Args: request (object): gRPC request object. _ (object): Unused Returns: object: gRPC reply object. """ result = self.inventory.purge(request.retention_days) return inventory_pb2.PurgeReply( result=result)
[docs]class GrpcInventoryFactory(object): """Factory class for Inventory service gRPC interface""" def __init__(self, config): """Initialize Args: config (object): ServiceConfig in server """ self.config = config
[docs] def create_and_register_service(self, server): """Creates an inventory service and registers it in the server. Args: server (object): Server to register service to. Returns: object: The instantiated gRPC service for inventory. """ service = GrpcInventory( inventory_api=inventory.Inventory( self.config)) inventory_pb2_grpc.add_InventoryServicer_to_server(service, server) return service