# kodi-nfo-feeder/kodi-nfo-feeder.py
#
# 339 lines
# 12 KiB
# Python

import os
import logging
import sys
import time
import re
import shutil
from rich.logging import RichHandler
from rich.traceback import install
import configparser
import inflect
from inotify_simple import INotify, flags
import lxml.etree
import lxml.builder
# TODO Create season subdir if it doesn't exist
# TODO Thread config sections
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3: Watch directory does not exist and unable to create
# 4: Unable to create output directory
class CONST:
    """Read-only bag of program-wide constants.

    The class is instantiated once right below and the instance replaces
    the class under the same name; the empty ``__slots__`` means the
    instance has no ``__dict__``, so attributes cannot be added to it.
    """
    __slots__ = ()

    # Format string handed to logging.basicConfig().
    LOG_FORMAT = "%(message)s"

    # Location of the config file: next to this source file.
    CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
    CFG_DEFAULT_FILENAME = "config.ini"
    CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)

    # Built-in values for the default section of the config.
    CFG_KNOWN_DEFAULTS = [
        {"key": "self_name", "value": "kodi-nfo-feeder"},
        {"key": "ignored_target_file_exts", "value": ".jpg, .jpeg, .png, .nfo"},
        {"key": "title_regex_search", "value": ""},
        {"key": "title_regex_replace", "value": ""},
    ]

    # Options a regular section may carry, and whether each is required.
    CFG_KNOWN_SECTION = [
        {"key": "watch_dir", "is_mandatory": True},
        {"key": "output_dir", "is_mandatory": True},
        {"key": "title_regex_search", "is_mandatory": False},
        {"key": "title_regex_replace", "is_mandatory": False},
    ]

    # Keys of the required options, in declaration order.
    CFG_MANDATORY = [option["key"] for option in CFG_KNOWN_SECTION if option["is_mandatory"]]


# From here on the name refers to the single instance, not the class.
CONST = CONST()
# Root logger: level NOTSET, i.e. records from every module are let through.
# The handler's own timestamp column is switched off when SYSTEMD_EXEC_PID is
# present in the environment (presumably because the journal already
# timestamps each line - confirm).
logging.basicConfig(
    level=logging.NOTSET,
    format=CONST.LOG_FORMAT,
    datefmt="[%X]",
    handlers=[
        RichHandler(
            show_time="SYSTEMD_EXEC_PID" not in os.environ,
            rich_tracebacks=True,
        ),
    ],
)

# Logger used by this script's own code; it logs from DEBUG upwards.
log = logging.getLogger("rich")
log.setLevel(logging.DEBUG)

# Rich traceback handler for uncaught exceptions, including local variables.
install(show_locals=True)
class ConfigParser(
        configparser.ConfigParser):
    """ConfigParser whose options() can leave out inherited defaults.

    Taken from https://stackoverflow.com/a/12600066.
    """

    def options(self, section, no_defaults=False, **kwargs):
        """Return the option names of ``section``.

        With ``no_defaults=True`` only the options stored in the section
        itself are returned; otherwise the stock behaviour applies, which
        also lists options from the default section. Extra keyword
        arguments are accepted and ignored.

        Raises ``configparser.NoSectionError`` for an unknown section.
        """
        if not no_defaults:
            return super().options(section)
        try:
            own_options = self._sections[section]
        except KeyError:
            raise configparser.NoSectionError(section)
        return list(own_options.keys())
# inflect engine, used to pluralise words in log messages.
p = inflect.engine()

# Filled by validate_default_section() with one {option: value} dict per
# option found in the default section.
ini_defaults = []

# Built-in defaults; values from the config file's default section override them.
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
config = ConfigParser(defaults=internal_defaults)

# Read the file the log messages refer to (next to this script) instead of a
# bare file name, which would be resolved against the current working directory.
config.read(CONST.CFG_DEFAULT_ABS_PATH)

ignored_target_file_exts_str = config.get(config.default_section, "ignored_target_file_exts")
# Split on ", " / "," / single whitespace. Empty pieces (empty value, trailing
# comma) are dropped: str.endswith("") is true for every name, so an empty
# entry would make every file count as ignored.
ignored_target_file_exts = [
    ext for ext in re.split(r""",\s?|\s""", ignored_target_file_exts_str) if ext
]
def print_section_header(
        header: str) -> str:
    """Build the log line announcing that config section ``header`` is loading.

    Despite the name nothing is printed; the text is returned to the caller.
    """
    announcement = f"Loading config section '[{header}]' ..."
    return announcement
def validate_default_section(
        config_obj: configparser.ConfigParser) -> None:
    """Make sure the config has sections, then record and log its defaults.

    Exits the process with code 1 when ``config_obj`` has no sections.
    Otherwise each option of the default section is appended to the
    module-level ``ini_defaults`` list as a single-item ``{option: value}``
    dict and logged with a ``*`` prefix.

    :param config_obj: Parser to inspect; only this object is consulted.
    """
    log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
    if not config_obj.sections():
        log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
        sys.exit(1)

    # Look at the parser that was passed in, not at the module-level one.
    defaults = config_obj.defaults()
    if not defaults:
        log.debug("No defaults defined")
        return

    default_section = config_obj.default_section
    log.debug(f"Symbol legend:\n"
              f"* Global default from section '[{default_section}]'\n"
              f"~ Local option, doesn't exist in '[{default_section}]'\n"
              f"+ Local override of a value from '[{default_section}]'\n"
              f"= Local override, same value as in '[{default_section}]'")
    log.debug(print_section_header(default_section))
    for default in defaults:
        # Go through the section proxy so the logged value matches what
        # later lookups in validate_config_sections() are compared against.
        value = config_obj[default_section][default]
        ini_defaults.append({default: value})
        log.debug(f"* {default} = {value}")
def config_has_valid_section(
        config_obj: configparser.ConfigParser()) -> bool:
    """Report whether at least one section offers every mandatory option.

    The check uses the regular ``options()`` call, so options coming from
    the default section count as well.
    """
    mandatory = set(CONST.CFG_MANDATORY)
    return any(
        mandatory.issubset(config_obj.options(section_name))
        for section_name in config_obj.sections()
    )
def is_default(
        config_key: str) -> bool:
    """Tell whether ``config_key`` is among the defaults recorded in ``ini_defaults``."""
    for recorded_default in ini_defaults:
        if config_key in recorded_default:
            return True
    return False
def is_same_as_default(
        config_kv_pair: dict) -> bool:
    """Tell whether this exact ``{option: value}`` pair was recorded as a default."""
    return ini_defaults.count(config_kv_pair) > 0
def validate_config_sections(
        config_obj: configparser.ConfigParser()) -> None:
    """Drop incomplete sections and log the options of the remaining ones.

    A section is removed from ``config_obj`` (with a warning) unless it
    defines all mandatory options itself; options inherited from the
    default section do not count here. For sections that stay, each own
    option is logged with a prefix:

    ``~`` option is not a known default,
    ``+`` option overrides a default with a different value,
    ``=`` option repeats the default's value.
    """
    mandatory = set(CONST.CFG_MANDATORY)
    for this_section in config_obj.sections():
        log.debug(print_section_header(this_section))
        own_options = config_obj.options(this_section, no_defaults=True)

        if not mandatory.issubset(own_options):
            log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
                        f"{CONST.CFG_MANDATORY} set, skipping section ...")
            config_obj.remove_section(this_section)
            continue

        for key in own_options:
            value = config_obj[this_section][key]
            if not is_default(key):
                kv_prefix = "~"
            elif is_same_as_default({key: value}):
                kv_prefix = "="
            else:
                kv_prefix = "+"
            log.debug(f"{kv_prefix} {key} = {value}")
def setup_watch(
        watch_this: str) -> INotify:
    """Create an inotify watch for files moved into ``watch_this``.

    The directory is created when it does not exist yet. If it can neither
    be created nor watched, the error is logged and the process exits with
    code 3.

    :param watch_this: Path of the directory to watch.
    :return: The ``INotify`` object carrying the ``MOVED_TO`` watch.
    """
    if not os.path.exists(watch_this):
        try:
            # exist_ok=True: something else may create the directory between
            # the check above and this call; that is not an error.
            os.makedirs(watch_this, exist_ok=True)
        except OSError as ose:
            log.error(f"Watch directory '{watch_this}' does not exist and could not be created:\n"
                      f"{ose}\n"
                      f"Exiting 3 ...")
            sys.exit(3)

    inotify = INotify()
    try:
        inotify.add_watch(watch_this, flags.MOVED_TO)
    except FileNotFoundError:
        # Release the inotify file descriptor before bailing out.
        inotify.close()
        log.error(f"Watch directory '{watch_this}' does not exist. Please create it. Exiting 3 ...")
        sys.exit(3)

    log.debug(f"Watching for files moved to '{watch_this}' ...")
    return inotify
def generate_nfo(
        title: str,
        raw_file_name: str) -> lxml.builder.ElementMaker:
    """Build the ``episodedetails`` XML structure for one episode.

    The structure has two children: ``title`` holding ``title`` and ``id``
    holding "<prefix> - <title>", where <prefix> is everything in
    ``raw_file_name`` before the first " - ".

    NOTE(review): the value returned is what the ElementMaker call
    produces, not an ElementMaker itself - the return annotation looks off.
    """
    # Text in front of the first " - " (the whole name if there is none).
    season_ep_str = raw_file_name.partition(" - ")[0]

    maker = lxml.builder.ElementMaker()
    return maker.episodedetails(
        maker.title(title),
        maker.id(f"{season_ep_str} - {title}"),
    )
def get_basic_cleaned_title(
        section_name: str,
        config_obj: configparser.ConfigParser,
        dirty_title: str) -> str:
    """Apply the section's optional regex search/replace to a title.

    :param section_name: Config section whose ``title_regex_search`` and
        ``title_regex_replace`` options are used.
    :param config_obj: Parser holding that section.
    :param dirty_title: Title as extracted from the file name.
    :return: ``dirty_title`` unchanged when ``title_regex_search`` is empty,
        otherwise the title with every match replaced.
    :raises re.error: If ``title_regex_search`` is not a valid pattern.
    """
    regex_search_pattern = config_obj.get(section_name, "title_regex_search")
    regex_replace_pattern = config_obj.get(section_name, "title_regex_replace")
    if not regex_search_pattern:
        return dirty_title

    log.debug(regex_search_pattern)
    log.debug("Doing basic title cleaning ...")
    clean_title = re.sub(regex_search_pattern, regex_replace_pattern, dirty_title)
    log.debug(f"""Title's now "{clean_title}".""")
    # A leftover quit() used to sit here and ended the process before the
    # cleaned title could be returned.
    return clean_title
def get_season_and_episode(
        section_name: str,
        config_obj: configparser.ConfigParser,
        raw_file_name: str) -> dict:
    """Split a file name of the form "S<season>E<episode> - <title><ext>".

    :param section_name: Config section used for title cleaning.
    :param config_obj: Parser holding that section.
    :param raw_file_name: File name (no directory part) to take apart.
    :return: dict with the keys
        ``season_str`` ("Season <season>"),
        ``title_str`` (title after ``get_basic_cleaned_title``),
        ``season_ep_list`` (``[<text before first " - ">, <text after it>]``),
        ``ext`` (extension including the dot, may be empty).
    :raises IndexError: If the name has no " - " separator or the part
        before it contains neither "S" nor "E".
    """
    file_name_ext_split = os.path.splitext(raw_file_name)
    season_ep_str = file_name_ext_split[0].split(" - ", 1)
    ext = file_name_ext_split[1]

    # "[SE]" rather than "[S|E]": inside a character class "|" is a literal
    # pipe, which would also have been treated as a separator.
    season_episode = re.split("[SE]", season_ep_str[0])
    # Element 0 is whatever precedes the first "S"/"E" (empty for "S01E02").
    season = f"Season {season_episode[1]}"
    title = season_ep_str[1]
    basic_cleaned_title = get_basic_cleaned_title(section_name, config_obj, title)

    got_season_and_episode = {
        "season_str": season,
        "title_str": basic_cleaned_title,
        "season_ep_list": season_ep_str,
        "ext": ext
    }
    log.debug(f"""Identified {got_season_and_episode["season_str"]}, """
              f"""title "{got_season_and_episode["title_str"]}" """
              f"and episode object {season_ep_str} "
              f"with extension '{ext}'.")
    return got_season_and_episode
def get_target_file_list(
        target_dir: str) -> list:
    """List the regular files directly inside ``target_dir``.

    Names ending in one of the module-level ``ignored_target_file_exts``
    are left out. Subdirectories are not descended into.
    """
    log.debug(f"Generating list of files in '{target_dir}' ...")

    ignored_suffixes = tuple(ignored_target_file_exts)
    filtered_files = []
    for entry_name in os.listdir(target_dir):
        if not os.path.isfile(os.path.join(target_dir, entry_name)):
            continue
        if entry_name.endswith(ignored_suffixes):
            continue
        filtered_files.append(entry_name)

    log.debug(f"Files in '{target_dir}' filtered for extensions we're ignoring "
              f"{ignored_target_file_exts}: {filtered_files}")
    return filtered_files
def move_file_to_target_dir(
        section_name: str,
        config_obj: configparser.ConfigParser,
        raw_file_name: str,
        season_ep_str: dict) -> str:
    """Work out a free target name for a file and (eventually) move it there.

    The target name is the season/episode tag (first element of
    ``season_ep_str["season_ep_list"]``) plus the original extension. When
    a file of that name already exists in the output directory, the
    trailing two-character counter of the tag is incremented until the
    name is free.

    NOTE(review): the actual ``shutil.move`` call is commented out, so at
    the moment this only logs what it would do. It has been left disabled
    on purpose - enabling a file move is a decision for the author.

    :param section_name: Config section providing ``watch_dir`` and ``output_dir``.
    :param config_obj: Parser holding that section.
    :param raw_file_name: Name of the file inside the watch directory.
    :param season_ep_str: dict as returned by ``get_season_and_episode``.
    :return: Target file name without extension, or ``""`` if moving failed.
    :raises ValueError: If a counter has to be incremented but the last two
        characters of the tag are not a number.
    """
    this_watch_dir = config_obj.get(section_name, "watch_dir")
    source_abs_path = os.path.join(this_watch_dir, raw_file_name)
    target_dir = config_obj.get(section_name, "output_dir")
    taken_names = set(get_target_file_list(target_dir))
    target_file_name = season_ep_str["season_ep_list"][0]
    target_ext = season_ep_str["ext"]

    if f"{target_file_name}{target_ext}" in taken_names:
        log.debug(f"File name already exists in target dir, incrementing counter ...")
        episode_minus_counter = target_file_name[:-2]
        counter = target_file_name[-2:]
        counter_length = len(counter)
        # int() copes with leading zeros by itself; stripping them first
        # turned "00" into "" and raised ValueError.
        counter_value = int(counter)
        # Keep counting until the name is free - the next number may be
        # taken as well.
        while f"{target_file_name}{target_ext}" in taken_names:
            counter_value += 1
            target_file_name = f"{episode_minus_counter}{str(counter_value).zfill(counter_length)}"

    target_abs_path = os.path.join(target_dir, f"{target_file_name}{target_ext}")
    try:
        log.debug(f"Moving '{source_abs_path}' to '{target_abs_path}' ...")
        # shutil.move(source_abs_path, target_abs_path)
    except OSError as ose:
        log.error(f"Failed moving file with an OSError:\n"
                  f"{ose}\n"
                  f"Continuing file watch ...")
        return ""
    else:
        return target_file_name
def write_nfo_to_disk(
        nfo_data: lxml.builder.ElementMaker,
        target_file_name: str,
        output_dir_name: str) -> bool:
    """Serialise ``nfo_data`` and write it to "<target_file_name>.nfo".

    The XML is pretty-printed UTF-8 with a standalone XML declaration and
    is also written to the debug log.

    :param nfo_data: XML structure to serialise.
    :param target_file_name: File name without extension.
    :param output_dir_name: Directory the ``.nfo`` file goes into.
    :return: ``True`` on success, ``False`` if an ``OSError`` occurred
        (the error is logged, not raised).
    """
    target_abs_path = os.path.join(output_dir_name, f"{target_file_name}.nfo")
    nfo_str = lxml.etree.tostring(
        nfo_data,
        encoding="UTF-8",
        xml_declaration=True,
        standalone=True,
        pretty_print=True)

    try:
        with open(target_abs_path, 'wb') as nfo_file:
            log.debug(f"Writing NFO data to '{target_abs_path}':\n"
                      f"""{nfo_str.decode("UTF-8")}""")
            nfo_file.write(nfo_str)
    except OSError as ose:
        log.error(f"Failed writing NFO file '{target_abs_path}' with an OSError:\n"
                  f"{ose}\n"
                  f"Continuing file watch ...")
        return False
    return True
if __name__ == '__main__':
    # Script entry point: validate the config, then watch for incoming files.
    validate_default_section(config)
    if not config_has_valid_section(config):
        log.error(f"No valid config section found. A valid config section has at least the mandatory "
                  f"""{p.plural("option", len(CONST.CFG_MANDATORY))} """
                  f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
        sys.exit(2)
    validate_config_sections(config)

    log.debug(f"Iterating over config sections ...")
    for section in config.sections():
        log.debug(f"Processing section '[{section}]' ...")
        watch_dir = config.get(section, "watch_dir")
        inotify_watch = setup_watch(watch_dir)

        # This loop never ends, so only the first section is ever watched
        # (see the "Thread config sections" TODO at the top of the file).
        while True:
            time.sleep(0.2)
            for event in inotify_watch.read():
                # Test the mask bit directly. Comparing str(flag) with
                # "flags.MOVED_TO" depends on how the enum renders itself;
                # since Python 3.11 an IntEnum member's str() is its
                # integer value, so that comparison never matched.
                if event.mask & flags.MOVED_TO:
                    file_name = event.name
                    log.info(f"File '{file_name}' was moved to watch directory '{watch_dir}', processing ...")
                    # TODO https://docs.python.org/3/library/xml.etree.elementtree.html
                    # TODO generate xml
                    # TODO: Basic string manipulation via regex in options file