# File view metadata (not part of the program): 238 lines, 10 KiB, Python

# Path and env manipulation
import os
# Use a config file
import configparser
# Exit with various exit codes
import sys
# Manipulate style and content of logs
import logging
from rich.logging import RichHandler
# Correctly generate plurals, singular nouns etc.
import inflect
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 7: An option that must have a non-null value is either unset or null
class CONST(object):
    """Namespace for script-wide constants.

    The class only groups values; '__slots__ = ()' keeps instances from carrying any attributes of their own.
    """

    __slots__ = ()
    LOG_FORMAT = "%(message)s"
    # How to find a config file: by default it is expected right next to this script.
    CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
    CFG_DEFAULT_FILENAME = "config.ini"
    CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
    # Values you don't have to set, these are their internal defaults. Each entry may optionally carry a key
    # 'is_global' equal to either True or False; if left off it is assumed to be False. The script treats options
    # where 'is_global' equals True as not overridable in a '[section]': such a setting only makes sense in a
    # global context for the entire script. An option where 'empty_ok' equals True can safely be unset or set to
    # an empty string. An example config.ini file may give a sane example value for such an option; removing that
    # value still results in a valid file.
    CFG_KNOWN_DEFAULTS = [
        {"key": "self_name", "value": "update-firewall-source", "empty_ok": False},
        {"key": "tmp_base_dir", "value": os.path.join(CFG_THIS_FILE_DIRNAME, "data/tmp/%(self_name)s"),
         "empty_ok": False},
        {"key": "state_base_dir", "value": os.path.join(CFG_THIS_FILE_DIRNAME, "data/var/lib/%(self_name)s"),
         "empty_ok": False},
        {"key": "state_files_dir", "value": "%(state_base_dir)s/state", "is_global": False, "empty_ok": False},
        {"key": "state_file_retention", "value": "50", "is_global": False, "empty_ok": True},
        {"key": "state_file_name_prefix", "value": "state-", "is_global": False, "empty_ok": True},
        {"key": "state_file_name_suffix", "value": ".log", "is_global": False, "empty_ok": True},
        {"key": "update_firewall_source_some_option", "value": "http://localhost:8000/api/query", "is_global": True,
         "empty_ok": False},
        {"key": "another_option", "value": "first", "is_global": True, "empty_ok": True}
    ]
    # In all sections other than 'default' the following settings are known and accepted. We ignore other settings.
    # Per CFG_KNOWN_DEFAULTS above most '[DEFAULT]' options are accepted by virtue of being defaults and overridable.
    # The only exceptions are options where 'is_global' equals True: they can't be overridden in '[sections]' and
    # any attempt at doing so is ignored. The main purpose of this list is to name settings that do not have a
    # default value but can - if set - influence how a '[section]' behaves. Repeating a '[DEFAULT]' option here does
    # not make sense. 'is_mandatory' determines whether we have to raise errors on missing settings: it means the
    # setting must be given in a '[section]'. It may be empty.
    CFG_KNOWN_SECTION = [
        # {"key": "an_option", "is_mandatory": True},
        # {"key": "another_one", "is_mandatory": False}
    ]
    # Keys every '[section]' has to define itself, derived from the list above.
    CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
# Treat the presence of either environment variable as "started by systemd". In that case the rich handler's
# time, path and level columns are switched off below.
is_systemd = any(systemd_env_var in os.environ for systemd_env_var in ("SYSTEMD_EXEC_PID", "INVOCATION_ID"))
_show_log_decorations = not is_systemd
logging.basicConfig(
    # Default for all modules is NOTSET so log everything
    level="NOTSET",
    format=CONST.LOG_FORMAT,
    datefmt="[%X]",
    handlers=[RichHandler(
        show_time=_show_log_decorations,
        show_path=_show_log_decorations,
        show_level=_show_log_decorations,
        rich_tracebacks=True
    )]
)
log = logging.getLogger("rich")
# Our own code logs with this level. LOGLEVEL is normalised so that e.g. 'debug' works like 'DEBUG'; an unset or
# empty LOGLEVEL falls back to INFO instead of making setLevel() raise ValueError at import time.
_requested_log_level = os.environ.get("LOGLEVEL", "").strip().upper()
log.setLevel(_requested_log_level or logging.INFO)
# Shared inflect engine used to build grammatically correct log messages.
p = inflect.engine()
# Use this version of class ConfigParser to log.debug contents of our config file. When parsing sections other than
# 'default' we don't want to reprint defaults over and over again. This custom class achieves that.
class ConfigParser(configparser.ConfigParser):
    """ConfigParser whose options() can leave out inherited defaults.

    Taken from https://stackoverflow.com/a/12600066.
    """

    def options(self, section, no_defaults=False, **kwargs):
        """Return the option names of 'section'.

        With 'no_defaults' set only options written in the section itself are listed; otherwise the stock
        behaviour applies and defaults are included. Extra keyword arguments are accepted and ignored.
        Raises configparser.NoSectionError when 'section' does not exist.
        """
        if not no_defaults:
            return super().options(section)
        try:
            section_own_options = self._sections[section]
        except KeyError:
            raise configparser.NoSectionError(section)
        return list(section_own_options)
