feat(xml): Check and add rule XML elements

This commit is contained in:
hygienic-books 2022-07-05 04:46:57 +02:00
parent ea344c8940
commit 905f97ef55

View File

@ -265,30 +265,145 @@ def add_chain_elem(elem_name: str, addr_family: str) -> bool:
return True return True
direct_tag = data.direct def rules_count(
chain_tag = data.chain arg_ipv: str = "ipv4",
rule_tag = data.rule arg_chain: str = "FILTERS") -> int:
fw_rule_data = direct_tag(
chain_tag(ipv="ipv4", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 208.87.98.188 -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority="0"),
chain_tag(ipv="ipv6", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 2a0b:7080:20::1:f485 -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority="0")
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv4", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv4"])),
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv6", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv6"])),
rule_tag(f"-s -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority=f"{len_ipv4_addresses}"),
rule_tag(f"-s -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority=f"{len_ipv6_addresses}")
)
# fw_rule_data_str = lxml.etree.tostring( arg_rules_count = len([rule for rule in arg_fw_rule_data.findall("rule") if all([
# fw_rule_data, rule.attrib["ipv"] == arg_ipv if arg_ipv else False,
# pretty_print=True, rule.attrib["chain"] == arg_chain if arg_chain else False])])
# xml_declaration=True,
# encoding="UTF-8").decode()
# log.debug(f"{fw_rule_data_str}")
return fw_rule_data log.debug(f"""Counted {arg_rules_count} {p.plural("rule", arg_rules_count)} matching """
f"""{"ipv=" + arg_ipv + " " if arg_ipv else ""}"""
f"""{"chain=" + arg_chain + " " if arg_chain else ""}""")
return arg_rules_count
def add_rule_elem(
address_family: str,
prio: int,
target: str,
/, *,
arg_section_name: str = None,
arg_proto: str = None,
arg_state: str = None,
arg_ports: list = None,
arg_address: str = None,
arg_chain: str = "FILTERS",
arg_in_interface: str = None) -> bool:
global arg_fw_rule_data
try:
lxml.etree.SubElement(arg_fw_rule_data, "rule",
ipv=f"{address_family}",
table=f"filter",
chain=arg_chain,
priority=f"""{prio}""").text = \
f"""{"--in-interface " + arg_in_interface + " " if arg_in_interface else ""}""" \
f"""{"--protocol " + arg_proto + " " if arg_proto else ""}""" \
f"""{"--match state --state " + arg_state + " " if arg_state else ""}""" \
f"""{"--match multiport --destination-ports " + ",".join(arg_ports) + " " if arg_ports else ""}""" \
f"""{"--source " + arg_address + " " if arg_address else ""}""" \
f"""--jump {target}""" \
f"""
{" --match comment --comment " + chr(34) + arg_section_name[:256] + chr(34) if arg_section_name else ""}"""
except lxml.etree.LxmlError as le:
log.error(f"""Failed to add XML '<rule ipv=f"{address_family}" .../>'\n"""
f"Verbatim exception was:\n"
f"f{le}\n"
f"Exiting 8 ...")
sys.exit(8)
else:
return True
def get_phy_nics() -> list:
phy_nics = []
linux_sysfs_nics_abs = "/sys/class/net"
find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-not", "-lname", "*virtual*"]
# find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-lname", "*virtual*"]
if os.path.isdir(linux_sysfs_nics_abs):
try:
phy_nics_find = subprocess.run(find_phy_nics,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
encoding="UTF-8")
except subprocess.CalledProcessError as cpe:
log.error(f"Failed to find physical network device in {linux_sysfs_nics_abs!r}.\n"
f"Command was:\n"
f"{cpe.cmd}\n"
f"Verbatim command output was:\n"
f"{cpe.output.rstrip()}\n"
f"Exiting 13 ...")
sys.exit(13)
else:
if not phy_nics_find.stdout:
log.error(f"No physical network device found at {linux_sysfs_nics_abs!r}.\n"
f"Command was:\n"
f"{phy_nics_find.args}\n"
f"Exiting 14 ...")
sys.exit(14)
for line in phy_nics_find.stdout.rstrip().split("\n"):
log.debug(f"Found physical network device {(phy_nic := os.path.basename(line))!r}")
phy_nics.append(phy_nic)
else:
log.error(f"Path {linux_sysfs_nics_abs!r} does not exist. This might not be a Linux-y operating system. "
f"Without that location we'll not be able to separate physical network interfaces from virtual ones. "
f"Exiting 12 ...")
sys.exit(12)
log.debug(f"List of identified physical network interfaces: {phy_nics}")
return phy_nics
def add_fw_rule_to_xml(
config_obj: configparser.ConfigParser(),
section_name: str,
target: str,
ports: list,
proto: str) -> bool:
global arg_fw_rule_data
global arg_allow_sources
addr = arg_allow_sources
rules_already_added = {"ipv4": rules_count(arg_ipv="ipv4") + 1, "ipv6": rules_count(arg_ipv="ipv6") + 1}
log.debug(f"Current rules count: {rules_already_added}")
for address_family in ["ipv4", "ipv6"]:
if len(addr[address_family]):
if not has_child_elem("chain", address_family):
add_chain_elem("chain", address_family)
for address in addr[address_family]:
add_rule_elem(
address_family,
rules_already_added[address_family],
target,
arg_section_name=section_name,
arg_proto=proto,
arg_state=config_obj.get(section_name, "state"),
arg_ports=ports,
arg_address=address)
rules_already_added[address_family] += 1
if not len(addr["ipv4"]) and not len(addr["ipv6"]):
if address_family == "ipv4" or (address_family == "ipv6"
and
config_obj.getboolean(section_name, "do_ipv6")):
if not has_child_elem("chain", address_family):
add_chain_elem("chain", address_family)
add_rule_elem(
address_family,
rules_already_added[address_family],
target,
arg_section_name=section_name,
arg_proto=proto,
arg_state=config_obj.get(section_name, "state"),
arg_ports=ports)
rules_already_added[address_family] += 1
return True
def resolve_domain(domain: str) -> list[str]: def resolve_domain(domain: str) -> list[str]: