#!/usr/bin/env python3
"""
This file provides utilities for processing different configuration file formats.
Supported formats include:
a) YAML
b) JSON
c) SHELL
d) INI
e) XML
Typical usage involves first loading the config file, then using the dictionary
returned by load_config_file to make queries.
"""
import argparse
import configparser
import datetime
import json
import os
import pathlib
import re
from textwrap import dedent
import xml.etree.ElementTree as ET
from xml.dom import minidom
import jinja2
#
# Note: yaml may not be available in which case we suppress
# the exception, so that we can have other functionality
# provided by this module.
#
try:
import yaml
except ModuleNotFoundError:
pass
from .environment import list_to_str, str_to_list, str_to_type
from .run_command import run_command
##########
# YAML
##########
def load_yaml_config(config_file):
    """Read a YAML file and return its parsed contents.

    Args:
        config_file: path of the YAML file to read
    Returns:
        Whatever ``yaml.safe_load`` produces for the file contents.
    """
    with open(config_file, "r") as stream:
        return yaml.safe_load(stream)
try:

    class custom_dumper(yaml.Dumper):
        """YAML dumper that corrects list indentation.

        ``increase_indent`` ignores the ``indentless`` value it is given and
        always forwards ``False`` to the base implementation.
        """

        def increase_indent(self, flow=False, indentless=False):
            return super().increase_indent(flow, False)

    def str_presenter(dumper, data):
        """Represent strings spanning more than one line in ``|`` block style."""
        str_tag = "tag:yaml.org,2002:str"
        if len(data.splitlines()) > 1:
            return dumper.represent_scalar(str_tag, data, style="|")
        return dumper.represent_scalar(str_tag, data)

    yaml.add_representer(str, str_presenter)

except NameError:
    # yaml was not importable, so the YAML dumping helpers are skipped.
    pass
def cfg_to_yaml_str(cfg):
    """Serialize a config dictionary to a YAML string.

    Keys are not sorted and flow style is disabled.
    """
    dump_options = {"sort_keys": False, "default_flow_style": False}
    return yaml.dump(cfg, **dump_options)
def cycstr(loader, node):
    """Wrap the scalar content of ``node`` in a ``<cyclestr>`` element string.

    Args:
        loader: object providing ``construct_scalar``
        node: the node whose scalar value is wrapped
    Returns:
        The string ``<cyclestr>VALUE</cyclestr>``.
    """
    content = loader.construct_scalar(node)
    return "<cyclestr>{}</cyclestr>".format(content)
def include(filepaths):
    """Merge the top-level entries of the referenced YAML file(s).

    Files are read in order; when the same top-level key appears in more
    than one file, the value from the later file wins. Files that are empty
    (parsed as ``None``) contribute nothing.

    Args:
        filepaths: iterable of YAML file paths. Relative paths are resolved
            against ``base_dir`` computed below.
    Returns:
        The merged dictionary rendered as a YAML string (keys unsorted).
    """
    # <base_dir>/<dir>/<package>/<this file>  ->  <base_dir>
    srw_path = pathlib.Path(__file__).resolve().parents[0].parents[0]
    base_dir = os.path.dirname(srw_path)

    cfg = {}
    for filepath in filepaths:
        abs_path = filepath
        if not os.path.isabs(filepath):
            abs_path = os.path.join(base_dir, filepath)
        with open(abs_path, "r") as fp:
            contents = yaml.load(fp, Loader=yaml.SafeLoader)
        if contents is None:
            # An empty YAML document loads as None; there is nothing to merge.
            continue
        for key, value in contents.items():
            cfg[key] = value
    return yaml.dump(cfg, sort_keys=False)
def join_str(loader, node):
    """Custom tag handler that concatenates the items of a sequence node.

    Every item is converted with ``str`` and the pieces are joined without a
    separator.
    """
    return "".join(map(str, loader.construct_sequence(node)))
def startstopfreq(loader, node):
    """Build a Rocoto-formatted string for the contents of a cycledef tag.

    The node must hold exactly three items. Each item is looked up as an
    environment variable name; when no such variable is set, the item itself
    is used as the value.

    Returns:
        ``"<start>00 <stop>00 <freq>:00:00"``
    """
    names = loader.construct_sequence(node)
    resolved = [os.environ.get(name, name) for name in names]
    start, stop, freq = resolved
    return "{}00 {}00 {}:00:00".format(start, stop, freq)