# Filled by validate_default_section() with one single-item dict {option: value} per default option.
ini_defaults = []
# Lookup structures derived from CONST.CFG_KNOWN_DEFAULTS.
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
# 'is_global' is optional per entry and counts as False when left off, so it must not be accessed with [...]:
# several known defaults omit the key and would raise KeyError here.
internal_globals = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default.get("is_global", False)]
internal_empty_ok = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default.get("empty_ok", False)]
# The 'list' converter provides config.getlist(): a comma-separated value becomes a list of stripped items, an
# empty value becomes an empty list.
# NOTE(review): the filter tests the whole value 'x', not the single item 'i', so 'a,,b' keeps an empty item.
# Confirm whether that is intended.
config = ConfigParser(defaults=internal_defaults,
                      converters={'list': lambda x: [i.strip() for i in x.split(',') if len(x) > 0]})
# Files that cannot be read are skipped silently by read(); validation happens in validate_default_section().
config.read(CONST.CFG_DEFAULT_ABS_PATH)
def print_section_header(header: str) -> str:
    """Build the log line that announces loading of config section 'header'.

    Despite its name the function prints nothing, it only returns the text.
    """
    announcement = f"Loading config section '[{header}]' ..."
    return announcement
def validate_default_section(
        config_obj: configparser.ConfigParser) -> None:
    """Check that the config has sections and log its default options.

    Every default option is appended to the module-level 'ini_defaults' list as a single-item dict and logged
    at debug level, prefixed with ':' for global options and '*' for all others.

    :param config_obj: The parsed config to inspect
    :raises SystemExit: With exit code 1 when 'config_obj' has no sections at all
    """
    log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
    if not config_obj.sections():
        # Fatal, so report it at error level like the script's other exit paths do; at debug level the script
        # would exit without a visible reason under the default INFO log level.
        log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
        sys.exit(1)

    # Inspect the object we were handed, not the module-level 'config'.
    if not config_obj.defaults():
        log.debug("No defaults defined")
        return

    default_section = config_obj.default_section
    log.debug(f"Symbol legend:\n"
              f"* Default from section '[{default_section}]'\n"
              f": Global option from '[{default_section}]', can not be overridden in local sections\n"
              f"~ Local option, doesn't exist in '[{default_section}]'\n"
              f"+ Local override of a value from '[{default_section}]'\n"
              f"= Local override, same value as in '[{default_section}]'\n"
              f"# Local attempt at overriding a global, will be ignored")
    log.debug(print_section_header(default_section))
    for default in config_obj.defaults():
        default_value = config_obj[default_section][default]
        ini_defaults.append({default: default_value})
        symbol = ":" if default in internal_globals else "*"
        log.debug(f"{symbol} {default} = {default_value}")
def config_has_valid_section(
        config_obj: configparser.ConfigParser) -> bool:
    """Tell whether at least one section offers every option named in CONST.CFG_MANDATORY.

    NOTE(review): options() is called without 'no_defaults', so options inherited from the default section count
    here, whereas validate_config_sections() only accepts options written in the section itself. Confirm this
    difference is intended.

    :param config_obj: The parsed config to inspect
    :return: True as soon as one section qualifies, False if none does or there are no sections
    """
    mandatory_options = set(CONST.CFG_MANDATORY)
    return any(mandatory_options.issubset(config_obj.options(section_name))
               for section_name in config_obj.sections())
def is_default(config_key: str) -> bool:
    """Return True if 'config_key' is a key of any entry recorded in 'ini_defaults'."""
    for recorded_default in ini_defaults:
        if config_key in recorded_default:
            return True
    return False
def is_global(config_key: str) -> bool:
    """Return True if 'config_key' is listed in 'internal_globals'."""
    listed_as_global = config_key in internal_globals
    return listed_as_global
def is_same_as_default(config_kv_pair: dict) -> bool:
    """Return True if the single-item dict 'config_kv_pair' equals an entry recorded in 'ini_defaults'."""
    return any(config_kv_pair == recorded_default for recorded_default in ini_defaults)
def we_have_unset_options(
        config_obj: configparser.ConfigParser,
        section_name: str) -> list:
    """Collect the options of a section that are empty although they must not be.

    An option counts as offending when its value is falsy and its name is not listed in 'internal_empty_ok'.
    A warning is logged for each offending option.

    :param config_obj: The parsed config to inspect
    :param section_name: Name of the section whose options get checked
    :return: Names of all offending options, an empty list if there are none
    """
    options_must_be_non_empty = []
    for option in config_obj.options(section_name):
        if config_obj.get(section_name, option) or option in internal_empty_ok:
            # Either a value is set or the option is allowed to be empty.
            continue
        log.warning(f"In section '[{section_name}]' option '{option}' is empty, it mustn't be.")
        options_must_be_non_empty.append(option)
    return options_must_be_non_empty
def validate_config_sections(
        config_obj: configparser.ConfigParser) -> None:
    """Validate every non-default section, log its own options and drop what cannot be used.

    Sections that do not define all options from CONST.CFG_MANDATORY themselves are removed from 'config_obj'.
    Options that try to override a global option are removed from their section. 'config_obj' has to support
    'options(section, no_defaults=True)' the way this file's ConfigParser subclass does.

    :param config_obj: The parsed config, modified in place
    :raises SystemExit: With exit code 7 when a section has an empty option that must be non-empty
    """
    mandatory_options = set(CONST.CFG_MANDATORY)
    for this_section in config_obj.sections():
        log.debug(print_section_header(this_section))

        unset_options = we_have_unset_options(config_obj, this_section)
        if unset_options:
            unset_count = len(unset_options)
            log.error(f"""{p.plural("Option", unset_count)} {unset_options} """
                      f"""{p.plural("is", unset_count)} unset. """
                      f"""{p.singular_noun("They", unset_count)} """
                      f"must have a non-null value. "
                      f"""{p.plural("Default", unset_count)} {p.plural("is", unset_count)}:""")
            for unset_option in unset_options:
                # Section-local options have no entry in 'internal_defaults'; indexing it directly would raise
                # KeyError and hide the actual configuration error.
                if unset_option in internal_defaults:
                    log.error(f"{unset_option} = {internal_defaults[unset_option]}")
                else:
                    log.error(f"{unset_option} has no internal default")
            log.error("Exiting 7 ...")
            sys.exit(7)

        # options() hands back a fresh list, so removing options while looping over it below is safe.
        section_own_options = config_obj.options(this_section, no_defaults=True)
        if not mandatory_options.issubset(section_own_options):
            log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
                        f"{CONST.CFG_MANDATORY} set, skipping section ...")
            config_obj.remove_section(this_section)
            continue

        for key in section_own_options:
            value = config_obj[this_section][key]
            overrides_global = is_global(key)
            if overrides_global:
                kv_prefix = "#"
            elif not is_default(key):
                kv_prefix = "~"
            elif is_same_as_default({key: value}):
                kv_prefix = "="
            else:
                kv_prefix = "+"
            log.debug(f"{kv_prefix} {key} = {value}")
            if overrides_global:
                # Globals can't be overridden locally; dropping the option lets the default show through again.
                config_obj.remove_option(this_section, key)
def an_important_function(
        section_name: str,
        config_obj: configparser.ConfigParser,
        whatever: str) -> list:
    """Placeholder for real work on one config section.

    :param section_name: Section to read options from
    :param config_obj: The parsed config
    :param whatever: Currently unused
    :return: A fixed list of strings
    :raises configparser.NoSectionError: When 'section_name' does not exist
    :raises configparser.NoOptionError: When 'min_duration' or 'max_duration' is missing
    :raises ValueError: When one of the two options is not an integer
    """
    # Both values are unused so far. The reads are kept on purpose: they make the function fail early on a
    # missing or non-integer option.
    min_duration = config_obj.getint(section_name, "min_duration")
    max_duration = config_obj.getint(section_name, "max_duration")
    return ["I", "am", "a", "list"]
if __name__ == "__main__":
    # Script entry point: validate the loaded config, then walk over every section that survived validation.
    validate_default_section(config)
    if not config_has_valid_section(config):
        log.error(f"No valid config section found. A valid config section has at least the mandatory options "
                  f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
        sys.exit(2)
    validate_config_sections(config)

    log.debug(f"Iterating over config sections ...")
    for section in config.sections():
        log.info(f"Processing section '[{section}]' ...")
        # ...