Compare commits

...

27 Commits

Author SHA1 Message Date
ce44e728e0 refactor(systemd): Slim down log output via RichHandler 2022-07-05 04:47:24 +02:00
98f6eb0ae0 refactor(meta): Remove example allow list 2022-07-05 04:47:20 +02:00
caf7ad64d1 refactor(dns): Give more concise output when resolving DNS records 2022-07-05 04:47:16 +02:00
719ee22276 meta(docs): Add systemd example files 2022-07-05 04:47:12 +02:00
74a6f42171 feat(xml): Tie it all together by generating intro ip(6)tables targets 2022-07-05 04:47:08 +02:00
35e6f80243 feat(xml): Render XML rule content per section 2022-07-05 04:47:04 +02:00
723dac7a6f feat(XML): Render a global XML scaffolding for use throughout execution 2022-07-05 04:47:00 +02:00
905f97ef55 feat(xml): Check and add rule XML elements 2022-07-05 04:46:57 +02:00
ea344c8940 feat(xml): Add and detect XML rule elements 2022-07-05 04:46:53 +02:00
a4ff9aff4b feat(meta): Add dependencies 2022-07-05 04:46:49 +02:00
824e6c67d0 feat(dbus): Restart firewalld, provide new XML file 2022-07-05 04:46:44 +02:00
afdc8aa7af feat(config): Get log level from environment 2022-07-05 04:46:40 +02:00
2c10e3766d feat(config): Define settings that are allowed empty 2022-07-05 04:46:35 +02:00
f4339dae00 feat(config): Add iptables connection state options 2022-07-05 04:46:30 +02:00
c092cbdcf3 feat(config): Accept a list as a Configparser setting, add lambda 2022-07-05 04:46:25 +02:00
1e796771cc docs(meta): Topmost sections are H1 headings instead of H2 2022-07-05 04:46:20 +02:00
be3b65f3a4 docs(meta): Document exit codes 2022-07-05 04:46:15 +02:00
296dd39d2e docs(meta): Describe settings 2022-07-05 04:46:11 +02:00
81036f5e99 docs(meta): Add latest rendered cogapp config.ini example to docs 2022-07-05 04:46:05 +02:00
1e53adb529 docs(debug): Warn user when a section doesn't have all mandatory options set 2022-07-05 04:46:01 +02:00
40290fdc59 docs(debug): Render plurals in log output when applicable 2022-07-05 04:45:52 +02:00
a92e83a7c6 docs(debug): Config without valid section is now an error 2022-07-05 04:45:47 +02:00
69bdac4aa6 docs(config): Warn user that a trailing target=DROP rule is sensible 2022-07-05 04:45:41 +02:00
7b3ebde367 docs(config): Remove per-section values, we don't need those 2022-07-05 04:45:31 +02:00
b35aa03e70 docs(config): In example config.ini file comment out an invalid section to better differentiate it from valid sections 2022-07-05 04:45:23 +02:00
d51a1f9638 docs(config): Give example usage for a section that needs IPv6 rules 2022-07-05 04:45:15 +02:00
c5ae3c0c89 docs(config): Add examples to config defaults 2022-07-05 04:44:45 +02:00
7 changed files with 555 additions and 105 deletions

157
README.md
View File

