# Copyright 2017 The Forseti Security Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for reading and parsing files in a variety of formats."""
import json
import os
import tempfile
import yaml
from google.cloud.forseti.common.gcp_api import storage
from google.cloud.forseti.common.util import errors as util_errors
from google.cloud.forseti.common.util import logger
LOGGER = logger.get_logger(__name__)
[docs]def read_and_parse_file(file_path):
"""Parse a json or yaml formatted file from a local path or GCS.
Args:
file_path (str): The full path to the file to read and parse.
Returns:
dict: The results of parsing the file.
"""
file_path = file_path.strip()
if file_path.startswith('gs://'):
return _read_file_from_gcs(file_path)
return _read_file_from_local(file_path)
[docs]def copy_file_from_gcs(file_path, output_path=None, storage_client=None):
"""Copy file from GCS to local file.
Args:
file_path (str): The full GCS path to the file.
output_path (str): The local file to copy to, if not set creates a
temporary file.
storage_client (storage.StorageClient): The Storage API Client to use
for downloading the file using the API.
Returns:
str: The output_path the file was copied to.
"""
if not storage_client:
storage_client = storage.StorageClient()
if not output_path:
tmp_file, output_path = tempfile.mkstemp()
# Ensure the handle returned by mkstemp is not leaked.
os.close(tmp_file)
with open(output_path, mode='wb') as f:
storage_client.download(full_bucket_path=file_path, output_file=f)
return output_path
[docs]def _get_filetype_parser(file_path, parser_type):
"""Return a parser function for parsing the file.
Args:
file_path (str): The file path.
parser_type (str): The file parser type.
Returns:
function: The parser function.
"""
filetype_handlers = {
'json': {
'string': _parse_json_string,
'file': _parse_json_file
},
'yaml': {
'string': _parse_yaml,
'file': _parse_yaml
}
}
file_ext = file_path.split('.')[-1]
if file_ext not in filetype_handlers:
raise util_errors.InvalidFileExtensionError(
'Unsupported file type: {}'.format(file_ext))
if parser_type not in filetype_handlers[file_ext]:
raise util_errors.InvalidParserTypeError(
'Unsupported parser type: {}'.format(parser_type))
return filetype_handlers[file_ext][parser_type]
[docs]def _read_file_from_gcs(file_path, storage_client=None):
"""Load file from GCS.
Args:
file_path (str): The GCS path to the file.
storage_client (storage.StorageClient): The Storage API Client to use
for downloading the file using the API.
Returns:
dict: The parsed dict from the loaded file.
"""
if not storage_client:
storage_client = storage.StorageClient()
file_content = storage_client.get_text_file(full_bucket_path=file_path)
parser = _get_filetype_parser(file_path, 'string')
return parser(file_content)
[docs]def _read_file_from_local(file_path):
"""Load rules file from local path.
Args:
file_path (str): The path to the file.
Returns:
dict: The parsed dict from the loaded file.
"""
with open(os.path.abspath(file_path), 'r') as rules_file:
parser = _get_filetype_parser(file_path, 'file')
return parser(rules_file)
[docs]def _parse_json_string(data):
"""Parse the data from a string of json.
Args:
data (str): String data to parse into json.
Returns:
dict: The json string successfully parsed into a dict.
Raises:
ValueError: If there was an error parsing the data.
"""
try:
return json.loads(data)
except ValueError as json_error:
raise json_error
[docs]def _parse_json_file(data):
"""Parse the data from a json file.
Args:
data (filepointer): File-like object containing a Json document,
to be parsed into json.
Returns:
dict: The file successfully parsed into a dict.
Raises:
ValueError: If there was an error parsing the file.
"""
try:
return json.load(data)
except ValueError as json_error:
raise json_error
[docs]def _parse_yaml(data):
"""Parse yaml data.
Args:
data (stream): A yaml data stream to parse.
Returns:
dict: The stream successfully parsed into a dict.
Raises:
YAMLError: If there was an error parsing the stream.
"""
try:
return yaml.safe_load(data)
except yaml.YAMLError as yaml_error:
LOGGER.exception(yaml_error)
raise yaml_error