def nowtimestamp(loader, node):
    """Return ``"id_"`` followed by the current timestamp in whole seconds.

    ``loader`` and ``node`` are accepted but not used.
    """
    epoch_seconds = int(datetime.datetime.now().timestamp())
    return f"id_{epoch_seconds}"
# Register the custom tag handlers defined above on yaml.SafeLoader.
# If the optional yaml import failed, the first reference to ``yaml`` raises
# NameError and registration is skipped.
# NOTE(review): include() is declared with a single ``filepaths`` parameter
# while the other handlers take (loader, node) -- confirm that the "!include"
# tag works as intended when used in a YAML file.
try:
    yaml.add_constructor("!cycstr", cycstr, Loader=yaml.SafeLoader)
    yaml.add_constructor("!include", include, Loader=yaml.SafeLoader)
    yaml.add_constructor("!join_str", join_str, Loader=yaml.SafeLoader)
    yaml.add_constructor("!startstopfreq", startstopfreq, Loader=yaml.SafeLoader)
    yaml.add_constructor("!nowtimestamp", nowtimestamp ,Loader=yaml.SafeLoader)
except NameError:
    pass
def path_join(arg):
    """Jinja2 filter: join the path components in ``arg`` into one path."""
    components = tuple(arg)
    return os.path.join(*components)
def days_ago(arg):
    """Jinja2 filter: date string for the day ``arg`` days before today.

    Returns:
        The date formatted as ``%Y%m%d`` with a literal ``00`` appended.
    """
    offset = datetime.timedelta(days=arg)
    target_day = datetime.date.today() - offset
    return target_day.strftime("%Y%m%d00")
def extend_yaml(yaml_dict, full_dict=None, parent=None):
    """
    Updates yaml_dict inplace by rendering any existing Jinja2 templates
    that exist in a value.

    Args:
        yaml_dict: the (sub-)dictionary whose values are rendered in place;
            anything that is not a dict is ignored
        full_dict: dictionary passed to the templates as additional context;
            defaults to yaml_dict
        parent: the dictionary containing yaml_dict, exposed to templates as
            ``parent``; when it is None, full_dict is reset to yaml_dict
    Returns:
        None
    """

    if full_dict is None:
        full_dict = yaml_dict

    if parent is None:
        full_dict = yaml_dict

    if not isinstance(yaml_dict, dict):
        return

    # The Jinja2 environment does not depend on the template being rendered,
    # so it is built lazily once per call and reused.
    j2env = None

    for k, val in yaml_dict.items():

        if isinstance(val, dict):
            extend_yaml(val, full_dict, yaml_dict)
            continue

        if not isinstance(val, list):
            val = [val]

        for v_idx, v in enumerate(val):
            v_str = str(v.text) if isinstance(v, ET.Element) else str(v)

            if isinstance(v, ET.Element):
                # NOTE(review): looks like leftover debug output; kept so the
                # function's stdout is unchanged.
                print('ELEMENT VSTR', v_str, v.text, yaml_dict)

            # Save a bit of compute and only do the rest for strings that
            # contain jinja markers.
            if "{{" not in v_str and "{%" not in v_str:
                continue

            # Find expressions first, and process them as a single template
            # if they exist.
            # Find individual double curly brace templates in the string
            # otherwise. We need one substitution template at a time so that
            # we can opt to leave some un-filled when they are not yet set.
            # For example, we can save cycle-dependent templates to fill in
            # at run time.
            if "{%" in v_str:
                templates = [v_str]
            else:
                # Separates out all the double curly bracket pairs
                templates = [
                    m.group()
                    for m in re.finditer(r"{{[^}]*}}|\S", v_str)
                    if '{{' in m.group()
                ]

            if j2env is None:
                j2env = jinja2.Environment(
                    loader=jinja2.BaseLoader, undefined=jinja2.StrictUndefined
                )
                j2env.filters["path_join"] = path_join
                j2env.filters["days_ago"] = days_ago
                j2env.filters["include"] = include

            data = []
            for template in templates:
                try:
                    j2tmpl = j2env.from_string(template)
                except Exception:
                    print(f"ERROR filling template: {template}, {v_str}")
                    raise
                try:
                    # Fill in a template that has the appropriate variables
                    # set.
                    # NOTE(review): if yaml_dict and full_dict share a key
                    # (always true when they are the same non-empty dict),
                    # this call raises TypeError for the duplicated keyword,
                    # which is swallowed below and leaves the template
                    # unrendered -- confirm this is intended.
                    template = j2tmpl.render(parent=parent, **yaml_dict, **full_dict)
                except (
                    jinja2.exceptions.UndefinedError,
                    ValueError,
                    TypeError,
                    ZeroDivisionError,
                ):
                    # Leave a templated field as-is in the resulting dict
                    pass
                except Exception:
                    print(f"{k}: {template}")
                    raise

                data.append(template)

            # Skip type conversion when any template mentions "string".
            convert_type = True
            for tmpl, rendered in zip(templates, data):
                v_str = v_str.replace(tmpl, rendered)
                if "string" in tmpl:
                    convert_type = False

            if convert_type:
                v_str = str_to_type(v_str, return_string=2)

            if isinstance(v, ET.Element):
                print('Replacing ET text with', v_str)
                v.text = v_str
            elif isinstance(yaml_dict[k], list):
                yaml_dict[k][v_idx] = v_str
            else:
                # Put the full template line back together as it was,
                # filled or not
                yaml_dict[k] = v_str
##########
# JSON
##########
def load_json_config(config_file):
    """Load json config file

    Args:
        config_file: path of the JSON file to read
    Returns:
        The decoded JSON content.
    Raises:
        Exception: when the file content is not valid JSON; the original
            ``json.JSONDecodeError`` is attached as the cause.
    """
    try:
        with open(config_file, "r") as f:
            cfg = json.load(f)
    except json.JSONDecodeError as e:
        # Chain the decode error so the offending line/column is not lost.
        raise Exception(f"Unable to load json file {config_file}") from e

    return cfg
def cfg_to_json_str(cfg):
    """Serialize a config dictionary to JSON text.

    The output uses a 4-space indent, keeps the key order of ``cfg`` and
    ends with a newline.
    """
    body = json.dumps(cfg, indent=4, sort_keys=False)
    return f"{body}\n"
##########
# SHELL
##########
def load_shell_as_ini_config(file_name, return_string=1):
    """Load shell config file with embedded structure in comments

    Comment lines of the form ``# [section]`` are turned into INI section
    headers and backslash-newline continuations are collapsed to a single
    space; the result is then parsed with ``load_ini_config``.

    Args:
        file_name: path of the shell config file
        return_string: forwarded unchanged to ``load_ini_config``
    Returns:
        The dictionary produced by ``load_ini_config``.
    """
    import tempfile

    # read contents and replace comments as sections
    with open(file_name, "r") as file:
        cfg = file.read()
    cfg = cfg.replace("# [", "[")
    cfg = cfg.replace("\\\n", " ")

    # Write the converted content to a uniquely named file in the system
    # temp directory (the current directory may be read-only, and a
    # pid-based name is not unique within one process).
    fd, temp_file = tempfile.mkstemp(prefix="_temp.", suffix=".ini")
    try:
        with os.fdopen(fd, "w") as file:
            file.write(cfg)
        # load it as a structured ini file
        return load_ini_config(temp_file, return_string)
    finally:
        # Always clean up, including when the write or the parse fails.
        os.remove(temp_file)
def load_shell_config(config_file, return_string=0):
    """Loads old style shell config files.
    We source the config script in a subshell and get the variables it sets

    Args:
        config_file: path to config file script
        return_string: forwarded to the value-parsing helpers
    Returns:
        dictionary that should be equivalent to one obtained from parsing a yaml file.
    """

    # First try to load it as a structured shell config file
    try:
        return load_shell_as_ini_config(config_file, return_string)
    except Exception:
        # Best effort: any failure means the file is not in the structured
        # format, so fall back to sourcing it below.
        pass

    # Save env vars before and after sourcing the script and then
    # do a diff to get variables specifically defined/updated in the script
    # Method sounds brittle but seems to work ok so far
    pid = os.getpid()
    # NOTE(review): config_file is interpolated into the script unquoted, so
    # paths containing spaces or shell metacharacters will not work; only
    # use trusted paths here.
    script_lines = [
        " #!/bin/bash",
        f't1="./t1.{pid}"',
        f't2="./t2.{pid}"',
        "(set -o posix; set) > $t1",
        f"{{ . {config_file}; set +x; }} &>/dev/null",
        "(set -o posix; set) > $t2",
        'diff $t1 $t2 | grep "> " | cut -c 3-',
        "rm -rf $t1 $t2",
        "",
    ]
    code = dedent("\n".join(script_lines))
    (_, config_str, _) = run_command(code)

    # build the dictionary
    cfg = {}
    for line in config_str.splitlines():
        key, sep, raw_value = line.partition("=")
        if not sep:
            # Not a NAME=VALUE line (e.g. the continuation of a multi-line
            # value); slicing on a missing "=" would produce a bogus key.
            continue
        cfg[key] = str_to_list(raw_value, return_string)
    return cfg
def cfg_to_shell_str(cfg, kname=None):
    """Render a config dictionary as shell script text.

    Nested dictionaries become ``# [section]`` comment headers (dotted with
    the enclosing section name ``kname``) followed by their entries and a
    blank line. Lists are written unquoted; every other value is written in
    single quotes.
    """
    pieces = []
    for key, value in cfg.items():
        if isinstance(value, dict):
            section = f"{kname}.{key}" if kname else f"{key}"
            pieces.append(f"# [{section}]\n")
            pieces.append(cfg_to_shell_str(value, section))
            pieces.append("\n")
        elif isinstance(value, list):
            pieces.append(f"{key}={list_to_str(value)}\n")
        else:
            # Single quotes and newlines would break the quoted assignment.
            text = list_to_str(value).replace("'", '"').replace("\n", " ")
            pieces.append(f"{key}='{text}'\n")
    return "".join(pieces)
##########
# INI
##########
def load_ini_config(config_file, return_string=0):
    """Load a config file with a format similar to Microsoft's INI files

    Args:
        config_file: path of the INI file
        return_string: forwarded to ``str_to_list`` for every value
    Returns:
        ``{section: {option: value}}`` with option names kept case-sensitive.
    Raises:
        FileNotFoundError: when config_file does not exist
    """
    if not os.path.exists(config_file):
        message = dedent(
            f"\nThe specified configuration file does not exist:\n'{config_file}'"
        )
        raise FileNotFoundError(message)

    parser = configparser.RawConfigParser()
    # Keep option names exactly as written instead of lower-casing them.
    parser.optionxform = str
    parser.read(config_file)

    return {
        section: {
            option: str_to_list(raw, return_string)
            for option, raw in parser.items(section)
        }
        for section in parser.sections()
    }
def get_ini_value(config, section, key):
    """Finds the value of a property in a given section

    Args:
        config: dictionary of sections
        section: name of the section to look in
        key: name of the property inside that section
    Returns:
        The value stored at ``config[section][key]``.
    Raises:
        KeyError: when the section is missing, or when the key is missing
            from the section
    """
    if section not in config:
        raise KeyError(f"Section not found: {section}")
    return config[section][key]
def cfg_to_ini_str(cfg, kname=None):
    """Render a config dictionary as INI text.

    Nested dictionaries become ``[section]`` headers (dotted with the
    enclosing section name ``kname``) followed by their entries and a blank
    line. Lists are written unquoted; every other value is written in single
    quotes.
    """
    pieces = []
    for key, value in cfg.items():
        if isinstance(value, dict):
            section = f"{kname}.{key}" if kname else f"{key}"
            pieces.append(f"[{section}]\n")
            pieces.append(cfg_to_ini_str(value, section))
            pieces.append("\n")
            continue
        text = list_to_str(value, True)
        if isinstance(value, list):
            pieces.append(f"{key}={text}\n")
        else:
            pieces.append(f"{key}='{text}'\n")
    return "".join(pieces)
##########
# XML
##########
def xml_to_dict(root, return_string):
    """Convert an xml tree to a dictionary keyed by child tag.

    A child that has children of its own is converted recursively; a leaf
    child contributes its text passed through ``str_to_list``.
    """
    result = {}
    for node in root:
        if list(node):
            result[node.tag] = xml_to_dict(node, return_string)
            continue
        result[node.tag] = str_to_list(node.text, return_string)
    return result
def dict_to_xml(d, tag):
    """Convert a dictionary to an xml tree rooted at an element named ``tag``.

    Nested dictionaries become nested elements; any other value becomes a
    leaf element whose text is ``list_to_str(value, True)``.
    """
    node = ET.Element(tag)
    for name, value in d.items():
        if isinstance(value, dict):
            node.append(dict_to_xml(value, name))
        else:
            leaf = ET.SubElement(node, name)
            leaf.text = list_to_str(value, True)
    return node
def load_xml_config(config_file, return_string=0):
    """Parse an xml config file and return it as a dictionary.

    Args:
        config_file: path of the XML file
        return_string: forwarded to ``xml_to_dict``
    """
    document_root = ET.parse(config_file).getroot()
    return xml_to_dict(document_root, return_string)
def cfg_to_xml_str(cfg):
    """Get contents of config file as a xml string

    The dictionary is converted to a tree under a ``root`` element,
    pretty-printed with minidom, and escaped double-quote entities are
    turned back into literal ``"`` characters.
    """
    root = dict_to_xml(cfg, "root")
    r = ET.tostring(root, encoding="unicode")
    r = minidom.parseString(r)
    r = r.toprettyxml(indent=" ")
    # The entity for a double quote is assembled from two pieces so that the
    # literal survives HTML rendering of this source file without being
    # decoded into a bare quote character.
    quote_entity = "&" + "quot;"
    r = r.replace(quote_entity, '"')
    return r
##################
# CONFIG utils
##################
def flatten_dict(dictionary, keys=None):
    """Flatten a recursive dictionary (e.g. yaml/json) to be one level deep

    Args:
        dictionary: the source dictionary
        keys: top-level keys whose contents to flatten; all of them when
            None or empty
    Returns:
        A one-level deep dictionary for the selected set of keys
    """
    flat = {}
    for name, value in dictionary.items():
        if keys and name not in keys:
            continue
        if isinstance(value, dict):
            # Nested levels are always flattened completely.
            flat.update(flatten_dict(value))
        else:
            flat[name] = value
    return flat
def structure_dict(dict_o, dict_t):
    """Structure a dictionary based on a template dictionary

    Args:
        dict_o: dictionary to structure (flat one level structure)
        dict_t: template dictionary used for structuring
    Returns:
        A dictionary with contents of dict_o following structure of dict_t;
        template sections that end up empty are left out.
    """
    structured = {}
    for name, template_value in dict_t.items():
        if not isinstance(template_value, dict):
            if name in dict_o.keys():
                structured[name] = dict_o[name]
            continue
        nested = structure_dict(dict_o, template_value)
        if nested:
            structured[name] = nested
    return structured
def _target_is_unset(value):
    """Return True when an existing target value counts as "not set"."""
    if value is None:
        return True
    try:
        if len(value) == 0:
            return True
    except TypeError:
        # Values without a length (int, float, bool, ...) are real settings.
        return False
    # A value that still contains a Jinja2 marker has not been filled in.
    return "{{" in value


def update_dict(dict_o, dict_t, provide_default=False):
    """Update a dictionary with another

    Args:
        dict_o: dictionary used as source
        dict_t: target dictionary to update (modified in place)
        provide_default: when True, an existing target entry is only
            overwritten if it is None, empty, or contains ``{{``; when False,
            existing entries are always overwritten
    Returns:
        None
    """
    for k, v in dict_o.copy().items():
        if isinstance(v, dict):
            if isinstance(dict_t.get(k), dict):
                update_dict(v, dict_t[k], provide_default)
            else:
                dict_t[k] = v
        elif k not in dict_t:
            dict_t[k] = v
        elif v is None:
            # remove the key if the source dict has null entry
            del dict_t[k]
        elif not provide_default or _target_is_unset(dict_t[k]):
            dict_t[k] = v
def check_structure_dict(dict_o, dict_t):
    """Check if a dictionary's structure follows a template.

    Keys of dict_o that are absent from the template at the same nesting
    level are collected, at any depth, into one flat dictionary.

    Args:
        dict_o: target dictionary
        dict_t: template dictionary to compare structure to
    Returns:
        dict: Invalid key-value pairs; empty when all entries are valid.
    """
    invalid = {}
    for name, value in dict_o.items():
        if name not in dict_t.keys():
            invalid[name] = value
            continue
        expected = dict_t[name]
        if isinstance(value, dict) and isinstance(expected, dict):
            invalid.update(check_structure_dict(value, expected))
    return invalid