@ -2,7 +2,7 @@
Update a firewall rule that relies on dynamic DNS names
## What
# What
* This script assumes exclusive ownership of the `firewalld` direct rules file `/etc/firewalld/direct.xml` or whereever configured
@ -18,8 +18,16 @@ Update a firewall rule that relies on dynamic DNS names
* added in order
* related, established? needed?
* section names as comments?
* Comment max 256 chars
## Config structure
# Prep
Python dependencies aside make sure that your OS has headers and static libraries for D-Bus GLib bindings installed as well as generic D-Bus development files. On a Rocky Linux 8 installation for example these come via:
```
dnf -y install dbus-glib-devel dbus-devel
```
# Config structure
Package configuration happens via a `config.ini` file that follows INI-style syntax. Copy [examples/config.ini.example](examples/config.ini.example) to `config.ini` to get started:
@ -44,19 +52,24 @@ target = ACCEPT
addr =
ports = 80, 443
proto = tcp
do_config_check = true
state = NEW
do_ipv6 = false
firewalld_direct_file_abs = /etc/firewalld/direct.xml
restart_firewalld_after_change = true
[anyone-can-access-website]
[these-guys-can-dns]
addr = google.li, 142.251.36.195, lowendbox.com, 2606:4700:20::ac43:4775
ports = 53
proto = tcp, udp
# Unsetting 'proto' while having a 'ports' value results in an invalid section
# [these-guys-can-dns]
# addr = google.li, 142.251.36.195, lowendbox.com, 2606:4700:20::ac43:4775
# ports = 53
# proto =
# do_ipv6 = true
[maybe-a-webserver]
addr = 2606:4700:20::681a:804, lowendtalk.com
ports = 80, 443
do_ipv6 = true
[allow-anyone-to-access-mail-services]
ports = 143, 993, 110, 995, 25, 465, 587
@ -66,18 +79,18 @@ target = DROP
addr =
ports =
proto =
state =
do_ipv6 = true
```
<!-- [[[end]]] -->
### Layout
# Layout
A config file can have an optional `[DEFAULT]` section and must have at least one `[section]` other than `[DEFAULT]`. Any `[DEFAULT]` option that's undefined retains its default value. Feel free to delete the entire `[DEFAULT]` section from your file. A setting changed in `[DEFAULT]` section affects all sections. A setting changed only in a custom `[section]` overwrites it for only the section.
Custom sections such as `[maybe-a-webserver]` in above example file are treated as organizational helper constructs. You can but don't have to group IP address rules by sections. Technically nothing's stopping you from adding all IP allow list entries into a single section.
### Example explanation
With `config_check_after_change`
# Example explanation
Setting `restart_firewalld_after_change` controls if you want the `firewalld` systemd unit to be restarted
@ -89,45 +102,121 @@ We strongly recommend you do keep the very last example section:
target = DROP
addr =
ports =
proto =
proto =
state =
do_ipv6 = true
```
If a packet has traversed rules this far without being accepted it will be dropped.
If a packet has traversed rules this far without being accepted it will be dropped. Note that if any of your custom `[sections]` use `do_ipv6 = true` your final `DROP` rule should do the same. Otherwise you'll just get `DROP` rule in `iptables` but not in `ip6tables`.
## Options
# Options
### Globals
## Globals
In `[DEFAULT]` section the following settings are called globals. They're only valid in `[DEFAULT]` context. Adding them to a custom `[section]` (see [Locals](#locals) below) won't do anything, in a custom `[section]` the following settings are ignored.
* `do_config_check`, __*optional*__, defaults to `true`: Do a `firewall-cmd --check-config` once before changing `/etc/firewalld/direct.xml`. Abort if this initial check fails, inform the user. Obviously something's very wrong before we've even touched firewall configs. Otherwise back up `/etc/firewalld/direct.xml`, change it then do another check. If `firewall-cmd --check-config` finds a syntax error restore the backed up known good `/etc/firewalld/direct.xml`, delete the temporary backed up one and inform the user.
If `restart_firewalld_after_change` is also `true` (default) restart the `firewalld` systemd service unit after the second config check. See steps __*a*__ through __*f*__ below at `restart_firewalld_after_change` for exact order.
* `firewalld_direct_file_abs`, __*optional*__, defaults to `/etc/firewalld/direct.xml`: Location of `firewalld`'s direct rules file. This is where new XML rule content is written.
* `restart_firewalld_after_change`, __*optional*__, defaults to `true`: After putting a new `/etc/firewalld/direct.xml` file in place restart the `firewalld` systemd service unit.
* If `do_config_check` is also `true` (default) the exact order is:
1. Config check, bail on error and inform user
2. Back up `/etc/firewalld/direct.xml` to temporary location
3. Write new `/etc/firewalld/direct.xml` content
4. Perform config check, bail on error and restore backed up `/etc/firewalld/direct.xml`
5. Delete backed up `/etc/firewalld/direct.xml`
6. Restart `firewalld` systemd service unit
* If `do_config_check` is `false`:
1. Write new `/etc/firewalld/direct.xml` content
2. Restart `firewalld` systemd service unit
### Locals
## Locals
A custom `[section]` has the following options. We're calling the locals all of which are optional.
A custom `[section]` has the following options. We're calling them locals most of which are optional.
* `target`, __*optional*__, defaults to `ACCEPT`: A string specifying the fate of a packet that matched this rule. By default matching packets are accepted. See "TARGETS" section in [iptables man page](https://ipset.netfilter.org/iptables.man.html). You'll most likely want to stick to either `ACCEPT` or `DROP`.
* `target`, __*mandatory*__, defaults to `ACCEPT`, can be any valid `iptables` target. Must not be empty nor unset. A string specifying the fate of a packet that matched this rule. See "TARGETS" section in [iptables man page](https://ipset.netfilter.org/iptables.man.html). You're most likely going to want to stick to either `ACCEPT` or `DROP`. By default matching packets are accepted. We do not do our own validation of what you write here. By default (see [Globals](#globals)) `do_config_check` equals to true in which case we let `firewalld` do a config check to catch nonsense rules.
* `addr`, __*optional*__: A comma-separated list of any combination of IPv4 addresses, IPv6 addresses and domain names. When `update-firewall-source.py` constructs `firewalld` rules these addresses are allowed to access the server. If left undefined `addr` defaults to an empty list meaning rules apply to any arbitrary source address.
* `addr`, __*optional*__, defaults to an empty string: A comma-separated list of any combination of IPv4 addresses, IPv6 addresses and domain names. When `update-firewall-source.py` constructs `firewalld` rules these addresses are allowed to access the server. If left undefined `addr` defaults to an empty list meaning rules apply to any and all source address.
Subnets are unsupported, both as subnet masks (`142.251.36.195/255.255.255.248`) and in [CIDR](https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing) notation (`142.251.36.195/29`). Do not single- nor double-quote list entries. Do feel free to separate entries with comma-space instead of just a comma.
* `ports`, __*optional*__, defaults to `80, 443`: A comma-separated list of ports that should be accessible from `addr`. If overwritten to an empty option `addr` may access all ports.
* `ports`, __*optional*__, defaults to `80, 443`: A comma-separated list of ports that should be accessible from `addr`. If empty `addr` may access all ports. See [iptables-extensions man page, section "multiport"](https://ipset.netfilter.org/iptables-extensions.man.html#lbBM) for syntax reference. All port-based rules use `iptables ... --match multiport` even if you're only allowing access to a single port. In essence construct your ports list with any combination of single ports (`80, 443, 8080`) and port ranges (`6660:7000, 61000:65535`).
* `proto`, __*optional*__, defaults to `tcp`: A protocol that should be allowed for `addr` on `ports`. If undefined this defaults to `tcp` being allowed. Since `firewalld` direct rules use `iptables` syntax the list of possible protocol names is largely identical to what the [iptables man page](https://ipset.netfilter.org/iptables.man.html) says about its `--protocol` argument:
```
# Valid example:
ports = 80, 443, 6660:7000, 8080
```
* `proto`, __*optional*__, defaults to `tcp`: A singular protocol that should be allowed for `addr` on `ports`. Can be set to an empty value in which case all protocols are allowed. Since `firewalld` direct rules use `iptables` syntax the list of possible protocol names is largely identical to what the [iptables man page](https://ipset.netfilter.org/iptables.man.html) says about its `--protocol` argument:
> The specified protocol can be one of `tcp`, `udp`, `udplite`, `icmp`, `icmpv6`, `esp`, `ah`, `sctp`, `mh` or the special keyword `all`, or it can be a numeric value, representing one of these protocols or a different one. A protocol name from `/etc/protocols` is also allowed. A `!` argument before the protocol inverts the test. The number zero is equivalent to `all`.
Your mileage may vary depending on which specific OS flavor and version you're running.
**Implementation details:**
* `proto` is treated as a string, not a list. To for example allow access via both TCP and UDP create two `[sections]` like so:
```
[tcp-rule]
addr = 1.1.1.1
ports =
proto = tcp
[udp-rule]
addr = 1.1.1.1
ports =
proto = udp
```
Since `proto = tcp` is default you can leave it out of the top section. Side note, in this specific example you would want to set the `[DEFAULT]` value `ports =` instead of repeating it in each `[section]`. Alternatively set `proto =` to allow all protocols in which case a single `[section]` is enough to cover that use case:
```
[permit-port-53-via-all-protocols]
addr = 1.1.1.1
ports =
proto =
```
Make sure that when `proto` is unset you also unset `ports`. See next bullet point for details on that.
* Unsetting `proto` while at the same time leaving at least one `ports` value in place (which is the default with `ports = 80, 443`) is an error.
It will result in a rule that `firewalld` cannot load into `ip(6)tables`. It will report it as such in its systemd journal visible e.g. via `journalctl -fu firewalld.service`. This is because having at least one port configured will always result in adding a `--match multiport` which is only valid when also giving a `--protocol` such as `--protocol tcp`.
```
[DEFAULT]
ports = 80, 443
proto = tcp
[valid]
addr = example.net
ports = 22, 80, 443
[invalid]
addr = example.com
proto =
# Without 'ports' there will be no '--match multiport'
# and without /that/ you can safely unset 'proto':
[also-valid]
addr = example.org
ports =
proto =
```
* `state`, __*optional*__, defaults to `NEW`: Comma-separated list of connection tracking states against which a packet is matched. Most of the time your rules will want to use the default `NEW`. The final `DROP` rule present in the example `config.ini` file at [examples/config.ini.example](examples/config.ini.example) is one occasion where you'll want to deviate and unset `state` to an empty value. See ["state" extension man page in iptables docs](https://ipset.netfilter.org/iptables-extensions.man.html#lbCC) for reference.
* `do_ipv6`, __*optional*__, defaults to `false`: Decide if you want `firewalld` to generate `ip6tables` rules in addition to `iptables` rules. A default install of Docker Engine will have its IPv6 support disabled in `/etc/docker/daemon.json` in which case `ip6tables` will not have a `DOCKER-USER` or similar Docker-related chains. In this default setup having `update-firewall-source.py` generate an otherwise unused `DOCKER-USER` chain and adding rules to it clutters your rule set. Consider setting this to `true` if and when your Docker install uses IPv6.
If this is `true` IPv6 addresses found or resolved in `addr` in a `[section]` will be discarded.
# Development
## Conventional Commits
We use [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary).
### Scopes
The following **_scopes_** are known for this project. A Conventional Commits commit message may optionally use one of the following scopes or none:
* `config`: Structure or content of a `config.ini` file
* `dbus`: Deals with functionality to restart the `firewalld.service` unit
* `systemd`: Deals with lifecycle as a systemd unit
* `meta`: Affects the project's repo layout, readme content, file names etc.
* `dns`: Resolution of DNS records
* `xml`: XML content handling for `firewalld` direct rules, includes segues into `ip(6)tables` territory
* `netdev`: Network devices
* `debug`: Deals with debuggability, concise messages to end user
### Types
The following **_types_** are known for this project in addition to Conventional Commits default types `fix` and `feat`. A Conventional Commits commit message must use either one of the two default types or optionally a type from this list:
* `build`: Project structure, directory layout, build instructions for roll-out
* `refactor`: Keeping functionality while streamlining or otherwise improving function flow
* `test`: Working on test coverage
* `docs`: Documentation for project or components

View File

@ -3,19 +3,24 @@ target = ACCEPT
addr =
ports = 80, 443
proto = tcp
do_config_check = true
state = NEW
do_ipv6 = false
firewalld_direct_file_abs = /etc/firewalld/direct.xml
restart_firewalld_after_change = true
[anyone-can-access-website]
[these-guys-can-dns]
addr = google.li, 142.251.36.195, lowendbox.com, 2606:4700:20::ac43:4775
ports = 53
proto = tcp, udp
# Unsetting 'proto' while having a 'ports' value results in an invalid section
# [these-guys-can-dns]
# addr = google.li, 142.251.36.195, lowendbox.com, 2606:4700:20::ac43:4775
# ports = 53
# proto =
# do_ipv6 = true
[maybe-a-webserver]
addr = 2606:4700:20::681a:804, lowendtalk.com
ports = 80, 443
do_ipv6 = true
[allow-anyone-to-access-mail-services]
ports = 143, 993, 110, 995, 25, 465, 587
@ -25,3 +30,5 @@ target = DROP
addr =
ports =
proto =
state =
do_ipv6 = true

View File

@ -0,0 +1,12 @@
[Unit]
Description=firewalld direct rules generator
After=multi-user.target
[Service]
Type=oneshot
RemainAfterExit=no
Environment='PATH=/usr/local/sbin:/usr/local/bin:/usr/bin' 'UFS_LOGLEVEL=INFO'
ExecStart=/opt/miniconda3/envs/update-firewall-source/bin/python /opt/python/update-firewall-source/dev/update-firewall-source.py
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,13 @@
[Unit]
Description=Run firewalld direct rules generator
[Timer]
OnCalendar=
OnCalendar=4,5,6:00,15,30,45 Asia/Shanghai
OnCalendar=1,10,14,18,22:00 Asia/Shanghai
OnBootSec=5min
RandomizedDelaySec=2min
Persistent=true
[Install]
WantedBy=timers.target

View File

@ -2,3 +2,6 @@ lxml
rich
validators
dnspython
inflect
cogapp
dbus-python

View File

@ -4,13 +4,19 @@
#
# pip-compile
#
cogapp==3.3.0
# via -r requirements.in
commonmark==0.9.1
# via rich
dbus-python==1.2.18
# via -r requirements.in
decorator==5.1.1
# via validators
dnspython==2.2.1
# via -r requirements.in
lxml==4.9.0
inflect==5.6.0
# via -r requirements.in
lxml==4.9.1
# via -r requirements.in
pygments==2.12.0
# via rich

View File

@ -16,11 +16,29 @@ import validators
# Build XML structure
import lxml.etree
import lxml.builder
# Correctly generate plurals, singular nouns etc.
import inflect
# Restart firewalld systemd service unit
import dbus
# Find physical network interface via 'find' command
import subprocess
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 1 : Config file invalid, it has no sections
# 2 : Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3 : Performing a firewalld rules check failed
# 4 : Performing a firewalld rules encountered a FileNotFoundError
# 5 : Unable to open firewalld direct rules file
# 6 : Source and destination are identical when attempting to back up firewalld direct rules file
# 7 : An option that must have a non-null value is either unset or null
# 8 : Exception while adding a chain XML element to firewalld direct rules
# 9 : Unable to open firewalld direct rules file for updating
# 10: Unable to restart systemd firewalld.service unit
# 11: Unable to add a <rule/> tag to firewalld
# 12: Kernel sysfs export for network devices at "/sys/class/net" doesn't exist
# 13: Linux find command exited non-zero trying to find a physical network device at "/sys/class/net"
# 14: No physical network device found at "/sys/class/net"
class CONST(object):
@ -33,22 +51,29 @@ class CONST(object):
# Values you don't have to set, these are their internal defaults. You may optionally add a key 'is_global' equal
# to either True or False. By default if left off it'll be assumed False. Script will treat values where
# 'is_global' equals True as not being overridable in a '[section]'. It's a setting that only makes sense in a
# global context for the entire script.
# global context for the entire script. An option where 'empty_ok' equals True can safely be unset or set to
# an empty string. An example config.ini file may give a sane config example value here, removing that value
# still results in a valid file.
CFG_KNOWN_DEFAULTS = [
{"key": "target", "value": "ACCEPT", "is_global": False},
{"key": "addr", "value": "", "is_global": False},
{"key": "ports", "value": "80, 443", "is_global": False},
{"key": "proto", "value": "tcp", "is_global": False},
{"key": "do_config_check", "value": "true", "is_global": True},
{"key": "restart_firewalld_after_change", "value": "true", "is_global": True}
{"key": "target", "value": "ACCEPT", "is_global": False, "empty_ok": False},
{"key": "addr", "value": "", "is_global": False, "empty_ok": True},
{"key": "ports", "value": "80, 443", "is_global": False, "empty_ok": True},
{"key": "proto", "value": "tcp", "is_global": False, "empty_ok": True},
{"key": "state", "value": "NEW", "is_global": False, "empty_ok": True},
{"key": "do_ipv6", "value": "false", "is_global": False, "empty_ok": False},
{"key": "firewalld_direct_abs", "value": "/etc/firewalld/direct.xml", "is_global": True, "empty_ok": False},
{"key": "restart_firewalld_after_change", "value": "true", "is_global": True, "empty_ok": False}
]
# In all sections other than 'default' the following settings are known and accepted. We silently ignore other
# settings. We use 'is_mandatory' to determine if we have to raise errors on missing settings.
# In all sections other than 'default' the following settings are known and accepted. We ignore other settings.
# Per CFG_KNOWN_DEFAULTS above most '[DEFAULT]' options are accepted by virtue of being defaults and overridable.
# The only exception are options where "is_global" equals True, they can't be overridden in '[sections]'; any
# attempt at doing it anyway will be ignored. The main purpose of this list is to name settings that do not have
# a default value but can - if set - influence how a '[section]' behaves. Repeating a '[DEFAULT]' here does not
# make sense. We use 'is_mandatory' to determine if we have to raise errors on missing settings. Here
# 'is_mandatory' means the setting must be given in a '[section]'. It may be empty.
CFG_KNOWN_SECTION = [
{"key": "target", "is_mandatory": False},
{"key": "addr", "is_mandatory": False},
{"key": "ports", "is_mandatory": False},
{"key": "proto", "is_mandatory": False}
# {"key": "an_option", "is_mandatory": True},
# {"key": "another_one", "is_mandatory": False}
]
CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
@ -59,12 +84,19 @@ logging.basicConfig(
format=CONST.LOG_FORMAT,
datefmt="[%X]",
handlers=[RichHandler(
rich_tracebacks=True
show_time=False if any([systemd_env_var in os.environ for systemd_env_var in [
"SYSTEMD_EXEC_PID",
"INVOCATION_ID"]]) else True,
rich_tracebacks=True,
show_path=False,
show_level=False
)]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)
log.setLevel(os.environ.get("UFS_LOGLEVEL") if "UFS_LOGLEVEL" in [k for k, v in os.environ.items()] else logging.INFO)
p = inflect.engine()
# Use this version of class ConfigParser to log.debug contents of our config file. When parsing sections other than
@ -86,15 +118,12 @@ class ConfigParser(
return super().options(section)
# arg_allow_list = ["77.13.129.237", "2a0b:7080:20::1:f485", "home.seneve.de", "208.87.98.188", "outlook.com",
# "uberspace.de"]
ini_defaults = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
internal_globals = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default["is_global"]]
internal_empty_ok = [default["key"] for default in CONST.CFG_KNOWN_DEFAULTS if default["empty_ok"]]
config = ConfigParser(defaults=internal_defaults,
converters={'list': lambda x: [i.strip() for i in x.split(',')]})
converters={'list': lambda x: [i.strip() for i in x.split(',') if len(x) > 0]})
config.read(CONST.CFG_DEFAULT_ABS_PATH)
@ -153,13 +182,41 @@ def is_same_as_default(
return config_kv_pair in ini_defaults
def we_have_unset_options(
config_obj: configparser.ConfigParser(),
section_name: str) -> list:
options_must_be_non_empty = []
for option in config_obj.options(section_name):
if not config_obj.get(section_name, option):
if option not in internal_empty_ok:
log.warning(f"In section '[{section_name}]' option '{option}' is empty, it mustn't be.")
options_must_be_non_empty.append(option)
return options_must_be_non_empty
def validate_config_sections(
config_obj: configparser.ConfigParser()) -> None:
for this_section in config_obj.sections():
log.debug(print_section_header(this_section))
unset_options = we_have_unset_options(config_obj, this_section)
if unset_options:
log.error(f"""{p.plural("Option", len(unset_options))} {unset_options} """
f"""{p.plural("is", len(unset_options))} unset. """
f"""{p.singular_noun("They", len(unset_options))} """
f"must have a non-null value. "
f"""{p.plural("Default", len(unset_options))} {p.plural("is", len(unset_options))}:""")
for unset_option in unset_options:
log.error(f"{unset_option} = {internal_defaults[unset_option]}")
log.error(f"Exiting 7 ...")
sys.exit(7)
if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)):
log.debug(f"Config section '[{this_section}]' does not have all mandatory options "
f"{CONST.CFG_MANDATORY} set, skipping section ...")
log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
f"{CONST.CFG_MANDATORY} set, skipping section ...")
config_obj.remove_section(this_section)
else:
for key in config_obj.options(this_section, no_defaults=True):
@ -177,35 +234,177 @@ def validate_config_sections(
config_obj.remove_option(this_section, key)
def gen_fw_rule_xml(ip_addresses: dict[str, list]) -> lxml.builder.ElementMaker:
len_ipv4_addresses = len(ip_addresses["ipv4"])
len_ipv6_addresses = len(ip_addresses["ipv6"])
data = lxml.builder.ElementMaker()
def has_child_elem(elem_name: str, attr_value: str) -> bool:
global arg_fw_rule_data
attr_name = "ipv"
direct_tag = data.direct
chain_tag = data.chain
rule_tag = data.rule
fw_rule_data = direct_tag(
chain_tag(ipv="ipv4", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 208.87.98.188 -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority="0"),
chain_tag(ipv="ipv6", table="filter", chain="DOCKER-USER"),
# rule_tag("-s 2a0b:7080:20::1:f485 -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority="0")
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv4", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv4"])),
*(rule_tag(f"-s {addr} -j DROP", ipv=f"ipv6", table=f"filter", chain="DOCKER-USER", priority=f"{count}")
for count, addr in enumerate(ip_addresses["ipv6"])),
rule_tag(f"-s -j DROP", ipv="ipv4", table="filter", chain="DOCKER-USER", priority=f"{len_ipv4_addresses}"),
rule_tag(f"-s -j DROP", ipv="ipv6", table="filter", chain="DOCKER-USER", priority=f"{len_ipv6_addresses}")
)
for elem in arg_fw_rule_data.findall(elem_name):
if elem.attrib[attr_name] == attr_value:
log.debug(f"""XML has element '<{elem_name} {attr_name}="{attr_value}" .../>'""")
return True
log.debug(f"""No XML element '<{elem_name} {attr_name}="{attr_value}" .../>'""")
return False
# fw_rule_data_str = lxml.etree.tostring(
# fw_rule_data,
# pretty_print=True,
# xml_declaration=True,
# encoding="UTF-8").decode()
# log.debug(f"{fw_rule_data_str}")
return fw_rule_data
def add_chain_elem(elem_name: str, addr_family: str) -> bool:
global arg_fw_rule_data
log.debug(f"Adding new ...")
for chain in ["FILTERS", "DOCKER-USER"]:
try:
lxml.etree.SubElement(arg_fw_rule_data, elem_name,
ipv=f"{addr_family}",
table="filter",
chain=chain)
except lxml.etree.LxmlError as le:
log.error(f"""Failed to add XML '<{elem_name} ipv=f"{addr_family}" .../>'\n"""
f"Verbatim exception was:\n"
f"f{le}\n"
f"Exiting 8 ...")
sys.exit(8)
return True
def rules_count(
arg_ipv: str = "ipv4",
arg_chain: str = "FILTERS") -> int:
arg_rules_count = len([rule for rule in arg_fw_rule_data.findall("rule") if all([
rule.attrib["ipv"] == arg_ipv if arg_ipv else False,
rule.attrib["chain"] == arg_chain if arg_chain else False])])
log.debug(f"""Counted {arg_rules_count} {p.plural("rule", arg_rules_count)} matching """
f"""{"ipv=" + arg_ipv + " " if arg_ipv else ""}"""
f"""{"chain=" + arg_chain + " " if arg_chain else ""}""")
return arg_rules_count
def add_rule_elem(
address_family: str,
prio: int,
target: str,
/, *,
arg_section_name: str = None,
arg_proto: str = None,
arg_state: str = None,
arg_ports: list = None,
arg_address: str = None,
arg_chain: str = "FILTERS",
arg_in_interface: str = None) -> bool:
global arg_fw_rule_data
try:
lxml.etree.SubElement(arg_fw_rule_data, "rule",
ipv=f"{address_family}",
table=f"filter",
chain=arg_chain,
priority=f"""{prio}""").text = \
f"""{"--in-interface " + arg_in_interface + " " if arg_in_interface else ""}""" \
f"""{"--protocol " + arg_proto + " " if arg_proto else ""}""" \
f"""{"--match state --state " + arg_state + " " if arg_state else ""}""" \
f"""{"--match multiport --destination-ports " + ",".join(arg_ports) + " " if arg_ports else ""}""" \
f"""{"--source " + arg_address + " " if arg_address else ""}""" \
f"""--jump {target}""" \
f"""
{" --match comment --comment " + chr(34) + arg_section_name[:256] + chr(34) if arg_section_name else ""}"""
except lxml.etree.LxmlError as le:
log.error(f"""Failed to add XML '<rule ipv=f"{address_family}" .../>'\n"""
f"Verbatim exception was:\n"
f"f{le}\n"
f"Exiting 8 ...")
sys.exit(8)
else:
return True
def get_phy_nics() -> list:
phy_nics = []
linux_sysfs_nics_abs = "/sys/class/net"
find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-not", "-lname", "*virtual*"]
# find_phy_nics = ["find", linux_sysfs_nics_abs, "-mindepth", "1", "-maxdepth", "1", "-lname", "*virtual*"]
if os.path.isdir(linux_sysfs_nics_abs):
try:
phy_nics_find = subprocess.run(find_phy_nics,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
encoding="UTF-8")
except subprocess.CalledProcessError as cpe:
log.error(f"Failed to find physical network device in {linux_sysfs_nics_abs!r}.\n"
f"Command was:\n"
f"{cpe.cmd}\n"
f"Verbatim command output was:\n"
f"{cpe.output.rstrip()}\n"
f"Exiting 13 ...")
sys.exit(13)
else:
if not phy_nics_find.stdout:
log.error(f"No physical network device found at {linux_sysfs_nics_abs!r}.\n"
f"Command was:\n"
f"{phy_nics_find.args}\n"
f"Exiting 14 ...")
sys.exit(14)
for line in phy_nics_find.stdout.rstrip().split("\n"):
log.debug(f"Found physical network device {(phy_nic := os.path.basename(line))!r}")
phy_nics.append(phy_nic)
else:
log.error(f"Path {linux_sysfs_nics_abs!r} does not exist. This might not be a Linux-y operating system. "
f"Without that location we'll not be able to separate physical network interfaces from virtual ones. "
f"Exiting 12 ...")
sys.exit(12)
log.debug(f"List of identified physical network interfaces: {phy_nics}")
return phy_nics
def add_fw_rule_to_xml(
config_obj: configparser.ConfigParser(),
section_name: str,
target: str,
ports: list,
proto: str) -> bool:
global arg_fw_rule_data
global arg_allow_sources
addr = arg_allow_sources
rules_already_added = {"ipv4": rules_count(arg_ipv="ipv4") + 1, "ipv6": rules_count(arg_ipv="ipv6") + 1}
log.debug(f"Current rules count: {rules_already_added}")
for address_family in ["ipv4", "ipv6"]:
if len(addr[address_family]):
if not has_child_elem("chain", address_family):
add_chain_elem("chain", address_family)
for address in addr[address_family]:
add_rule_elem(
address_family,
rules_already_added[address_family],
target,
arg_section_name=section_name,
arg_proto=proto,
arg_state=config_obj.get(section_name, "state"),
arg_ports=ports,
arg_address=address)
rules_already_added[address_family] += 1
if not len(addr["ipv4"]) and not len(addr["ipv6"]):
if address_family == "ipv4" or (address_family == "ipv6"
and
config_obj.getboolean(section_name, "do_ipv6")):
if not has_child_elem("chain", address_family):
add_chain_elem("chain", address_family)
add_rule_elem(
address_family,
rules_already_added[address_family],
target,
arg_section_name=section_name,
arg_proto=proto,
arg_state=config_obj.get(section_name, "state"),
arg_ports=ports)
rules_already_added[address_family] += 1
return True
def resolve_domain(domain: str) -> list[str]:
@ -224,36 +423,134 @@ def resolve_domain(domain: str) -> list[str]:
dns_records = []
[dns_records.append(dns_record.address) for dns_record in a_records if a_records]
[dns_records.append(dns_record.address) for dns_record in aaaa_records if aaaa_records]
log.debug(f"Found records: {dns_records}")
log.info(f"""For {domain!r} found {p.plural("record", len(dns_records))}: {dns_records}""")
return dns_records
def resolve_addresses(allow_list_mixed: list[str]) -> dict[str, list]:
allow_sources = {"ipv4": [], "ipv6": []}
def resolve_addresses(
config_obj: configparser.ConfigParser(),
section_name: str,
allow_list_mixed: list[str]) -> dict[str, list]:
global arg_allow_sources
allow_list_ip_only = []
log.info(f"""Verifying {p.plural("address", len(allow_list_mixed))} {allow_list_mixed!r} ...""")
for allow_source in allow_list_mixed:
log.debug(f"Checking if '{allow_source}' is a domain ...")
if validators.domain(allow_source):
log.debug(f"'{allow_source}' is a domain.")
[allow_list_ip_only.append(addr) for addr in resolve_domain(allow_source)]
else:
log.debug(f"'{allow_source}' is not a domain.")
allow_list_ip_only.append(allow_source)
for allow_source in allow_list_ip_only:
try:
ipv4_addr = str(ipaddress.IPv4Address(allow_source))
log.debug(f"Adding IPv4 address '{allow_source}' ...")
allow_sources["ipv4"].append(ipv4_addr)
log.info(f"Adding IPv4 address '{allow_source}' ...")
arg_allow_sources["ipv4"].append(ipv4_addr)
except ipaddress.AddressValueError:
log.debug(f"Address '{allow_source}' is not a valid IPv4 address. Trying to match against IPv6 ...")
log.debug(f"Address '{allow_source}' is not a valid IPv4 address.")
if not config_obj.getboolean(section_name, "do_ipv6"):
log.info(f"For section '[{section_name}]' option 'do_ipv6' equals false. "
f"Skipping IPv6 handling of '{allow_source}' ...")
continue
try:
ipv6_addr = str(ipaddress.IPv6Address(allow_source))
log.debug(f"Adding IPv6 address '{allow_source}' ...")
allow_sources["ipv6"].append(ipv6_addr)
except ipaddress.AddressValueError:
log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
log.debug(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
else:
log.info(f"Adding IPv6 address '{allow_source}' ...")
arg_allow_sources["ipv6"].append(ipv6_addr)
return allow_sources
return arg_allow_sources
def gen_fwd_direct_scaffolding() -> lxml.builder.ElementMaker:
data = lxml.builder.ElementMaker()
direct_tag = data.direct
fw_rule_data = direct_tag()
return fw_rule_data
def write_new_fwd_direct_xml(
config_obj: configparser.ConfigParser()) -> bool:
global arg_fw_rule_data
fwd_direct_xml_str = lxml.etree.tostring(arg_fw_rule_data,
pretty_print=True,
encoding="UTF-8",
xml_declaration=True).decode()
try:
with open(config_obj.get(configparser.DEFAULTSECT, "firewalld_direct_abs"), "r+") as fwd_file_handle:
log.info(f"Writing new firewalld direct config ...")
log.debug(f"New content:\n"
f"{fwd_direct_xml_str.rstrip()}")
fwd_file_handle.seek(0)
fwd_file_handle.write(fwd_direct_xml_str)
fwd_file_handle.truncate()
except OSError as ose:
log.error(f"Unable to open firewalld direct rules file for updating.\n"
f"Verbatim exception was:\n"
f"f{ose}\n"
f"Exiting 9 ...")
sys.exit(9)
else:
return True
def restart_systemd_firewalld() -> bool:
sysbus = dbus.SystemBus()
systemd1 = sysbus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
manager = dbus.Interface(systemd1, 'org.freedesktop.systemd1.Manager')
firewalld_unit = manager.LoadUnit('firewalld.service')
firewalld_proxy = sysbus.get_object('org.freedesktop.systemd1', str(firewalld_unit))
firewalld_active_state = firewalld_proxy.Get('org.freedesktop.systemd1.Unit',
'ActiveState',
dbus_interface='org.freedesktop.DBus.Properties')
if firewalld_active_state == "inactive":
log.info(f"systemd firewalld.service unit is inactive, ignoring restart instruction, leaving as-is ...")
return False
try:
log.info(f"Restarting systemd firewalld.service unit ...")
manager.TryRestartUnit('firewalld.service', 'fail')
except dbus.exceptions.DBusException as dbe:
log.error(f"Failed to restart systemd firewalld.service unit.\n"
f"Verbatim exception was:\n"
f"{dbe}\n"
f"You're going to want to check firewalld.service health.\n"
f"Exiting 10 ...")
sys.exit(10)
else:
log.info(f"Done")
return True
def add_firewall_shim(arg_phy_nics: list) -> None:
global arg_fw_rule_data
log.debug(f"Adding ip(6)tables jump target to DOCKER-USER chain ...")
for addr_family in ["ipv4", "ipv6"]:
for phy_nic in arg_phy_nics:
if has_child_elem("chain", addr_family):
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain="INPUT"),
"ACCEPT",
arg_chain="INPUT",
arg_in_interface="lo"
)
for chain in ["INPUT", "DOCKER-USER"]:
add_rule_elem(
addr_family,
rules_count(addr_family, arg_chain=chain),
"FILTERS",
arg_chain=chain,
arg_in_interface=phy_nic if chain == "DOCKER-USER" else None
)
if __name__ == '__main__':
@ -261,14 +558,37 @@ if __name__ == '__main__':
if config_has_valid_section(config):
validate_config_sections(config)
else:
log.debug(f"No valid config section found. A valid config section has at least the mandatory options "
log.error(f"No valid config section found. A valid config section has at least the mandatory options "
f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
sys.exit(2)
arg_fw_rule_data = gen_fwd_direct_scaffolding()
log.debug(f"Iterating over config sections ...")
for section in config.sections():
log.debug(f"Processing section '[{section}]' ...")
log.debug(config.getlist(section, "addr"))
log.info(f"Generating rules from section '[{section}]' ...")
arg_fwd_addr = config.getlist(section, "addr")
arg_allow_sources = {"ipv4": [], "ipv6": []}
if arg_fwd_addr:
arg_allow_sources = resolve_addresses(config, section, arg_fwd_addr)
log.debug(arg_allow_sources)
else:
log.info(f"No source address given. Rules will apply to all sources.")
# arg_allow_sources = resolve_addresses(arg_allow_list)
# gen_fw_rule_xml(arg_allow_sources)
add_fw_rule_to_xml(config,
section,
target=config.get(section, "target"),
ports=config.getlist(section, "ports"),
proto=config.get(section, "proto"))
for arg_address_family in ["ipv4", "ipv6"]:
if rules_count(arg_address_family):
add_rule_elem(
arg_address_family,
0,
"ACCEPT",
arg_state="ESTABLISHED,RELATED")
add_firewall_shim(get_phy_nics())
write_new_fwd_direct_xml(config)
if config.getboolean(configparser.DEFAULTSECT, "restart_firewalld_after_change"):
restart_systemd_firewalld()