From 905f97ef5513634d1759427fd9cdc5abf403db79 Mon Sep 17 00:00:00 2001 From: hygienic-books Date: Tue, 5 Jul 2022 04:46:57 +0200 Subject: [PATCH] feat(xml): Check and add rule XML elements --- update-firewall-source.py | 159 ++++++++++++++++++++++++++++++++------ 1 file changed, 137 insertions(+), 22 deletions(-) diff --git a/update-firewall-source.py b/update-firewall-source.py index 2e0174c..c12c39e 100644 --- a/update-firewall-source.py +++ b/update-firewall-source.py @@ -265,30 +265,145 @@ def add_chain_elem(elem_name: str, addr_family: str) -> bool: return True - direct_tag = data.direct - chain_tag = data.chain - rule_tag = data.rule - fw_rule_data = direct_tag( - chain_tag(ipv="ipv4", table="filter", chain="DOCKER-USER"), - # rule_tag("-s 208.87.98.188 -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority="0"), - chain_tag(ipv="ipv6", table="filter", chain="DOCKER-USER"), - # rule_tag("-s 2a0b:7080:20::1:f485 -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority="0") - *(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv4", table=f"filter", chain="DOCKER-USER", priority=f"{count}") - for count, addr in enumerate(ip_addresses["ipv4"])), - *(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv6", table=f"filter", chain="DOCKER-USER", priority=f"{count}") - for count, addr in enumerate(ip_addresses["ipv6"])), - rule_tag(f"-s -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority=f"{len_ipv4_addresses}"), - rule_tag(f"-s -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority=f"{len_ipv6_addresses}") - ) +def rules_count( + arg_ipv: str = "ipv4", + arg_chain: str = "FILTERS") -> int: - # fw_rule_data_str = lxml.etree.tostring( - # fw_rule_data, - # pretty_print=True, - # xml_declaration=True, - # encoding="UTF-8").decode() - # log.debug(f"{fw_rule_data_str}") + arg_rules_count = len([rule for rule in arg_fw_rule_data.findall("rule") if all([ + rule.attrib["ipv"] == arg_ipv if arg_ipv else False, + rule.attrib["chain"] == arg_chain if arg_chain else False])]) - return fw_rule_data + log.debug(f"""Counted {arg_rules_count} {p.plural("rule", arg_rules_count)} matching """ + f"""{"ipv=" + arg_ipv + " " if arg_ipv else ""}""" + f"""{"chain=" + arg_chain + " " if arg_chain else ""}""") + return arg_rules_count + + +def add_rule_elem( + address_family: str, + prio: int, + target: str, + /, *, + arg_section_name: str = None, + arg_proto: str = None, + arg_state: str = None, + arg_ports: list = None, + arg_address: str = None, + arg_chain: str = "FILTERS", + arg_in_interface: str = None) -> bool: + + global arg_fw_rule_data + + try: + lxml.etree.SubElement(arg_fw_rule_data, "rule", + ipv=f"{address_family}", + table=f"filter", + chain=arg_chain, + priority=f"""{prio}""").text = \ + f"""{"--in-interface " + arg_in_interface + " " if arg_in_interface else ""}""" \ + f"""{"--protocol " + arg_proto + " " if arg_proto else ""}""" \ + f"""{"--match state --state " + arg_state + " " if arg_state else ""}""" \ + f"""{"--match multiport --destination-ports " + ",".join(arg_ports) + " " if arg_ports else ""}""" \ + f"""{"--source " + arg_address + " " if arg_address else ""}""" \ + f"""--jump {target}""" \ + f""" + {" --match comment --comment " + chr(34) + arg_section_name[:256] + chr(34) if arg_section_name else ""}""" + except lxml.etree.LxmlError as le: + log.error(f"""Failed to add XML ''\n""" + f"Verbatim exception was:\n" + f"f{le}\n" + f"Exiting 8 ...") + sys.exit(8) + else: + return True + + +def get_phy_nics() -> list: + phy_nics = [] + linux_sysfs_nics_abs = "/sys/class/net" + find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-not", "-lname", "*virtual*"] + # find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-lname", "*virtual*"] + + if os.path.isdir(linux_sysfs_nics_abs): + try: + phy_nics_find = subprocess.run(find_phy_nics, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + encoding="UTF-8") + except subprocess.CalledProcessError as cpe: + log.error(f"Failed to find physical network device in {linux_sysfs_nics_abs!r}.\n" + f"Command was:\n" + f"{cpe.cmd}\n" + f"Verbatim command output was:\n" + f"{cpe.output.rstrip()}\n" + f"Exiting 13 ...") + sys.exit(13) + else: + if not phy_nics_find.stdout: + log.error(f"No physical network device found at {linux_sysfs_nics_abs!r}.\n" + f"Command was:\n" + f"{phy_nics_find.args}\n" + f"Exiting 14 ...") + sys.exit(14) + for line in phy_nics_find.stdout.rstrip().split("\n"): + log.debug(f"Found physical network device {(phy_nic := os.path.basename(line))!r}") + phy_nics.append(phy_nic) + else: + log.error(f"Path {linux_sysfs_nics_abs!r} does not exist. This might not be a Linux-y operating system. " + f"Without that location we'll not be able to separate physical network interfaces from virtual ones. " + f"Exiting 12 ...") + sys.exit(12) + + log.debug(f"List of identified physical network interfaces: {phy_nics}") + return phy_nics + + +def add_fw_rule_to_xml( + config_obj: configparser.ConfigParser(), + section_name: str, + target: str, + ports: list, + proto: str) -> bool: + global arg_fw_rule_data + global arg_allow_sources + addr = arg_allow_sources + + rules_already_added = {"ipv4": rules_count(arg_ipv="ipv4") + 1, "ipv6": rules_count(arg_ipv="ipv6") + 1} + log.debug(f"Current rules count: {rules_already_added}") + + for address_family in ["ipv4", "ipv6"]: + if len(addr[address_family]): + if not has_child_elem("chain", address_family): + add_chain_elem("chain", address_family) + for address in addr[address_family]: + add_rule_elem( + address_family, + rules_already_added[address_family], + target, + arg_section_name=section_name, + arg_proto=proto, + arg_state=config_obj.get(section_name, "state"), + arg_ports=ports, + arg_address=address) + rules_already_added[address_family] += 1 + if not len(addr["ipv4"]) and not len(addr["ipv6"]): + if address_family == "ipv4" or (address_family == "ipv6" + and + config_obj.getboolean(section_name, "do_ipv6")): + if not has_child_elem("chain", address_family): + add_chain_elem("chain", address_family) + add_rule_elem( + address_family, + rules_already_added[address_family], + target, + arg_section_name=section_name, + arg_proto=proto, + arg_state=config_obj.get(section_name, "state"), + arg_ports=ports) + rules_already_added[address_family] += 1 + + return True def resolve_domain(domain: str) -> list[str]: