import os import logging import sys import time from rich.logging import RichHandler from rich.traceback import install import configparser import inflect from inotify_simple import INotify, flags # Exit codes # 1: Config file invalid, it has no sections # 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY class CONST(object): __slots__ = () LOG_FORMAT = "%(message)s" CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__) CFG_DEFAULT_FILENAME = "config.ini" CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME) CFG_KNOWN_DEFAULTS = [ {"key": "self_name", "value": "kodi-nfo-feeder"}, {"key": "watch_dir", "value": os.path.join(CFG_THIS_FILE_DIRNAME, "data/var/lib/%(self_name)s/watch")} ] CFG_KNOWN_SECTION = [ {"key": "watch_dir", "is_mandatory": True}, {"key": "output_dir", "is_mandatory": True} ] CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]] CONST = CONST() logging.basicConfig( # Default for all modules in NOTSET so log everything level="NOTSET", format=CONST.LOG_FORMAT, datefmt="[%X]", handlers=[RichHandler( show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True, rich_tracebacks=True )] ) log = logging.getLogger("rich") # Our own code logs with this level log.setLevel(logging.DEBUG) install(show_locals=True) class ConfigParser( configparser.ConfigParser): """Can get options() without defaults Taken from https://stackoverflow.com/a/12600066. """ def options(self, section, no_defaults=False, **kwargs): if no_defaults: try: return list(self._sections[section].keys()) except KeyError: raise configparser.NoSectionError(section) else: return super().options(section) p = inflect.engine() ini_defaults = [] internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS} config = ConfigParser(defaults=internal_defaults) config.read(CONST.CFG_DEFAULT_FILENAME) def print_section_header( header: str) -> str: return f"Loading config section '[{header}]' ..." def validate_default_section( config_obj: configparser.ConfigParser()) -> None: log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...") if not config_obj.sections(): log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...") sys.exit(1) if config.defaults(): log.debug(f"Symbol legend:\n" f"* Global default from section '[{config_obj.default_section}]'\n" f"~ Local option, doesn't exist in '[{config_obj.default_section}]'\n" f"+ Local override of a value from '[{config_obj.default_section}]'\n" f"= Local override, same value as in '[{config_obj.default_section}]'") log.debug(print_section_header(config_obj.default_section)) for default in config_obj.defaults(): ini_defaults.append({default: config_obj[config_obj.default_section][default]}) log.debug(f"* {default} = {config_obj[config_obj.default_section][default]}") else: log.debug(f"No defaults defined") def config_has_valid_section( config_obj: configparser.ConfigParser()) -> bool: has_valid_section = False for config_obj_section in config_obj.sections(): if set(CONST.CFG_MANDATORY).issubset(config_obj.options(config_obj_section)): has_valid_section = True break return has_valid_section def is_default( config_key: str) -> bool: return any(config_key in ini_default for ini_default in ini_defaults) def is_same_as_default( config_kv_pair: dict) -> bool: return config_kv_pair in ini_defaults def validate_config_sections( config_obj: configparser.ConfigParser()) -> None: for this_section in config_obj.sections(): log.debug(print_section_header(this_section)) if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)): log.warning(f"Config section '[{this_section}]' does not have all mandatory options " f"{CONST.CFG_MANDATORY} set, skipping section ...") config_obj.remove_section(this_section) else: for key in config_obj.options(this_section, no_defaults=True): kv_prefix = "~" if is_default(key): kv_prefix = "+" if is_same_as_default({key: config_obj[this_section][key]}): kv_prefix = "=" log.debug(f"{kv_prefix} {key} = {config_obj[this_section][key]}") def setup_watch( watch_this: str) -> INotify: if not os.path.exists(watch_this): os.makedirs(watch_this, exist_ok=False) inotify = INotify() watch_flags = flags.MOVED_TO try: inotify.add_watch(watch_this, watch_flags) log.debug(f"Watching for files moved to '{watch_this}' ...") except FileNotFoundError: log.error(f"Watch directory '{watch_this}' does not exist. Please create it. Exiting 3 ...") sys.exit(4) else: return inotify if __name__ == '__main__': validate_default_section(config) if config_has_valid_section(config): validate_config_sections(config) else: log.error(f"No valid config section found. A valid config section has at least the mandatory " f"""{p.plural("option", len(CONST.CFG_MANDATORY))} """ f"{CONST.CFG_MANDATORY} set. Exiting 1 ...") sys.exit(1) log.debug(f"Iterating over config sections ...") for section in config.sections(): log.debug(f"Processing section '[{section}]' ...") watch_dir = config.get(section, "watch_dir") inotify_watch = setup_watch(watch_dir) while True: time.sleep(0.2) for event in inotify_watch.read(): events = [str(flags) for flags in flags.from_mask(event.mask)] if "flags.MOVED_TO" in events: file_name = event.name log.info(f"File '{file_name}' was moved to watch directory '{watch_dir}', processing ...") # is_xauth = xauth_pattern.findall(event.name) #if is_xauth and "flags.CREATE" in events: # log.info(f"New .Xauthority file '{event.name}' detected") # react_to_cec = True # xauth_abs_path = xauth_dir + "/" + event.name # set_xauth_env(xauth_abs_path) #elif is_xauth and "flags.DELETE" in events: # log.info(f".Xauthority file '{event.name}' was deleted") # react_to_cec = False