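# Source code for the Compute Instance resource module (heading left over
# from the generated documentation page this file was extracted from).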

# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A Compute Instance.

See: https://cloud.google.com/compute/docs/reference/latest/instances
"""
from builtins import object
import json
import os

from google.cloud.forseti.common.gcp_type import key
from google.cloud.forseti.common.gcp_type import resource

# pylint: disable=too-many-instance-attributes
class Instance(resource.Resource):
    """Represents Instance resource.

    NOTE(review): this class was reconstructed from a garbled extraction.
    The identifiers ``self.data``, ``self.id``, ``self.name``,
    ``self.parent.id``, ``instance_key.name`` and ``instance_key.zone`` were
    missing from the extracted text and were restored from context --
    confirm against the upstream module.
    """

    RESOURCE_NAME_FMT = 'instances/%s'

    def __init__(self, instance_id, parent=None, **kwargs):
        """Instance resource.

        Args:
            instance_id (str): id of the instance.
            parent (Resource): Parent resource of this instance. Should be a
                project.
            **kwargs (dict): The object's attributes.

        Raises:
            TypeError: If unexpected parent type.
        """
        super(Instance, self).__init__(
            resource_id=instance_id,
            resource_type=resource.ResourceType.INSTANCE,
            name=kwargs.get('name'),
            display_name=kwargs.get('name'),
            parent=parent,
            locations=kwargs.get('locations'))

        # A missing parent is accepted; a parent of any other type is not.
        if parent and parent.type != 'project':
            raise TypeError(
                'Unexpected parent type: got {}, want project'.format(
                    parent.type))

        self.full_name = kwargs.get('full_name')
        self.can_ip_forward = kwargs.get('can_ip_forward')
        self.cpu_platform = kwargs.get('cpu_platform')
        self.creation_timestamp = kwargs.get('creation_timestamp')
        self.description = kwargs.get('description')
        self.disks = kwargs.get('disks')
        self.machine_type = kwargs.get('machine_type')
        self.metadata = kwargs.get('metadata')
        self.network_interfaces = kwargs.get('network_interfaces')
        self.resource_id = kwargs.get('id')
        self.scheduling = kwargs.get('scheduling')
        self.service_accounts = kwargs.get('service_accounts')
        self.status = kwargs.get('status')
        self.status_message = kwargs.get('status_message')
        self.tags = kwargs.get('tags')
        # Cached JSON string; filled lazily by the ``json`` property when the
        # caller did not supply 'data'.
        self.data = kwargs.get('data')

    @classmethod
    def from_json(cls, parent, json_string):
        """Creates an Instance from an instance JSON string.

        Args:
            parent (Resource): resource this instance belongs to. Should be a
                project.
            json_string (str): JSON string of a instance GCP API response.

        Returns:
            Instance: A new Instance object.
        """
        instance = json.loads(json_string)
        # The instance name and zone are taken from the selfLink URL.
        instance_key = Key.from_url(instance.get('selfLink'))
        kwargs = {
            'full_name': '{}instance/{}/'.format(parent.full_name,
                                                 instance_key.name),
            'creation_timestamp': instance.get('creationTimestamp'),
            'name': instance.get('name'),
            'description': instance.get('description'),
            'can_ip_forward': instance.get('canIpForward'),
            'cpu_platform': instance.get('cpuPlatform'),
            'disks': instance.get('disks', []),
            'machine_type': instance.get('machineType'),
            'metadata': instance.get('metadata', {}),
            'network_interfaces': instance.get('networkInterfaces', []),
            'scheduling': instance.get('scheduling', {}),
            'service_accounts': instance.get('serviceAccounts', []),
            'status': instance.get('status'),
            'status_message': instance.get('statusMessage'),
            'tags': instance.get('tags'),
            'data': json.dumps(instance, sort_keys=True),
        }
        return cls(instance_key.name, parent=parent,
                   locations=[instance_key.zone], **kwargs)

    def _create_json_str(self):
        """Creates a json string based on the object attributes.

        Returns:
            str: json str.
        """
        resource_dict = {
            'id': self.id,
            'full_name': self.full_name,
            'creationTimestamp': self.creation_timestamp,
            'name': self.name,
            'description': self.description,
            'canIpForward': self.can_ip_forward,
            'cpuPlatform': self.cpu_platform,
            'disks': self.disks,
            'machineType': self.machine_type,
            'metadata': self.metadata,
            'networkInterfaces': self.network_interfaces,
            'scheduling': self.scheduling,
            'serviceAccounts': self.service_accounts,
            'status': self.status,
            'statusMessage': self.status_message,
            'tags': self.tags,
            'locations': self.locations,
            'inventory_data': self.data}

        # Strip out empty values. Every falsy value is dropped, so False and
        # 0 are omitted along with None and empty containers.
        resource_dict = {k: v for k, v in resource_dict.items() if v}
        return json.dumps(resource_dict, sort_keys=True)

    @property
    def json(self):
        """Returns the json string representation of the resource.

        The string is built on first access when none was supplied and is
        cached on the instance afterwards.

        Returns:
            str: json str.
        """
        if not self.data:
            self.data = self._create_json_str()
        return self.data

    @property
    def key(self):
        """Return a Key identifying the object.

        Returns:
            Key: the key
        """
        project_id = self.parent.id if self.parent else ''
        zone = self.locations[0] if self.locations else ''
        return Key.from_args(project_id, zone, self.name)

    def create_network_interfaces(self):
        """Return a list of network_interface objects.

        Each interface is built from a copy of the stored dict with this
        instance's full_name added, so ``self.network_interfaces`` is left
        unmodified. A missing (None) interface list yields an empty list.

        Returns:
            List: list of InstanceNetworkInterface objects
        """
        return [
            InstanceNetworkInterface(**dict(nic, full_name=self.full_name))
            for nic in (self.network_interfaces or [])]
KEY_OBJECT_KIND = 'Instance'


class Key(key.Key):
    """An identifier for a specific instance."""

    @staticmethod
    def from_args(project_id, zone, name):
        """Construct a Key from specific values.

        Args:
            project_id (str): project_id
            zone (str): zone
            name (str): name

        Returns:
            Key: the key
        """
        if zone:
            # Keep only the last path component, so a zone given as a
            # path/URL and a bare zone name produce the same key.
            zone = os.path.basename(zone)
        return Key(KEY_OBJECT_KIND, {
            'project_id': project_id,
            'zone': zone,
            'name': name})

    @staticmethod
    def from_url(url):
        """Construct a Key from a URL.

        Args:
            url (str): Object reference URL

        Returns:
            Key: the key

        Raises:
            ValueError: Required parameters are missing.
        """
        # NOTE(review): the mapping presumably pairs URL path segment names
        # with key component names -- confirm against key.Key._from_url.
        obj = Key._from_url(
            KEY_OBJECT_KIND,
            {'projects': 'project_id',
             'zones': 'zone',
             'instances': 'name'},
            url)
        # All three components are required for a usable instance key.
        if not obj.project_id or not obj.zone or not obj.name:
            raise ValueError('Missing fields in URL %r' % url)
        return obj

    @property
    def project_id(self):
        """Object property: project_id

        Returns:
            str: project_id
        """
        return self._path_component('project_id')

    @property
    def zone(self):
        """Object property: zone

        Returns:
            str: zone
        """
        return self._path_component('zone')

    @property
    def name(self):
        """Object property: name

        Returns:
            str: name
        """
        return self._path_component('name')
class InstanceNetworkInterface(object):
    """InstanceNetworkInterface Resource."""

    def __init__(self, **kwargs):
        """Initialize

        Args:
            **kwargs (dict): json from a single instance on the
                network_interfaces. Keys that are absent leave the
                corresponding attribute set to None.
        """
        self.full_name = kwargs.get('full_name')
        self.kind = kwargs.get('kind')
        self.network = kwargs.get('network')
        self.subnetwork = kwargs.get('subnetwork')
        self.network_ip = kwargs.get('networkIP')
        self.name = kwargs.get('name')
        self.access_configs = kwargs.get('accessConfigs')
        self.alias_ip_ranges = kwargs.get('aliasIpRanges')
        # Snapshot of every keyword received, returned by as_json().
        self._json = json.dumps(kwargs, sort_keys=True, indent=2)

    def __repr__(self):
        """Repr

        Returns:
            string: a string for a InstanceNetworkInterface
        """
        # The format string is kept exactly as it was: __hash__ is derived
        # from this text, so changing it would change hash values.
        return ('kind: %s Network: %s subnetwork: %s network_ip %s name %s'
                'access_configs %s alias_ip_ranges %s' % (
                    self.kind, self.network, self.subnetwork,
                    self.network_ip, self.name, self.access_configs,
                    self.alias_ip_ranges))

    def __hash__(self):
        """hash

        Returns:
            hash: of InstanceNetworkInterface
        """
        return hash(self.__repr__())

    def __ne__(self, other):
        """Ne

        Args:
            other (InstanceNetworkInterface): other InstanceNetworkInterface

        Return:
            bool: True if not equal
        """
        return not self.__eq__(other)

    def __eq__(self, other):
        """Eq

        Compares the same fields that __repr__ (and therefore __hash__)
        uses; full_name is not part of the comparison.

        Args:
            other (InstanceNetworkInterface) : other InstanceNetworkInterface

        Return:
            bool: True if is equal
        """
        # The type check must be on `other`: checking `self` is always true
        # and lets a foreign object fall through to attribute access.
        if not isinstance(other, InstanceNetworkInterface):
            return False
        return ((self.kind == other.kind) and
                (self.network == other.network) and
                (self.subnetwork == other.subnetwork) and
                (self.network_ip == other.network_ip) and
                (self.name == other.name) and
                (self.access_configs == other.access_configs) and
                (self.alias_ip_ranges == other.alias_ip_ranges))

    # TODO: Add this as a base method for all gcp_type objects.
    def as_json(self):
        """Returns the attributes as json formatted string.

        Returns:
            string: json formatted attribute of the instance network interface
        """
        return self._json