def filter_dict(dict_o, keys_regex):
    """Filter dictionary keys based on a list of keys

    Args:
        dict_o: the source dictionary
        keys_regex: list of keys to retain (could be regex exp.); each one is
            matched against the start of every key
    Returns:
        A new dictionary with the matching entries, ordered by pattern and
        then by position in dict_o.
    """
    selected = {}
    for pattern in keys_regex:
        matches = re.compile(pattern).match
        for name in dict_o.keys():
            if matches(name):
                # A key matched by several patterns keeps its first position.
                selected.setdefault(name, dict_o[name])
    return selected
##################
# CONFIG loader
##################
def load_config_file(file_name, return_string=0):
    """Load config file based on file name extension

    Recognized extensions are ``sh``, ``ini``, ``json``, ``yaml``/``yml`` and
    ``xml``. ``return_string`` is forwarded to the shell, ini and xml
    loaders.

    Returns:
        The loaded configuration, or None for any other extension.
    """
    _, dotted_ext = os.path.splitext(file_name)
    ext = dotted_ext[1:]

    # Loaders are wrapped in lambdas so that only the selected one is
    # looked up and called.
    loaders = {
        "sh": lambda: load_shell_config(file_name, return_string),
        "ini": lambda: load_ini_config(file_name, return_string),
        "json": lambda: load_json_config(file_name),
        "yaml": lambda: load_yaml_config(file_name),
        "yml": lambda: load_yaml_config(file_name),
        "xml": lambda: load_xml_config(file_name, return_string),
    }
    loader = loaders.get(ext)
    if loader is None:
        return None
    return loader()
##################
# CONFIG main
##################
def cfg_main():
    """Main function for converting and formatting between different config file formats

    Parses the command line, loads the config file given with ``--cfg`` and
    then either validates its structure against ``--validate-cfg`` (printing
    SUCCESS or the invalid entries followed by FAILURE), or optionally
    restructures / filters / flattens it and prints it in the requested
    ``--output-type``. Without a recognized output type the help text is
    printed and the parser exits.
    """
    parser = argparse.ArgumentParser(
        description="Utility for managing different config formats."
    )

    parser.add_argument(
        "--cfg", "-c", dest="cfg", required=True, help="Config file to parse"
    )
    parser.add_argument(
        "--output-type",
        "-o",
        dest="out_type",
        required=False,
        help='Output format: can be any of ["shell", "yaml", "ini", "json", "xml"]',
    )
    parser.add_argument(
        "--flatten",
        "-f",
        dest="flatten",
        action="store_true",
        required=False,
        help="Flatten resulting dictionary",
    )
    parser.add_argument(
        "--template-cfg",
        "-t",
        dest="template",
        required=False,
        help="Template config file used to structure a given config file",
    )
    parser.add_argument(
        "--keys",
        "-k",
        dest="keys",
        nargs="+",
        required=False,
        # Implicit concatenation with an explicit space keeps the two
        # sentences separated regardless of source indentation.
        help=(
            "Include only these keys of dictionary for processing. "
            "Keys can be python regex expression."
        ),
    )
    parser.add_argument(
        "--validate-cfg",
        "-v",
        dest="validate",
        required=False,
        help="Validation config file used to validate a given config file",
    )

    args = parser.parse_args()
    cfg = load_config_file(args.cfg, 2)

    if args.validate:
        cfg_t = load_config_file(args.validate, 1)
        r = check_structure_dict(cfg, cfg_t)
        if r:
            for k in r:
                print(f"INVALID ENTRY: {k}={r[k]}")
            print("FAILURE")
        else:
            print("SUCCESS")
    else:
        if args.template:
            # Flatten first so that entries can be re-homed under the
            # template's sections.
            cfg = flatten_dict(cfg)
            cfg_t = load_config_file(args.template, 1)
            cfg = structure_dict(cfg, cfg_t)

        if args.keys:
            cfg = filter_dict(cfg, args.keys)

        if args.flatten:
            cfg = flatten_dict(cfg)

        # convert to string and print
        if args.out_type in ["shell", "sh"]:
            print(cfg_to_shell_str(cfg), end="")
        elif args.out_type == "ini":
            print(cfg_to_ini_str(cfg), end="")
        elif args.out_type == "json":
            print(cfg_to_json_str(cfg), end="")
        elif args.out_type in ["yaml", "yml"]:
            print(cfg_to_yaml_str(cfg), end="")
        elif args.out_type == "xml":
            print(cfg_to_xml_str(cfg), end="")
        else:
            parser.print_help()
            parser.exit()