"""Watch directories for moved-in media files, file them away and write Kodi NFOs.

For every config section a watch directory is registered with inotify. When a
file is moved into a watch directory its name is parsed into a
season-and-episode string and a title, the file is moved into the section's
output directory, an ``.nfo`` file is written next to it and, optionally, a
post-move command is run and a Kodi video library scan is triggered.
"""

import configparser
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import time

import inflect
import lxml.builder
import lxml.etree
import requests
from inotify_simple import INotify, flags
from requests.auth import HTTPBasicAuth
from rich.logging import RichHandler
from rich.traceback import install

# TODO Process files that already exist on start

# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3: Watch directory does not exist and unable to create
# 4: Unable to create output directory


class CONST(object):
    """Read-only container for module constants (replaced by an instance below)."""

    __slots__ = ()
    LOG_FORMAT = "%(message)s"
    CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
    CFG_DEFAULT_FILENAME = "config.ini"
    CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
    # Built-in defaults handed to the config parser as its default section.
    CFG_KNOWN_DEFAULTS = [
        {"key": "self_name", "value": "kodi-nfo-feeder"},
        {"key": "ignored_target_file_exts", "value": ".jpg, .jpeg, .png, .nfo"},
        {"key": "title_regex_search", "value": ""},
        {"key": "title_regex_replace", "value": ""},
        {"key": "do_seasons", "value": "yes"}
    ]
    # Options a regular config section may carry.
    CFG_KNOWN_SECTION = [
        {"key": "watch_dir", "is_mandatory": True},
        {"key": "output_dir", "is_mandatory": True},
        {"key": "title_regex_search", "is_mandatory": False},
        {"key": "title_regex_replace", "is_mandatory": False},
        {"key": "do_seasons", "is_mandatory": False},
        {"key": "run_cmd", "is_mandatory": False},
        {"key": "kodi_jsonrpc_address", "is_mandatory": False},
        {"key": "kodi_jsonrpc_username", "is_mandatory": False},
        {"key": "kodi_jsonrpc_password", "is_mandatory": False}
    ]
    CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]


# With empty __slots__ the instance rejects attribute assignment.
CONST = CONST()

