update-firewall-source/update-firewall-source.py

436 lines
19 KiB
Python

# Exit with various exit codes
import sys
# Path manipulation
import os
# Manipulate style and content of logs
import logging
from rich.logging import RichHandler
# Use a config file
import configparser
# Verify IP address family
import ipaddress
# Resolve host names
import dns.resolver
# Validate if string is fqdn
import validators
# Build XML structure
import lxml.etree
import lxml.builder
# Correctly generate plurals, singular nouns etc.
import inflect
# Restart firewalld systemd service unit
import dbus
# Find physical network interface via 'find' command
import subprocess
# Exit codes
# 1 : Config file invalid, it has no sections
# 2 : Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3 : Performing a firewalld rules check failed
# 4 : Performing a firewalld rules encountered a FileNotFoundError
# 5 : Unable to open firewalld direct rules file
# 6 : Source and destination are identical when attempting to back up firewalld direct rules file
# 7 : An option that must have a non-null value is either unset or null
# 8 : Exception while adding a chain XML element to firewalld direct rules
# 9 : Unable to open firewalld direct rules file for updating
# 10: Unable to restart systemd firewalld.service unit
# 11: Unable to add a <rule/> tag to firewalld
# 12: Kernel sysfs export for network devices at "/sys/class/net" doesn't exist
# 13: Linux find command exited non-zero trying to find a physical network device at "/sys/class/net"
# 14: No physical network device found at "/sys/class/net"
class CONST(object):
__slots__ = ()
LOG_FORMAT = "%(message)s"
# How to find a config file
CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
CFG_DEFAULT_FILENAME = "config.ini"
CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
# Values you don't have to set, these are their internal defaults. You may optionally add a key 'is_global' equal
# to either True or False. By default if left off it'll be assumed False. Script will treat values where
# 'is_global' equals True as not being overridable in a '[section]'. It's a setting that only makes sense in a
# global context for the entire script. An option where 'empty_ok' equals True can safely be unset or set to
# an empty string. An example config.ini file may give a sane config example value here, removing that value
# still results in a valid file.
CFG_KNOWN_DEFAULTS = [
{"key": "target", "value": "ACCEPT", "is_global": False, "empty_ok": False},
{"key": "addr", "value": "", "is_global": False, "empty_ok": True},
{"key": "ports", "value": "80, 443", "is_global": False, "empty_ok": True},
{"key": "proto", "value": "tcp", "is_global": False, "empty_ok": True},
{"key": "state", "value": "NEW", "is_global": False, "empty_ok": True},
{"key": "do_ipv6", "value": "false", "is_global": False, "empty_ok": False},
{"key": "firewalld_direct_abs", "value": "/etc/firewalld/direct.xml", "is_global": True, "empty_ok": False},
{"key": "restart_firewalld_after_change", "value": "true", "is_global": True, "empty_ok": False}
]
# In all sections other than 'default' the following settings are known and accepted. We ignore other settings.
# Per CFG_KNOWN_DEFAULTS above most '[DEFAULT]' options are accepted by virtue of being defaults and overridable.
# The only exception are options where "is_global" equals True, they can't be overridden in '[sections]'; any
# attempt at doing it anyway will be ignored. The main purpose of this list is to name settings that do not have
# a default value but can - if set - influence how a '[section]' behaves. Repeating a '[DEFAULT]' here does not
# make sense. We use 'is_mandatory' to determine if we have to raise errors on missing settings. Here
# 'is_mandatory' means the setting must be given in a '[section]'. It may be empty.
CFG_KNOWN_SECTION = [
# {"key": "an_option", "is_mandatory": True},
# {"key": "another_one", "is_mandatory": False}
]
CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
logging.basicConfig(
# Default for all modules is NOTSET so log everything
level="NOTSET",
format=CONST.LOG_FORMAT,
datefmt="[%X]",
handlers=[RichHandler(
rich_tracebacks=True
)]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(os.environ.get("UFS_LOGLEVEL") if "UFS_LOGLEVEL" in [k for k, v in os.environ.items()] else logging.INFO)
p = inflect.engine()
# Use this version of class ConfigParser to log.debug contents of our config file. When parsing sections other than
# 'default' we don't want to reprint defaults over and over again. This custom class achieves that.
class ConfigParser(
configparser.ConfigParser):
"""Can get options() without defaults
Taken from https://stackoverflow.com/a/12600066.
"""
def options(self, section, no_defaults=False, **kwargs):
if no_defaults:
try:
return list(self._sections[section].keys())
except KeyError:
raise configparser.NoSectionError(section)
else:
return super().options(section)
# arg_allow_list = ["77.13.129.237", "2a0b:7080:20::1:f485", "home.seneve.de", "208.87.98.188", "outlook.com",
# "uberspace.de"]
ini_defaults = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
internal_globals = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default["is_global"]]
internal_empty_ok = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default["empty_ok"]]
config = ConfigParser(defaults=internal_defaults,
converters={'list': lambda x: [i.strip() for i in x.split(',') if len(x) > 0]})
config.read(CONST.CFG_DEFAULT_ABS_PATH)
def print_section_header(
header: str) -> str:
return f"Loading config section '[{header}]' ..."
def validate_default_section(
config_obj: configparser.ConfigParser()) -> None:
log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
if not config_obj.sections():
log.debug(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
sys.exit(1)
if config.defaults():
log.debug(f"Symbol legend:\n"
f"* Default from section '[{config_obj.default_section}]'\n"
f": Global option from '[{config_obj.default_section}]', can not be overridden in local sections\n"
f"~ Local option, doesn't exist in '[{config_obj.default_section}]'\n"
f"+ Local override of a value from '[{config_obj.default_section}]'\n"
f"= Local override, same value as in '[{config_obj.default_section}]'\n"
f"# Local attempt at overriding a global, will be ignored")
log.debug(print_section_header(config_obj.default_section))
for default in config_obj.defaults():
ini_defaults.append({default: config_obj[config_obj.default_section][default]})
if default in internal_globals:
log.debug(f": {default} = {config_obj[config_obj.default_section][default]}")
else:
log.debug(f"* {default} = {config_obj[config_obj.default_section][default]}")
else:
log.debug(f"No defaults defined")
def config_has_valid_section(
config_obj: configparser.ConfigParser()) -> bool:
has_valid_section = False
for config_obj_section in config_obj.sections():
if set(CONST.CFG_MANDATORY).issubset(config_obj.options(config_obj_section)):
has_valid_section = True
break
return has_valid_section
def is_default(
config_key: str) -> bool:
return any(config_key in ini_default for ini_default in ini_defaults)
def is_global(
config_key: str) -> bool:
return config_key in internal_globals
def is_same_as_default(
config_kv_pair: dict) -> bool:
return config_kv_pair in ini_defaults
def we_have_unset_options(
config_obj: configparser.ConfigParser(),
section_name: str) -> list:
options_must_be_non_empty = []
for option in config_obj.options(section_name):
if not config_obj.get(section_name, option):
if option not in internal_empty_ok:
log.warning(f"In section '[{section_name}]' option '{option}' is empty, it mustn't be.")
options_must_be_non_empty.append(option)
return options_must_be_non_empty
def validate_config_sections(
config_obj: configparser.ConfigParser()) -> None:
for this_section in config_obj.sections():
log.debug(print_section_header(this_section))
unset_options = we_have_unset_options(config_obj, this_section)
if unset_options:
log.error(f"""{p.plural("Option", len(unset_options))} {unset_options} """
f"""{p.plural("is", len(unset_options))} unset. """
f"""{p.singular_noun("They", len(unset_options))} """
f"must have a non-null value. "
f"""{p.plural("Default", len(unset_options))} {p.plural("is", len(unset_options))}:""")
for unset_option in unset_options:
log.error(f"{unset_option} = {internal_defaults[unset_option]}")
log.error(f"Exiting 7 ...")
sys.exit(7)
if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)):
log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
f"{CONST.CFG_MANDATORY} set, skipping section ...")
config_obj.remove_section(this_section)
else:
for key in config_obj.options(this_section, no_defaults=True):
kv_prefix = "~"
remove_from_section = False
if is_global(key):
kv_prefix = "#"
remove_from_section = True
elif is_default(key):
kv_prefix = "+"
if is_same_as_default({key: config_obj[this_section][key]}):
kv_prefix = "="
log.debug(f"{kv_prefix} {key} = {config_obj[this_section][key]}")
if remove_from_section:
config_obj.remove_option(this_section, key)
def has_child_elem(elem_name: str, attr_value: str) -> bool:
global arg_fw_rule_data
attr_name = "ipv"
for elem in arg_fw_rule_data.findall(elem_name):
if elem.attrib[attr_name] == attr_value:
log.debug(f"""XML has element '<{elem_name} {attr_name}="{attr_value}" .../>'""")
return True
log.debug(f"""No XML element '<{elem_name} {attr_name}="{attr_value}" .../>'""")
return False
def add_chain_elem(elem_name: str, addr_family: str) -> bool:
global arg_fw_rule_data
log.debug(f"Adding new ...")
for chain in ["FILTERS", "DOCKER-USER"]:
try:
lxml.etree.SubElement(arg_fw_rule_data, elem_name,
ipv=f"{addr_family}",
table="filter",
chain=chain)
except lxml.etree.LxmlError as le:
log.error(f"""Failed to add XML '<{elem_name} ipv=f"{addr_family}" .../>'\n"""
f"Verbatim exception was:\n"
f"f{le}\n"
f"Exiting 8 ...")
sys.exit(8)
return True
direct_tag = data.direct
chain_tag = data.chain
rule_tag = data.rule
fw_rule_data = direct_tag(
chain_tag(ipv="ipv4", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 208.87.98.188 -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority="0"),
chain_tag(ipv="ipv6", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 2a0b:7080:20::1:f485 -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority="0")
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv4", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv4"])),
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv6", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv6"])),
rule_tag(f"-s -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority=f"{len_ipv4_addresses}"),
rule_tag(f"-s -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority=f"{len_ipv6_addresses}")
)
# fw_rule_data_str = lxml.etree.tostring(
# fw_rule_data,
# pretty_print=True,
# xml_declaration=True,
# encoding="UTF-8").decode()
# log.debug(f"{fw_rule_data_str}")
return fw_rule_data
def resolve_domain(domain: str) -> list[str]:
log.debug(f"Resolving DNS A and AAAA records for '{domain}' ...")
try:
a_records = dns.resolver.resolve(domain, rdtype=dns.rdatatype.A)
except dns.resolver.NoAnswer:
log.debug(f"DNS didn't return an A record for '{domain}', ignoring ...")
a_records = []
try:
aaaa_records = dns.resolver.resolve(domain, rdtype=dns.rdatatype.AAAA)
except dns.resolver.NoAnswer:
log.debug(f"DNS didn't return a AAAA record for '{domain}', ignoring ...")
aaaa_records = []
dns_records = []
[dns_records.append(dns_record.address) for dns_record in a_records if a_records]
[dns_records.append(dns_record.address) for dns_record in aaaa_records if aaaa_records]
log.info(f"""For {domain!r} found {p.plural("record", len(dns_records))}: {dns_records}""")
return dns_records
def resolve_addresses(allow_list_mixed: list[str]) -> dict[str, list]:
allow_sources = {"ipv4": [], "ipv6": []}
allow_list_ip_only = []
for allow_source in allow_list_mixed:
if validators.domain(allow_source):
log.debug(f"'{allow_source}' is a domain.")
[allow_list_ip_only.append(addr) for addr in resolve_domain(allow_source)]
else:
allow_list_ip_only.append(allow_source)
for allow_source in allow_list_ip_only:
try:
ipv4_addr = str(ipaddress.IPv4Address(allow_source))
log.debug(f"Adding IPv4 address '{allow_source}' ...")
allow_sources["ipv4"].append(ipv4_addr)
except ipaddress.AddressValueError:
log.debug(f"Address '{allow_source}' is not a valid IPv4 address. Trying to match against IPv6 ...")
try:
ipv6_addr = str(ipaddress.IPv6Address(allow_source))
log.debug(f"Adding IPv6 address '{allow_source}' ...")
allow_sources["ipv6"].append(ipv6_addr)
except ipaddress.AddressValueError:
log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
def write_new_fwd_direct_xml(
config_obj: configparser.ConfigParser()) -> bool:
global arg_fw_rule_data
fwd_direct_xml_str = lxml.etree.tostring(arg_fw_rule_data,
pretty_print=True,
encoding="UTF-8",
xml_declaration=True).decode()
try:
with open(config_obj.get(configparser.DEFAULTSECT, "firewalld_direct_abs"), "r+") as fwd_file_handle:
log.info(f"Writing new firewalld direct config ...")
log.debug(f"New content:\n"
f"{fwd_direct_xml_str.rstrip()}")
fwd_file_handle.seek(0)
fwd_file_handle.write(fwd_direct_xml_str)
fwd_file_handle.truncate()
except OSError as ose:
log.error(f"Unable to open firewalld direct rules file for updating.\n"
f"Verbatim exception was:\n"
f"f{ose}\n"
f"Exiting 9 ...")
sys.exit(9)
else:
return True
def restart_systemd_firewalld() -> bool:
sysbus = dbus.SystemBus()
systemd1 = sysbus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
firewalld_unit = manager.LoadUnit('firewalld.service')
firewalld_proxy = sysbus.get_object('org.freedesktop.systemd1', str(firewalld_unit))
firewalld_active_state = firewalld_proxy.Get('org.freedesktop.systemd1.Unit',
'ActiveState',
dbus_interface='org.freedesktop.DBus.Properties')
if firewalld_active_state == "inactive":
log.info(f"systemd firewalld.service unit is inactive, ignoring restart instruction, leaving as-is ...")
return False
try:
log.info(f"Restarting systemd firewalld.service unit ...")
manager.TryRestartUnit('firewalld.service', 'fail')
except dbus.exceptions.DBusException as dbe:
log.error(f"Failed to restart systemd firewalld.service unit.\n"
f"Verbatim exception was:\n"
f"{dbe}\n"
f"You're going to want to check firewalld.service health.\n"
f"Exiting 10 ...")
sys.exit(10)
else:
log.info(f"Done")
return True
def add_firewall_shim(arg_phy_nics: list) -> None:
global arg_fw_rule_data
log.debug(f"Adding ip(6)tables jump target to DOCKER-USER chain ...")
for addr_family in ["ipv4", "ipv6"]:
for phy_nic in arg_phy_nics:
if has_child_elem("chain", addr_family):
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain="INPUT"),
"ACCEPT",
arg_chain="INPUT",
arg_in_interface="lo"
)
for chain in ["INPUT", "DOCKER-USER"]:
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain=chain),
"FILTERS",
arg_chain=chain,
arg_in_interface=phy_nic if chain == "DOCKER-USER" else None
)
if __name__ == '__main__':
validate_default_section(config)
if config_has_valid_section(config):
validate_config_sections(config)
else:
log.error(f"No valid config section found. A valid config section has at least the mandatory options "
f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
sys.exit(2)
log.debug(f"Iterating over config sections ...")
for section in config.sections():
log.debug(f"Processing section '[{section}]' ...")
log.debug(config.getlist(section, "addr"))
# arg_allow_sources = resolve_addresses(arg_allow_list)
# gen_fw_rule_xml(arg_allow_sources)