refactor(dns): Give more concise output when resolving DNS records
This commit is contained in:
parent
719ee22276
commit
caf7ad64d1
@ -426,30 +426,50 @@ def resolve_domain(domain: str) -> list[str]:
|
|||||||
return dns_records
|
return dns_records
|
||||||
|
|
||||||
|
|
||||||
def resolve_addresses(allow_list_mixed: list[str]) -> dict[str, list]:
|
def resolve_addresses(
|
||||||
allow_sources = {"ipv4": [], "ipv6": []}
|
config_obj: configparser.ConfigParser(),
|
||||||
|
section_name: str,
|
||||||
|
allow_list_mixed: list[str]) -> dict[str, list]:
|
||||||
|
global arg_allow_sources
|
||||||
allow_list_ip_only = []
|
allow_list_ip_only = []
|
||||||
|
|
||||||
|
log.info(f"""Verifying {p.plural("address", len(allow_list_mixed))} {allow_list_mixed!r} ...""")
|
||||||
for allow_source in allow_list_mixed:
|
for allow_source in allow_list_mixed:
|
||||||
|
log.debug(f"Checking if '{allow_source}' is a domain ...")
|
||||||
if validators.domain(allow_source):
|
if validators.domain(allow_source):
|
||||||
log.debug(f"'{allow_source}' is a domain.")
|
log.debug(f"'{allow_source}' is a domain.")
|
||||||
[allow_list_ip_only.append(addr) for addr in resolve_domain(allow_source)]
|
[allow_list_ip_only.append(addr) for addr in resolve_domain(allow_source)]
|
||||||
else:
|
else:
|
||||||
|
log.debug(f"'{allow_source}' is not a domain.")
|
||||||
allow_list_ip_only.append(allow_source)
|
allow_list_ip_only.append(allow_source)
|
||||||
|
|
||||||
for allow_source in allow_list_ip_only:
|
for allow_source in allow_list_ip_only:
|
||||||
try:
|
try:
|
||||||
ipv4_addr = str(ipaddress.IPv4Address(allow_source))
|
ipv4_addr = str(ipaddress.IPv4Address(allow_source))
|
||||||
log.debug(f"Adding IPv4 address '{allow_source}' ...")
|
log.info(f"Adding IPv4 address '{allow_source}' ...")
|
||||||
allow_sources["ipv4"].append(ipv4_addr)
|
arg_allow_sources["ipv4"].append(ipv4_addr)
|
||||||
except ipaddress.AddressValueError:
|
except ipaddress.AddressValueError:
|
||||||
log.debug(f"Address '{allow_source}' is not a valid IPv4 address. Trying to match against IPv6 ...")
|
log.debug(f"Address '{allow_source}' is not a valid IPv4 address.")
|
||||||
|
if not config_obj.getboolean(section_name, "do_ipv6"):
|
||||||
|
log.info(f"For section '[{section_name}]' option 'do_ipv6' equals false. "
|
||||||
|
f"Skipping IPv6 handling of '{allow_source}' ...")
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
ipv6_addr = str(ipaddress.IPv6Address(allow_source))
|
ipv6_addr = str(ipaddress.IPv6Address(allow_source))
|
||||||
log.debug(f"Adding IPv6 address '{allow_source}' ...")
|
|
||||||
allow_sources["ipv6"].append(ipv6_addr)
|
|
||||||
except ipaddress.AddressValueError:
|
except ipaddress.AddressValueError:
|
||||||
log.warning(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
|
log.debug(f"Address '{allow_source}' is not a valid IPv6 address either. Ignoring ...")
|
||||||
|
else:
|
||||||
|
log.info(f"Adding IPv6 address '{allow_source}' ...")
|
||||||
|
arg_allow_sources["ipv6"].append(ipv6_addr)
|
||||||
|
|
||||||
|
return arg_allow_sources
|
||||||
|
|
||||||
|
|
||||||
|
def gen_fwd_direct_scaffolding() -> lxml.builder.ElementMaker:
|
||||||
|
data = lxml.builder.ElementMaker()
|
||||||
|
direct_tag = data.direct
|
||||||
|
fw_rule_data = direct_tag()
|
||||||
|
return fw_rule_data
|
||||||
|
|
||||||
|
|
||||||
def write_new_fwd_direct_xml(
|
def write_new_fwd_direct_xml(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user