logging.basicConfig(
    # Default for all modules in NOTSET so log everything
    level="NOTSET",
    format=CONST.LOG_FORMAT,
    datefmt="[%X]",
    handlers=[RichHandler(
        # No timestamps when SYSTEMD_EXEC_PID is present in the environment
        show_time="SYSTEMD_EXEC_PID" not in os.environ,
        rich_tracebacks=True
    )]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
install(show_locals=True)


class ConfigParser(
        configparser.ConfigParser):
    """Can get options() without defaults

    Taken from https://stackoverflow.com/a/12600066.
    """

    def options(self, section, no_defaults=False, **kwargs):
        """Return the option names of ``section``.

        With ``no_defaults=True`` only options set in the section itself are
        returned; otherwise the stock behavior applies.

        Raises:
            configparser.NoSectionError: ``section`` does not exist.
        """
        if not no_defaults:
            return super().options(section)
        try:
            return list(self._sections[section].keys())
        except KeyError:
            raise configparser.NoSectionError(section) from None


p = inflect.engine()
# One single-item dict per default option, filled by validate_default_section().
ini_defaults = []
# Watch descriptor -> {"watch_dir", "output_dir", "section"}, filled by setup_watch().
# Defined at module level so setup_watch() also works when this module is imported.
wds = {}
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
config = ConfigParser(defaults=internal_defaults)
config.read(CONST.CFG_DEFAULT_ABS_PATH)
ignored_target_file_exts_str = config.get(config.default_section, "ignored_target_file_exts")
# Drop empty fragments: an empty string suffix would match every file name
# and therefore hide all files from get_target_file_list().
ignored_target_file_exts = [
    ext for ext in re.split(r""",\s?|\s""", ignored_target_file_exts_str) if ext
]


def print_section_header(
        header: str) -> str:
    """Return the log line announcing that config section ``header`` is being loaded."""
    return f"Loading config section '[{header}]' ..."


def validate_default_section(
        config_obj: configparser.ConfigParser) -> None:
    """Log the default section of ``config_obj`` and record it in ``ini_defaults``.

    Exits the process with code 1 when ``config_obj`` has no sections at all.
    """
    log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
    if not config_obj.sections():
        log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
        sys.exit(1)
    # Inspect the parser that was handed in, not the module-level one.
    defaults = config_obj.defaults()
    if not defaults:
        log.debug("No defaults defined")
        return
    default_section = config_obj.default_section
    log.debug(f"Symbol legend:\n"
              f"* Global default from section '[{default_section}]'\n"
              f"~ Local option, doesn't exist in '[{default_section}]'\n"
              f"+ Local override of a value from '[{default_section}]'\n"
              f"= Local override, same value as in '[{default_section}]'")
    log.debug(print_section_header(default_section))
    for default in defaults:
        default_value = config_obj[default_section][default]
        ini_defaults.append({default: default_value})
        log.debug(f"* {default} = {default_value}")


def config_has_valid_section(
        config_obj: configparser.ConfigParser) -> bool:
    """Return True if at least one section offers all options in ``CONST.CFG_MANDATORY``."""
    mandatory = set(CONST.CFG_MANDATORY)
    return any(
        mandatory.issubset(config_obj.options(section))
        for section in config_obj.sections()
    )


def is_default(
        config_key: str) -> bool:
    """Return True if ``config_key`` was recorded as an option of the default section."""
    return any(config_key in ini_default for ini_default in ini_defaults)


def is_same_as_default(
        config_kv_pair: dict) -> bool:
    """Return True if the single-item dict equals a recorded default key/value pair."""
    return config_kv_pair in ini_defaults


def validate_config_sections(
        config_obj: configparser.ConfigParser) -> None:
    """Log every section's own options and drop sections lacking mandatory options.

    ``config_obj`` must accept ``options(section, no_defaults=True)`` as the
    ``ConfigParser`` subclass in this module does.
    """
    mandatory = set(CONST.CFG_MANDATORY)
    # sections() hands out a new list, so removing sections while looping is safe.
    for this_section in config_obj.sections():
        log.debug(print_section_header(this_section))
        own_options = config_obj.options(this_section, no_defaults=True)
        if not mandatory.issubset(own_options):
            log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
                        f"{CONST.CFG_MANDATORY} set, skipping section ...")
            config_obj.remove_section(this_section)
            continue
        for key in own_options:
            value = config_obj[this_section][key]
            if is_same_as_default({key: value}):
                kv_prefix = "="
            elif is_default(key):
                kv_prefix = "+"
            else:
                kv_prefix = "~"
            log.debug(f"{kv_prefix} {key} = {value}")


def setup_watch(
        csection_name: str,
        config_obj: configparser.ConfigParser,
        inotify_obj: INotify) -> bool:
    """Watch the section's ``watch_dir`` for moved-in files and register it in ``wds``.

    The watch directory is created when missing. Exits the process with code 3
    when the directory cannot be created or the watch cannot be added because
    the directory does not exist.

    Returns:
        True once the watch descriptor has been stored in ``wds``.
    """
    # Read from the arguments; do not lean on loop variables of the caller.
    watch_this = config_obj.get(csection_name, "watch_dir")
    if not os.path.exists(watch_this):
        try:
            os.makedirs(watch_this, exist_ok=True)
        except OSError as makedirs_ose:
            log.error(f"Section '[{csection_name}]' watch directory '{watch_this}' does not exist and "
                      f"could not be created:\n"
                      f"{makedirs_ose}\n"
                      f"Exiting 3 ...")
            sys.exit(3)
    watch_flags = flags.MOVED_TO
    try:
        log.debug(f"Watching for '[{csection_name}]' files moved to '{watch_this}' ...")
        wd_obj = inotify_obj.add_watch(watch_this, watch_flags)
    except FileNotFoundError:
        log.error(f"Section '[{csection_name}]' watch directory '{watch_this}' does not exist. Please create it. "
                  f"Exiting 3 ...")
        sys.exit(3)
    log.debug(f"Created watch descriptor ID {wd_obj} for '[{csection_name}]' watch directory '{watch_this}'.")
    wds[wd_obj] = {
        "watch_dir": watch_this,
        "output_dir": config_obj.get(csection_name, "output_dir"),
        "section": csection_name
    }
    return True


def generate_nfo(
        title: str,
        raw_file_name: str) -> lxml.etree._Element:
    """Build the ``<episodedetails>`` element for an episode.

    The element carries a ``<title>`` with ``title`` and an ``<id>`` made of
    the part of ``raw_file_name`` before the first ``" - "``, followed by
    ``" - "`` and ``title``.
    """
    season_ep_str = raw_file_name.split(" - ")[0]
    maker = lxml.builder.ElementMaker()
    return maker.episodedetails(
        maker.title(title),
        maker.id(f"{season_ep_str} - {title}")
    )


def get_basic_cleaned_title(
        csection_name: str,
        config_obj: configparser.ConfigParser,
        dirty_title: str) -> str:
    """Apply the section's ``title_regex_search``/``title_regex_replace`` to a title.

    Returns ``dirty_title`` unchanged when no search pattern is configured.
    """
    regex_search_pattern = config_obj.get(csection_name, "title_regex_search")
    regex_replace_pattern = config_obj.get(csection_name, "title_regex_replace")
    if not regex_search_pattern:
        return dirty_title
    log.debug("Doing basic title cleaning ...")
    clean_title = re.sub(regex_search_pattern, regex_replace_pattern, dirty_title)
    log.debug(f"""Title's now "{clean_title}".""")
    return clean_title


def get_season_and_episode(
        csection_name: str,
        config_obj: configparser.ConfigParser,
        raw_file_name: str) -> dict:
    """Split a file name into season, cleaned title, episode string and extension.

    Returns:
        A dict with the keys ``season_str``, ``title_str``, ``season_ep_list``
        (the extension-less name split once at ``" - "``) and ``ext``, or an
        empty dict when the file name has no ``" - "`` separator or no season
        marker in front of it.
    """
    name_without_ext, ext = os.path.splitext(raw_file_name)
    season_ep_str = name_without_ext.split(" - ", 1)
    try:
        # Both lookups sit inside the try: a name without "S"/"E" marker or
        # without " - " must be skipped, not take the whole watcher down.
        season_episode = re.split(r"[SE]", season_ep_str[0])
        season = f"Season {season_episode[1]}"
        title = season_ep_str[1]
    except IndexError:
        log.warning(f"File name '{raw_file_name}' is not following expected format. The expected format "
                    f"is a season-and-episode string followed by space-dash-space and an arbitrary sequence "
                    f"of characters suffixed with a file extension e.g. "
                    f"'S2022E2022032001 - This is a Title.mp4'. Skipping further file processing ...")
        return {}
    basic_cleaned_title = get_basic_cleaned_title(csection_name, config_obj, title)
    got_season_and_episode = {
        "season_str": season,
        "title_str": basic_cleaned_title,
        "season_ep_list": season_ep_str,
        "ext": ext
    }
    log.debug(f"""Identified {got_season_and_episode["season_str"]}, """
              f"""title "{got_season_and_episode["title_str"]}" """
              f"and episode object {season_ep_str} "
              f"with extension '{ext}'.")
    return got_season_and_episode


def get_target_file_list(
        target_dir: str) -> list:
    """List regular files in ``target_dir`` whose names do not end in an ignored extension."""
    log.debug(f"Generating list of files in '{target_dir}' ...")
    ignored_suffixes = tuple(ignored_target_file_exts)
    filtered_files = [
        entry for entry in os.listdir(target_dir)
        if os.path.isfile(os.path.join(target_dir, entry))
        and not entry.endswith(ignored_suffixes)
    ]
    log.debug(f"Files in '{target_dir}' filtered for extensions we're ignoring "
              f"{ignored_target_file_exts}: {filtered_files}")
    return filtered_files


def move_file_to_target_dir(
        csection_name: str,
        config_obj: configparser.ConfigParser,
        raw_file_name: str,
        season_ep_str: dict,
        output_dir_name: str) -> str:
    """Move a file from the section's watch dir into ``output_dir_name``.

    The target name is the season-and-episode string plus the original
    extension. While that name is taken in the target dir, the number formed
    by the last two characters of the season-and-episode string is incremented.

    Args:
        season_ep_str: Result dict of ``get_season_and_episode()``.

    Returns:
        The extension-less target file name, or an empty string when the file
        could not be moved or no free target name could be derived.
    """
    this_watch_dir = config_obj.get(csection_name, "watch_dir")
    source_abs_path = os.path.join(this_watch_dir, raw_file_name)
    target_dir = output_dir_name
    existing_files = set(get_target_file_list(target_dir))
    target_file_name = season_ep_str["season_ep_list"][0]
    target_ext = season_ep_str["ext"]

    # Keep counting until the name is free; a single increment could land on
    # yet another existing file and overwrite it.
    while f"{target_file_name}{target_ext}" in existing_files:
        log.debug("Intended file name already exists in target dir, incrementing counter ...")
        episode_minus_counter = target_file_name[:-2]
        counter = target_file_name[-2:]
        if not counter.isdecimal():
            log.error(f"Unable to increment counter '{counter}' of file name '{target_file_name}', "
                      f"it is not a number.\n"
                      f"Continuing file watch ...")
            return ""
        # int() copes with leading zeros, including an all-zero counter like "00".
        next_counter = str(int(counter) + 1).zfill(len(counter))
        target_file_name = f"{episode_minus_counter}{next_counter}"

    target_abs_path = os.path.join(target_dir, f"{target_file_name}{target_ext}")
    try:
        log.debug(f"Moving '{source_abs_path}' to '{target_abs_path}' ...")
        shutil.move(source_abs_path, target_abs_path)
    except OSError as move_ose:
        log.error(f"Failed moving file with an OSError:\n"
                  f"{move_ose}\n"
                  f"Continuing file watch ...")
        return ""
    return target_file_name


def write_nfo_to_disk(
        nfo_data: lxml.etree._Element,
        target_file_name: str,
        output_dir_name: str) -> bool:
    """Serialize ``nfo_data`` to ``<output_dir_name>/<target_file_name>.nfo``.

    Returns:
        True when the file was written, False when writing raised an OSError.
    """
    target_abs_path = os.path.join(output_dir_name, f"{target_file_name}.nfo")
    nfo_str = lxml.etree.tostring(
        nfo_data,
        pretty_print=True,
        xml_declaration=True,
        standalone=True,
        encoding="UTF-8")
    try:
        with open(target_abs_path, 'wb') as nfo_file:
            log.debug(f"Writing NFO data to '{target_abs_path}':\n"
                      f"""{nfo_str.decode("UTF-8")}""")
            nfo_file.write(nfo_str)
    except OSError as nfo_ose:
        log.error(f"Failed writing NFO file '{target_abs_path}' with an OSError:\n"
                  f"{nfo_ose}\n"
                  f"Continuing file watch ...")
        return False
    return True


def kodi_library_update(
        csection_name: str,
        config_obj: configparser.ConfigParser) -> bool:
    """POST a ``VideoLibrary.Scan`` JSON-RPC request to the section's Kodi address.

    HTTP basic auth is used when both ``kodi_jsonrpc_username`` and
    ``kodi_jsonrpc_password`` are set; when only one of them is set the
    request is skipped.

    Returns:
        True when the response status is OK, False when the request was
        skipped, the endpoint was not connectable or the status was not OK.
    """
    kodi_jsonrpc_address = config_obj.get(csection_name, "kodi_jsonrpc_address")
    # Query the parser that was handed in, not the module-level one.
    kodi_jsonrpc_username = config_obj.get(csection_name, "kodi_jsonrpc_username", fallback=None)
    kodi_jsonrpc_password = config_obj.get(csection_name, "kodi_jsonrpc_password", fallback=None)

    if bool(kodi_jsonrpc_username) != bool(kodi_jsonrpc_password):
        log.warning(f"Please make sure that both a Kodi username /and/ and password are set via "
                    f"the 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' config options, respectively. "
                    f"If Kodi's web interface is configured to not require authentication please set "
                    f"both 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' to empty values (or simply remove "
                    f"both lines from the config file).\n"
                    f"Skipping Kodi video library reload ...")
        return False
    # After the check above both values are either set or unset.
    auth = HTTPBasicAuth(kodi_jsonrpc_username, kodi_jsonrpc_password) if kodi_jsonrpc_username else None

    json_payload = json.dumps({
        "jsonrpc": "2.0",
        "method": "VideoLibrary.Scan",
        "id": f"""{config_obj.get(csection_name, "self_name")}_trigger-vid-lib-scan"""
    })
    req = requests.Request(
        "POST",
        kodi_jsonrpc_address,
        data=json_payload,
        auth=auth,
        headers={"content-type": "application/json"})
    prepped = req.prepare()

    # Headers are taken from the unprepared request, as handed to Request().
    header_lines = "\n".join(f"Header '{header}': '{value}'" for header, value in req.headers.items())
    log.debug("Triggering Kodi library update ...")
    log.debug(f"Request method: {req.method}\n"
              f"URL: {req.url}\n"
              f"{header_lines}\n"
              f"Payload: {json_payload}")
    try:
        # Separate names for session and response so both get closed.
        with requests.Session() as session, session.send(prepped) as response:
            if response.status_code == requests.codes.ok:
                log.debug("Kodi library update successful")
                return True
            try:
                response_body = json.loads(response.content)
            except ValueError:
                # Error responses are not guaranteed to carry JSON.
                response_body = response.text
            log.error(f"Request failed, response code was {response.status_code}:\n"
                      f"{response_body}")
            return False
    except requests.exceptions.ConnectionError:
        log.info(f"Kodi JSON-RPC endpoint {kodi_jsonrpc_address} is not currently connectable.")
        return False


def _process_moved_file(
        file_name: str,
        watch_dir_config: dict) -> None:
    """Handle one file that was moved into a watch directory.

    Parses the file name, creates the output directory, moves the file, writes
    its NFO and then runs the optional post-move command and Kodi library
    update. Exits the process with code 4 when the output directory cannot be
    created.
    """
    watch_dir = watch_dir_config["watch_dir"]
    output_dir = watch_dir_config["output_dir"]
    section_name = watch_dir_config["section"]
    log.debug(f"Watch dir config: {watch_dir_config}")
    log.info(f"File '{file_name}' was moved to watch directory "
             f"""'{watch_dir}', processing ...""")

    season_and_episode = get_season_and_episode(section_name, config, file_name)
    if not season_and_episode:
        return

    if config.getboolean(section_name, "do_seasons"):
        season_str = season_and_episode["season_str"]
        log.debug(f"Changing output to season-specific dir '{season_str}' ...")
        output_dir = os.path.join(output_dir, season_str)
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as ose:
        log.error(f"Unable to create section '[{section_name}]' output dir "
                  f"'{output_dir}' with an OSError:\n"
                  f"{ose}\n"
                  f"Exiting 4 ...")
        sys.exit(4)

    nfo = generate_nfo(season_and_episode["title_str"], file_name)
    file_moved_to_target_dir = move_file_to_target_dir(
        section_name,
        config,
        file_name,
        season_and_episode,
        output_dir)
    if not file_moved_to_target_dir:
        return

    write_nfo_to_disk(
        nfo,
        file_moved_to_target_dir,
        output_dir)

    run_cmd = config.get(section_name, "run_cmd", fallback=None)
    if run_cmd:
        log.debug(f"Executing post-move command: {run_cmd} ...")
        run_cmd_exit_code, run_cmd_output = subprocess.getstatusoutput(run_cmd)
        if run_cmd_exit_code != 0:
            log.warning(f"Post-move command failed with exit code {run_cmd_exit_code}:\n"
                        f"{run_cmd_output}")
    if config.has_option(section_name, "kodi_jsonrpc_address"):
        kodi_library_update(section_name, config)


if __name__ == '__main__':
    validate_default_section(config)
    if config_has_valid_section(config):
        validate_config_sections(config)
    else:
        log.error(f"No valid config section found. A valid config section has at least the mandatory "
                  f"""{p.plural("option", len(CONST.CFG_MANDATORY))} """
                  f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
        sys.exit(2)

    inotify = INotify()
    log.debug("Iterating over config sections ...")
    for csection in config.sections():
        log.debug(f"Processing section '[{csection}]' ...")
        setup_watch(csection, config, inotify)

    while True:
        time.sleep(0.2)
        for event in inotify.read():
            # Test the mask bit itself. Comparing str() of the flag members
            # breaks on Python 3.11+, where str() of an IntEnum member is its
            # number rather than its name.
            if not event.mask & flags.MOVED_TO:
                continue
            # A file that cannot be processed only skips this one event; the
            # other events of the batch are still handled.
            _process_moved_file(event.name, wds[event.wd])