feat(xml): Generate rate throttling rules via 'recent' extension and its hitcount per time

This commit is contained in:
hygienic-books 2022-07-16 02:12:40 +02:00
parent 00c43503a9
commit 262e11ba7c

View File

@ -58,6 +58,7 @@ class CONST(object):
{"key": "ports", "value": "80, 443", "is_global": False, "empty_ok": True}, {"key": "ports", "value": "80, 443", "is_global": False, "empty_ok": True},
{"key": "proto", "value": "tcp", "is_global": False, "empty_ok": True}, {"key": "proto", "value": "tcp", "is_global": False, "empty_ok": True},
{"key": "state", "value": "NEW", "is_global": False, "empty_ok": True}, {"key": "state", "value": "NEW", "is_global": False, "empty_ok": True},
{"key": "hitcount", "value": "", "is_global": False, "empty_ok": True},
{"key": "do_ipv6", "value": "false", "is_global": False, "empty_ok": False}, {"key": "do_ipv6", "value": "false", "is_global": False, "empty_ok": False},
{"key": "firewalld_direct_abs", "value": "/etc/firewalld/direct.xml", "is_global": True, "empty_ok": False}, {"key": "firewalld_direct_abs", "value": "/etc/firewalld/direct.xml", "is_global": True, "empty_ok": False},
{"key": "restart_firewalld_after_change", "value": "true", "is_global": True, "empty_ok": False} {"key": "restart_firewalld_after_change", "value": "true", "is_global": True, "empty_ok": False}
@ -290,6 +291,7 @@ def add_rule_elem(
arg_proto: str = None, arg_proto: str = None,
arg_state: str = None, arg_state: str = None,
arg_ports: list = None, arg_ports: list = None,
arg_hitcount: str = None,
arg_address: str = None, arg_address: str = None,
arg_chain: str = "FILTERS", arg_chain: str = "FILTERS",
arg_in_interface: str = None) -> bool: arg_in_interface: str = None) -> bool:
@ -300,6 +302,43 @@ def add_rule_elem(
arg_proto = "icmp" arg_proto = "icmp"
if arg_proto == "icmp" and address_family == "ipv6": if arg_proto == "icmp" and address_family == "ipv6":
arg_proto = "icmpv6" arg_proto = "icmpv6"
if arg_hitcount:
try:
lxml.etree.SubElement(arg_fw_rule_data, "rule",
ipv=f"{address_family}",
table=f"filter",
chain=arg_chain,
priority=f"""{prio}""").text = \
f"""{"--in-interface " + arg_in_interface + " " if arg_in_interface else ""}""" \
f"""{"--protocol " + arg_proto + " " if arg_proto else ""}""" \
f"""{"--match multiport --destination-ports " + ",".join(arg_ports) + " " if arg_ports else ""}""" \
f"""{"--source " + arg_address + " " if arg_address else ""}""" \
f"""{"--match recent --name " + chr(34) + arg_section[:256] + chr(34) +
" --update --hitcount " + arg_hitcount.split("/")[0] + " --seconds " + arg_hitcount.split("/")[1] + " "
if arg_section else ""}""" \
f"""--jump DROP"""
prio += 1
lxml.etree.SubElement(arg_fw_rule_data, "rule",
ipv=f"{address_family}",
table=f"filter",
chain=arg_chain,
priority=f"""{prio}""").text = \
f"""{"--in-interface " + arg_in_interface + " " if arg_in_interface else ""}""" \
f"""{"--protocol " + arg_proto + " " if arg_proto else ""}""" \
f"""{"--match multiport --destination-ports " + ",".join(arg_ports) + " " if arg_ports else ""}""" \
f"""{"--source " + arg_address + " " if arg_address else ""}""" \
f"""{"--match recent --name " + chr(34) + arg_section[:256] + chr(34) +
" --set" if arg_section else ""}"""
prio += 1
except lxml.etree.LxmlError as le:
log.error(f"""Failed to add XML '<rule ipv=f"{address_family}" .../>'\n"""
f"Verbatim exception was:\n"
f"f{le}\n"
f"Exiting 8 ...")
sys.exit(8)
try: try:
lxml.etree.SubElement(arg_fw_rule_data, "rule", lxml.etree.SubElement(arg_fw_rule_data, "rule",
ipv=f"{address_family}", ipv=f"{address_family}",
@ -369,7 +408,8 @@ def add_fw_rule_to_xml(
section_name: str, section_name: str,
target: str, target: str,
ports: list, ports: list,
proto: str) -> bool: proto: str,
hitcount: str) -> bool:
global arg_fw_rule_data global arg_fw_rule_data
global arg_allow_sources global arg_allow_sources
addr = arg_allow_sources addr = arg_allow_sources
@ -390,7 +430,11 @@ def add_fw_rule_to_xml(
arg_proto=proto, arg_proto=proto,
arg_state=config_obj.get(section_name, "state"), arg_state=config_obj.get(section_name, "state"),
arg_ports=ports, arg_ports=ports,
arg_hitcount=hitcount,
arg_address=address) arg_address=address)
if hitcount:
rules_already_added[address_family] += 3
else:
rules_already_added[address_family] += 1 rules_already_added[address_family] += 1
if not len(addr["ipv4"]) and not len(addr["ipv6"]): if not len(addr["ipv4"]) and not len(addr["ipv6"]):
if address_family == "ipv4" or (address_family == "ipv6" if address_family == "ipv4" or (address_family == "ipv6"
@ -405,7 +449,11 @@ def add_fw_rule_to_xml(
arg_section=section_name, arg_section=section_name,
arg_proto=proto, arg_proto=proto,
arg_state=config_obj.get(section_name, "state"), arg_state=config_obj.get(section_name, "state"),
arg_ports=ports) arg_ports=ports,
arg_hitcount=hitcount)
if hitcount:
rules_already_added[address_family] += 3
else:
rules_already_added[address_family] += 1 rules_already_added[address_family] += 1
return True return True
@ -624,7 +672,8 @@ if __name__ == "__main__":
section, section,
target=config.get(section, "target"), target=config.get(section, "target"),
ports=config.getlist(section, "ports"), ports=config.getlist(section, "ports"),
proto=config.get(section, "proto")) proto=config.get(section, "proto"),
hitcount=config.get(section, "hitcount"))
for arg_address_family in ["ipv4", "ipv6"]: for arg_address_family in ["ipv4", "ipv6"]:
if rules_count(arg_address_family): if rules_count(arg_address_family):
add_rule_elem( add_rule_elem(