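"""Generate firewalld direct rules for the DOCKER-USER chain from an INI config.

The script reads ``config.ini`` next to this file, validates ``[DEFAULT]`` and every
named ``[section]`` against the known options, resolves host names to IPv4/IPv6
addresses and renders them as firewalld ``direct.xml`` rules. Rule generation is not
yet wired into the main loop (see the TODO at the bottom).
"""

# Exit with various exit codes
import sys
# Path manipulation
import os
# Manipulate style and content of logs
import logging
# Use a config file
import configparser
# Verify IP address family
import ipaddress

# Resolve host names
import dns.exception
import dns.rdatatype
import dns.resolver
# Build XML structure
import lxml.builder
import lxml.etree
from rich.logging import RichHandler
# Validate if string is fqdn
import validators

# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY


class CONST(object):
    """Read-only namespace for script-wide constants; never instantiated."""
    __slots__ = ()

    LOG_FORMAT = "%(message)s"

    # How to find a config file: 'config.ini' in the directory of this script.
    CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
    CFG_DEFAULT_FILENAME = "config.ini"
    CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)

    # Values you don't have to set, these are their internal defaults. You may optionally add a key 'is_global'
    # equal to either True or False. By default if left off it'll be assumed False. Script will treat values where
    # 'is_global' equals True as not being overridable in a '[section]'. It's a setting that only makes sense in a
    # global context for the entire script. An option where 'empty_ok' equals True can safely be unset or set to
    # an empty string. An example config.ini file may give a sane config example value here, removing that value
    # still results in a valid file.
    CFG_KNOWN_DEFAULTS = [
        {"key": "target", "value": "ACCEPT", "is_global": False},
        {"key": "addr", "value": "", "is_global": False},
        {"key": "ports", "value": "80, 443", "is_global": False},
        {"key": "proto", "value": "tcp", "is_global": False},
        {"key": "do_config_check", "value": "true", "is_global": True},
        {"key": "restart_firewalld_after_change", "value": "true", "is_global": True},
    ]

    # In all sections other than 'default' the following settings are known and accepted. We ignore other settings.
    # Per CFG_KNOWN_DEFAULTS above most '[DEFAULT]' options are accepted by virtue of being defaults and overridable.
    # The only exception are options where "is_global" equals True, they can't be overridden in '[sections]'; any
    # attempt at doing it anyway will be ignored. The main purpose of this list is to name settings that do not have
    # a default value but can - if set - influence how a '[section]' behaves. Repeating a '[DEFAULT]' here does not
    # make sense. We use 'is_mandatory' to determine if we have to raise errors on missing settings. Here
    # 'is_mandatory' means the setting must be given in a '[section]' itself (not inherited from '[DEFAULT]').
    # It may be empty.
    CFG_KNOWN_SECTION = [
        {"key": "target", "is_mandatory": False},
        {"key": "addr", "is_mandatory": False},
        {"key": "ports", "is_mandatory": False},
        {"key": "proto", "is_mandatory": False},
    ]
    # Empty as long as no CFG_KNOWN_SECTION entry sets 'is_mandatory' to True.
    CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]


logging.basicConfig(
    # Default for all modules is NOTSET so log everything
    level="NOTSET",
    format=CONST.LOG_FORMAT,
    datefmt="[%X]",
    handlers=[RichHandler(
        rich_tracebacks=True
    )]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)


# Use this version of class ConfigParser to log.debug contents of our config file. When parsing sections other than
# 'default' we don't want to reprint defaults over and over again. This custom class achieves that.
class ConfigParser(configparser.ConfigParser):
    """ConfigParser whose options() can skip the inherited '[DEFAULT]' keys.

    Taken from https://stackoverflow.com/a/12600066.
    """

    def options(self, section, no_defaults=False, **kwargs):
        """Return the option names of ``section``.

        With ``no_defaults=True`` only keys written in the section itself are returned;
        otherwise this behaves like the stdlib method (section keys plus defaults).
        Raises configparser.NoSectionError if ``section`` does not exist.
        """
        if not no_defaults:
            return super().options(section)
        try:
            # _sections holds only what was literally written under each header
            return list(self._sections[section].keys())
        except KeyError:
            raise configparser.NoSectionError(section)


# Filled by validate_default_section(): one {key: value} per '[DEFAULT]' option, in file order.
ini_defaults: list[dict[str, str]] = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
internal_globals = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default["is_global"]]

config = ConfigParser(
    defaults=internal_defaults,
    # 'a, b' -> ['a', 'b']; blank items are dropped so an empty value (e.g. the 'addr'
    # default "") yields [] rather than [''].
    converters={'list': lambda x: [i.strip() for i in x.split(',') if i.strip()]},
)
# read() silently skips files it cannot open, so remember whether the file was actually read.
read_config_files = config.read(CONST.CFG_DEFAULT_ABS_PATH)


def print_section_header(header: str) -> str:
    """Return the log line announcing that section ``header`` is being loaded."""
    return f"Loading config section '[{header}]' ..."


def validate_default_section(config_obj: ConfigParser) -> None:
    """Log the '[DEFAULT]' section of ``config_obj`` and record it in ``ini_defaults``.

    Exits the process with status 1 when the parsed config has no sections at all.
    """
    log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
    if not read_config_files:
        log.warning(f"Config file '{CONST.CFG_DEFAULT_ABS_PATH}' could not be read, "
                    f"only internal defaults are available.")
    if not config_obj.sections():
        log.debug(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
        sys.exit(1)
    default_section = config_obj.default_section
    if not config_obj.defaults():
        log.debug(f"No defaults defined")
        return
    log.debug(f"Symbol legend:\n"
              f"* Default from section '[{default_section}]'\n"
              f": Global option from '[{default_section}]', can not be overridden in local sections\n"
              f"~ Local option, doesn't exist in '[{default_section}]'\n"
              f"+ Local override of a value from '[{default_section}]'\n"
              f"= Local override, same value as in '[{default_section}]'\n"
              f"# Local attempt at overriding a global, will be ignored")
    log.debug(print_section_header(default_section))
    for default in config_obj.defaults():
        # Read through the section proxy so interpolation is applied as elsewhere
        value = config_obj[default_section][default]
        ini_defaults.append({default: value})
        marker = ":" if default in internal_globals else "*"
        log.debug(f"{marker} {default} = {value}")


def config_has_valid_section(config_obj: ConfigParser) -> bool:
    """Return True if at least one '[section]' itself defines every CONST.CFG_MANDATORY key.

    Inherited '[DEFAULT]' values do not count, matching validate_config_sections().
    """
    mandatory = set(CONST.CFG_MANDATORY)
    return any(mandatory.issubset(config_obj.options(section, no_defaults=True))
               for section in config_obj.sections())


def is_default(config_key: str) -> bool:
    """Return True if ``config_key`` was set in '[DEFAULT]' (per ``ini_defaults``)."""
    return any(config_key in ini_default for ini_default in ini_defaults)


def is_global(config_key: str) -> bool:
    """Return True if ``config_key`` is a global-only option (not overridable per section)."""
    return config_key in internal_globals


def is_same_as_default(config_kv_pair: dict) -> bool:
    """Return True if the single-item ``{key: value}`` dict equals a '[DEFAULT]' entry."""
    return config_kv_pair in ini_defaults


def validate_config_sections(config_obj: ConfigParser) -> None:
    """Log every '[section]' of ``config_obj`` and normalise it in place.

    Sections missing a CONST.CFG_MANDATORY key are removed; attempts to override a
    global option inside a section are logged and dropped.
    """
    mandatory = set(CONST.CFG_MANDATORY)
    # sections() returns a fresh list, so removing while iterating is safe
    for this_section in config_obj.sections():
        log.debug(print_section_header(this_section))
        local_keys = config_obj.options(this_section, no_defaults=True)
        if not mandatory.issubset(local_keys):
            log.debug(f"Config section '[{this_section}]' does not have all mandatory options "
                      f"{CONST.CFG_MANDATORY} set, skipping section ...")
            config_obj.remove_section(this_section)
            continue
        for key in local_keys:
            value = config_obj[this_section][key]
            if is_global(key):
                kv_prefix = "#"
            elif is_default(key):
                kv_prefix = "=" if is_same_as_default({key: value}) else "+"
            else:
                kv_prefix = "~"
            log.debug(f"{kv_prefix} {key} = {value}")
            if kv_prefix == "#":
                # Globals only make sense in '[DEFAULT]'; the local copy is ignored
                config_obj.remove_option(this_section, key)


def _checked_ip(addr: str, version: int) -> str:
    """Return ``addr`` in canonical form if it is an IPv``version`` address, else raise ValueError.

    The result is interpolated into an iptables argument string, so this is the last
    barrier against anything that is not a plain address ending up in a firewall rule.
    """
    parsed = ipaddress.ip_address(addr)
    if parsed.version != version:
        raise ValueError(f"'{addr}' is not an IPv{version} address")
    return str(parsed)


def gen_fw_rule_xml(ip_addresses: dict[str, list]) -> lxml.etree._Element:
    """Build the firewalld <direct> element for the given source addresses.

    ``ip_addresses`` has the shape returned by resolve_addresses(): ``{"ipv4": [...],
    "ipv6": [...]}``. Every address is validated with _checked_ip() first; an invalid
    entry raises ValueError rather than producing a rule.

    For each family the element declares the DOCKER-USER chain, one ``-s <addr> -j DROP``
    rule per address (priority = list position) and a final catch-all ``-j DROP`` whose
    priority places it after all address rules.

    NOTE(review): as written every rule - including the catch-all - is a DROP, so the
    chain drops all traffic; the config's 'target' option (default ACCEPT) is not
    consulted here. Presumably the per-address rules are meant to use it - confirm the
    intended policy before wiring this into the main loop.
    """
    data = lxml.builder.ElementMaker()
    ipv4_addrs = [_checked_ip(addr, 4) for addr in ip_addresses["ipv4"]]
    ipv6_addrs = [_checked_ip(addr, 6) for addr in ip_addresses["ipv6"]]

    children = [
        data.chain(ipv="ipv4", table="filter", chain="DOCKER-USER"),
        data.chain(ipv="ipv6", table="filter", chain="DOCKER-USER"),
    ]
    for ipv, addrs in (("ipv4", ipv4_addrs), ("ipv6", ipv6_addrs)):
        children.extend(
            data.rule(f"-s {addr} -j DROP", ipv=ipv, table="filter", chain="DOCKER-USER", priority=str(count))
            for count, addr in enumerate(addrs)
        )
    # Catch-all rules; '-s' without an address is not valid iptables syntax, so none is given.
    children.append(data.rule("-j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER",
                              priority=str(len(ipv4_addrs))))
    children.append(data.rule("-j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER",
                              priority=str(len(ipv6_addrs))))
    return data.direct(*children)


def _resolve_records(domain: str, rdtype, label: str) -> list[str]:
    """Return the addresses of the ``rdtype`` records for ``domain``, or [] on any lookup failure.

    ``label`` is the human-readable record name used in the log message ("an A record").
    """
    try:
        answer = dns.resolver.resolve(domain, rdtype=rdtype)
    except dns.resolver.NoAnswer:
        log.debug(f"DNS didn't return {label} for '{domain}', ignoring ...")
        return []
    except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers, dns.exception.Timeout) as err:
        # Don't let one unresolvable name abort regeneration of all rules
        log.warning(f"DNS lookup of {label} for '{domain}' failed ({err.__class__.__name__}), ignoring ...")
        return []
    return [dns_record.address for dns_record in answer]


def resolve_domain(domain: str) -> list[str]:
    """Return the IPv4 then IPv6 addresses of ``domain`` (A records first, then AAAA).

    Names that yield no answer, do not exist, have no reachable name servers or time out
    contribute nothing and are logged. NOTE(review): the answers are taken as delivered
    by the system resolver (no DNSSEC validation here), so a spoofed reply steers which
    addresses end up in the firewall rules - only acceptable with a trusted resolver.
    """
    log.debug(f"Resolving DNS A and AAAA records for '{domain}' ...")
    dns_records = _resolve_records(domain, dns.rdatatype.A, "an A record")
    dns_records += _resolve_records(domain, dns.rdatatype.AAAA, "a AAAA record")
    log.debug(f"Found records: {dns_records}")
    return dns_records


def resolve_addresses(allow_list_mixed: list[str]) -> dict[str, list]:
    """Split a mixed list of IPs and host names into ``{"ipv4": [...], "ipv6": [...]}``.

    Host names are resolved via resolve_domain(); every candidate is then parsed with
    ``ipaddress`` so only canonical addresses are returned. Entries that are neither a
    valid IPv4 nor IPv6 address are logged and dropped. Duplicates are removed, keeping
    the first occurrence.
    """
    allow_sources: dict[str, list] = {"ipv4": [], "ipv6": []}
    allow_list_ip_only: list[str] = []
    for allow_source in allow_list_mixed:
        if validators.domain(allow_source):
            log.debug(f"'{allow_source}' is a domain.")
            allow_list_ip_only.extend(resolve_domain(allow_source))
        else:
            allow_list_ip_only.append(allow_source)

    # A name may resolve to an address that is also listed literally or via another
    # name; emitting it once keeps the rule set minimal.
    for allow_source in dict.fromkeys(allow_list_ip_only):
        try:
            ipv4_addr = str(ipaddress.IPv4Address(allow_source))
        except ipaddress.AddressValueError:
            log.debug(f"Address '{allow_source}' is not a valid IPv4 address. Trying to match against IPv6 ...")
        else:
            log.debug(f"Adding IPv4 address '{allow_source}' ...")
            allow_sources["ipv4"].append(ipv4_addr)
            continue
        try:
            ipv6_addr = str(ipaddress.IPv6Address(allow_source))
        except ipaddress.AddressValueError:
            log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
        else:
            log.debug(f"Adding IPv6 address '{allow_source}' ...")
            allow_sources["ipv6"].append(ipv6_addr)
    return allow_sources


if __name__ == '__main__':
    validate_default_section(config)
    if not config_has_valid_section(config):
        log.debug(f"No valid config section found. A valid config section has at least the mandatory options "
                  f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
        sys.exit(2)
    validate_config_sections(config)
    log.debug(f"Iterating over config sections ...")
    for section in config.sections():
        log.debug(f"Processing section '[{section}]' ...")
        log.debug(config.getlist(section, "addr"))
        # TODO: rule generation is not wired up yet; nothing below writes to firewalld.
        # arg_allow_sources = resolve_addresses(config.getlist(section, "addr"))
        # gen_fw_rule_xml(arg_allow_sources)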