feat(dbus): Restart firewalld, provide new XML file

This commit is contained in:
hygienic-books 2022-07-05 04:46:44 +02:00
parent afdc8aa7af
commit 824e6c67d0

View File

@ -303,7 +303,85 @@ def resolve_addresses(allow_list_mixed: list[str]) -> dict[str, list]:
except ipaddress.AddressValueError: except ipaddress.AddressValueError:
log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...") log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
return allow_sources
def write_new_fwd_direct_xml(
config_obj: configparser.ConfigParser()) -> bool:
global arg_fw_rule_data
fwd_direct_xml_str = lxml.etree.tostring(arg_fw_rule_data,
pretty_print=True,
encoding="UTF-8",
xml_declaration=True).decode()
try:
with open(config_obj.get(configparser.DEFAULTSECT, "firewalld_direct_abs"), "r+") as fwd_file_handle:
log.info(f"Writing new firewalld direct config ...")
log.debug(f"New content:\n"
f"{fwd_direct_xml_str.rstrip()}")
fwd_file_handle.seek(0)
fwd_file_handle.write(fwd_direct_xml_str)
fwd_file_handle.truncate()
except OSError as ose:
log.error(f"Unable to open firewalld direct rules file for updating.\n"
f"Verbatim exception was:\n"
f"f{ose}\n"
f"Exiting 9 ...")
sys.exit(9)
else:
return True
def restart_systemd_firewalld() -> bool:
sysbus = dbus.SystemBus()
systemd1 = sysbus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
firewalld_unit = manager.LoadUnit('firewalld.service')
firewalld_proxy = sysbus.get_object('org.freedesktop.systemd1', str(firewalld_unit))
firewalld_active_state = firewalld_proxy.Get('org.freedesktop.systemd1.Unit',
'ActiveState',
dbus_interface='org.freedesktop.DBus.Properties')
if firewalld_active_state == "inactive":
log.info(f"systemd firewalld.service unit is inactive, ignoring restart instruction, leaving as-is ...")
return False
try:
log.info(f"Restarting systemd firewalld.service unit ...")
manager.TryRestartUnit('firewalld.service', 'fail')
except dbus.exceptions.DBusException as dbe:
log.error(f"Failed to restart systemd firewalld.service unit.\n"
f"Verbatim exception was:\n"
f"{dbe}\n"
f"You're going to want to check firewalld.service health.\n"
f"Exiting 10 ...")
sys.exit(10)
else:
log.info(f"Done")
return True
def add_firewall_shim(arg_phy_nics: list) -> None:
global arg_fw_rule_data
log.debug(f"Adding ip(6)tables jump target to DOCKER-USER chain ...")
for addr_family in ["ipv4", "ipv6"]:
for phy_nic in arg_phy_nics:
if has_child_elem("chain", addr_family):
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain="INPUT"),
"ACCEPT",
arg_chain="INPUT",
arg_in_interface="lo"
)
for chain in ["INPUT", "DOCKER-USER"]:
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain=chain),
"FILTERS",
arg_chain=chain,
arg_in_interface=phy_nic if chain == "DOCKER-USER" else None
)
if __name__ == '__main__': if __name__ == '__main__':