kodi-nfo-feeder/kodi-nfo-feeder.py

463 lines
18 KiB
Python

import json
import os
import logging
import subprocess
import sys
import time
import re
import shutil
from rich.logging import RichHandler
from rich.traceback import install
import configparser
import inflect
from inotify_simple import INotify, flags
import lxml.etree
import lxml.builder
import requests
from requests.auth import HTTPBasicAuth
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3: Watch directory does not exist and unable to create
# 4: Unable to create output directory
class CONST(object):
__slots__ = ()
LOG_FORMAT = "%(message)s"
CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
CFG_DEFAULT_FILENAME = "config.ini"
CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
CFG_KNOWN_DEFAULTS = [
{"key": "self_name", "value": "kodi-nfo-feeder"},
{"key": "ignored_target_file_exts", "value": ".jpg, .jpeg, .png, .nfo"},
{"key": "title_regex_search", "value": ""},
{"key": "title_regex_replace", "value": ""},
{"key": "do_seasons", "value": "yes"}
]
CFG_KNOWN_SECTION = [
{"key": "watch_dir", "is_mandatory": True},
{"key": "output_dir", "is_mandatory": True},
{"key": "title_regex_search", "is_mandatory": False},
{"key": "title_regex_replace", "is_mandatory": False},
{"key": "do_seasons", "is_mandatory": False},
{"key": "run_cmd", "is_mandatory": False},
{"key": "kodi_jsonrpc_address", "is_mandatory": False},
{"key": "kodi_jsonrpc_username", "is_mandatory": False},
{"key": "kodi_jsonrpc_password", "is_mandatory": False}
]
CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
CONST = CONST()
logging.basicConfig(
# Default for all modules in NOTSET so log everything
level="NOTSET",
format=CONST.LOG_FORMAT,
datefmt="[%X]",
handlers=[RichHandler(
show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True,
rich_tracebacks=True
)]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
install(show_locals=True)
class ConfigParser(
configparser.ConfigParser):
"""Can get options() without defaults
Taken from https://stackoverflow.com/a/12600066.
"""
def options(self, section, no_defaults=False, **kwargs):
if no_defaults:
try:
return list(self._sections[section].keys())
except KeyError:
raise configparser.NoSectionError(section)
else:
return super().options(section)
p = inflect.engine()
ini_defaults = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
config = ConfigParser(defaults=internal_defaults)
config.read(CONST.CFG_DEFAULT_FILENAME)
ignored_target_file_exts_str = config.get(config.default_section, "ignored_target_file_exts")
ignored_target_file_exts = re.split(r""",\s?|\s""", ignored_target_file_exts_str)
def print_section_header(
header: str) -> str:
return f"Loading config section '[{header}]' ..."
def validate_default_section(
config_obj: configparser.ConfigParser()) -> None:
log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
if not config_obj.sections():
log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
sys.exit(1)
if config.defaults():
log.debug(f"Symbol legend:\n"
f"* Global default from section '[{config_obj.default_section}]'\n"
f"~ Local option, doesn't exist in '[{config_obj.default_section}]'\n"
f"+ Local override of a value from '[{config_obj.default_section}]'\n"
f"= Local override, same value as in '[{config_obj.default_section}]'")
log.debug(print_section_header(config_obj.default_section))
for default in config_obj.defaults():
ini_defaults.append({default: config_obj[config_obj.default_section][default]})
log.debug(f"* {default} = {config_obj[config_obj.default_section][default]}")
else:
log.debug(f"No defaults defined")
def config_has_valid_section(
config_obj: configparser.ConfigParser()) -> bool:
has_valid_section = False
for config_obj_section in config_obj.sections():
if set(CONST.CFG_MANDATORY).issubset(config_obj.options(config_obj_section)):
has_valid_section = True
break
return has_valid_section
def is_default(
config_key: str) -> bool:
return any(config_key in ini_default for ini_default in ini_defaults)
def is_same_as_default(
config_kv_pair: dict) -> bool:
return config_kv_pair in ini_defaults
def validate_config_sections(
config_obj: configparser.ConfigParser()) -> None:
for this_section in config_obj.sections():
log.debug(print_section_header(this_section))
if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)):
log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
f"{CONST.CFG_MANDATORY} set, skipping section ...")
config_obj.remove_section(this_section)
else:
for key in config_obj.options(this_section, no_defaults=True):
kv_prefix = "~"
if is_default(key):
kv_prefix = "+"
if is_same_as_default({key: config_obj[this_section][key]}):
kv_prefix = "="
log.debug(f"{kv_prefix} {key} = {config_obj[this_section][key]}")
def setup_watch(
csection_name: str,
config_obj: configparser.ConfigParser(),
inotify_obj: INotify) -> bool:
global wds
watch_this = config_obj.get(csection, "watch_dir")
if not os.path.exists(watch_this):
os.makedirs(watch_this, exist_ok=False)
watch_flags = flags.MOVED_TO
try:
log.debug(f"Watching for '[{csection_name}]' files moved to '{watch_this}' ...")
wd_obj = inotify_obj.add_watch(watch_this, watch_flags)
except FileNotFoundError:
log.error(f"Section '[{csection_name}]' watch directory '{watch_this}' does not exist. Please create it. "
f"Exiting 3 ...")
sys.exit(3)
else:
log.debug(f"Created watch descriptor ID {wd_obj} for '[{csection_name}]' watch directory '{watch_this}'.")
wds[wd_obj] = {
"watch_dir": watch_this,
"output_dir": config.get(csection, "output_dir"),
"section": csection_name
}
return True
def generate_nfo(
title: str,
raw_file_name: str) -> lxml.builder.ElementMaker:
season_ep_str = raw_file_name.split(" - ")[0]
data = lxml.builder.ElementMaker()
ep_details_tag = data.episodedetails
title_tag = data.title
id_tag = data.id
nfo_data = ep_details_tag(
title_tag(title),
id_tag(f"{season_ep_str} - {title}")
)
return nfo_data
def get_basic_cleaned_title(
csection_name: str,
config_obj: configparser.ConfigParser(),
dirty_title: str) -> str:
regex_search_pattern = config_obj.get(csection_name, "title_regex_search")
regex_replace_pattern = config_obj.get(csection_name, "title_regex_replace")
if regex_search_pattern:
log.debug(f"Doing basic title cleaning ...")
pattern = re.compile(regex_search_pattern)
clean_title = re.sub(pattern, regex_replace_pattern, dirty_title)
log.debug(f"""Title's now "{clean_title}".""")
return clean_title
else:
return dirty_title
def get_season_and_episode(
csection_name: str,
config_obj: configparser.ConfigParser(),
raw_file_name: str) -> dict:
file_name_ext_split = os.path.splitext(raw_file_name)
season_ep_str = file_name_ext_split[0].split(" - ", 1)
ext = file_name_ext_split[1]
season_episode = re.split("[S|E]", season_ep_str[0])
season = f"Season {season_episode[1]}"
title = season_ep_str[1]
basic_cleaned_title = get_basic_cleaned_title(csection_name, config_obj, title)
got_season_and_episode = {
"season_str": season,
"title_str": basic_cleaned_title,
"season_ep_list": season_ep_str,
"ext": ext
}
log.debug(f"""Identified {got_season_and_episode["season_str"]}, """
f"""title "{got_season_and_episode["title_str"]}" """
f"and episode object {season_ep_str} "
f"with extension '{ext}'.")
return got_season_and_episode
def get_target_file_list(
target_dir: str) -> list:
log.debug(f"Generating list of files in '{target_dir}' ...")
onlyfiles = [f for f in os.listdir(target_dir) if os.path.isfile(os.path.join(target_dir, f))]
filtered_files = [f for f in onlyfiles if not f.endswith(tuple(ignored_target_file_exts))]
log.debug(f"Files in '{target_dir}' filtered for extensions we're ignoring "
f"{ignored_target_file_exts}: {filtered_files}")
return filtered_files
def move_file_to_target_dir(
csection_name: str,
config_obj: configparser.ConfigParser(),
raw_file_name: str,
season_ep_str: dict,
output_dir_name: str) -> str:
this_watch_dir = config_obj.get(csection_name, "watch_dir")
source_abs_path = os.path.join(this_watch_dir, raw_file_name)
target_dir = output_dir_name
target_file_list = get_target_file_list(target_dir)
target_file_name = season_ep_str["season_ep_list"][0]
target_ext = season_ep_str["ext"]
target_file_name_plus_ext = f"{target_file_name}{target_ext}"
if target_file_name_plus_ext in target_file_list:
log.debug(f"Intended file name already exists in target dir, incrementing counter ...")
episode_minus_counter = target_file_name[:-2]
counter = target_file_name[-2:]
counter_length = len(counter)
counter_stripped = int(counter.lstrip("0"))
counter_stripped += 1
target_file_name = f"{episode_minus_counter}{str(counter_stripped).zfill(counter_length)}"
target_abs_path = os.path.join(target_dir, f"{target_file_name}{target_ext}")
try:
log.debug(f"Moving '{source_abs_path}' to '{target_abs_path}' ...")
shutil.move(source_abs_path, target_abs_path)
except OSError as move_ose:
log.error(f"Failed moving file with an OSError:\n"
f"{move_ose}\n"
f"Continuing file watch ...")
return ""
else:
return target_file_name
def write_nfo_to_disk(
nfo_data: lxml.builder.ElementMaker,
target_file_name: str,
output_dir_name: str) -> bool:
target_dir = output_dir_name
target_nfo_name = f"{target_file_name}.nfo"
target_abs_path = os.path.join(target_dir, target_nfo_name)
nfo_str = lxml.etree.tostring(
nfo_data,
pretty_print=True,
xml_declaration=True,
standalone=True,
encoding="UTF-8")
try:
with open(target_abs_path, 'wb') as nfo_file:
log.debug(f"Writing NFO data to '{target_abs_path}':\n"
f"""{nfo_str.decode("UTF-8")}""")
nfo_file.write(nfo_str)
except OSError as nfo_ose:
log.error(f"Failed writing NFO file '{target_abs_path}' with an OSError:\n"
f"{nfo_ose}\n"
f"Continuing file watch ...")
return False
else:
return True
def kodi_library_update(
csection_name: str,
config_obj: configparser.ConfigParser()) -> bool:
kodi_jsonrpc_address = config_obj.get(csection_name, "kodi_jsonrpc_address")
kodi_jsonrpc_username = config_obj.get(csection_name, "kodi_jsonrpc_username") if \
config.has_option(csection_name, "kodi_jsonrpc_username") else None
kodi_jsonrpc_password = config_obj.get(csection_name, "kodi_jsonrpc_password") if \
config.has_option(csection_name, "kodi_jsonrpc_password") else None
require_auth = False
if (kodi_jsonrpc_username and not kodi_jsonrpc_password) or \
(not kodi_jsonrpc_username and kodi_jsonrpc_password):
log.warning(f"Please make sure that both a Kodi username /and/ and password are set via "
f"the 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' config options, respectively. "
f"If Kodi's web interface is configured to not require authentication please set "
f"both 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' to empty values (or simply remove "
f"both lines from the config file).\n"
f"Skipping Kodi video library reload ...")
return False
elif kodi_jsonrpc_username and kodi_jsonrpc_password:
require_auth = True
json_payload_str = {
"jsonrpc": "2.0",
"method": "VideoLibrary.Scan",
"id": f"""{config_obj.get(csection_name, "self_name")}_trigger-vid-lib-scan"""
}
json_payload = json.dumps(json_payload_str)
req_header = {"content-type": "application/json"}
s = requests.Session()
req = requests.Request(
"POST",
kodi_jsonrpc_address,
data=json_payload,
auth=requests.auth.HTTPBasicAuth(
kodi_jsonrpc_username,
kodi_jsonrpc_password
) if require_auth else None,
headers=req_header)
prepped = req.prepare()
newline = "\n"
log.debug(f"Triggering Kodi library update ...")
log.debug(f"Request method: {req.method}\n"
f"URL: {req.url}\n"
f"""{newline.join(f"Header '{header}': '{value}'" for header, value in list(req.headers.items()))}\n"""
f"Payload: {json_payload}")
try:
with s.send(prepped) as s:
got_json_response = s.content
if s.status_code == requests.codes.ok:
log.debug(f"Kodi library update successful")
return True
else:
log.error(f"Request failed, response code was {s.status_code}:\n"
f"{json.loads(got_json_response)}")
return False
except requests.exceptions.ConnectionError:
log.info(f"Kodi JSON-RPC endpoint {kodi_jsonrpc_address} is not currently connectable.")
return False
if __name__ == '__main__':
validate_default_section(config)
if config_has_valid_section(config):
validate_config_sections(config)
else:
log.error(f"No valid config section found. A valid config section has at least the mandatory "
f"""{p.plural("option", len(CONST.CFG_MANDATORY))} """
f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
sys.exit(2)
inotify = INotify()
wds = {}
log.debug(f"Iterating over config sections ...")
for csection in config.sections():
log.debug(f"Processing section '[{csection}]' ...")
setup_watch(csection, config, inotify)
while True:
time.sleep(0.2)
for event in inotify.read():
events = [str(flags) for flags in flags.from_mask(event.mask)]
if "flags.MOVED_TO" in events:
file_name = event.name
watch_dir_config = wds[event.wd]
watch_dir = watch_dir_config["watch_dir"]
output_dir = watch_dir_config["output_dir"]
section_name = watch_dir_config["section"]
log.debug(f"Watch dir config: {watch_dir_config}")
log.info(f"File '{file_name}' was moved to watch directory "
f"""'{watch_dir}', processing ...""")
season_and_episode = get_season_and_episode(section_name, config, file_name)
if config.getboolean(section_name, "do_seasons"):
season_str = season_and_episode["season_str"]
log.debug(f"Changing output to season-specific dir '{season_str}' ...")
output_dir = os.path.join(output_dir, season_str)
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as ose:
log.error(f"Unable to create section '[{section_name}]' output dir "
f"'{output_dir}' with an OSError:\n"
f"{ose}\n"
f"Exiting 4 ...")
sys.exit(4)
nfo = generate_nfo(season_and_episode["title_str"], file_name)
file_moved_to_target_dir = move_file_to_target_dir(
section_name,
config,
file_name,
season_and_episode,
output_dir)
if file_moved_to_target_dir:
write_nfo_to_disk(
nfo,
file_moved_to_target_dir,
output_dir)
run_cmd = config.get(section_name, "run_cmd") if config.has_option(section_name, "run_cmd") else False
if run_cmd:
log.debug(f"Executing post-move command: {run_cmd} ...")
run_cmd_result = subprocess.getstatusoutput(run_cmd)
run_cmd_exit_code = run_cmd_result[0]
run_cmd_output = run_cmd_result[1]
if run_cmd_exit_code != 0:
log.warning(f"Post-move command failed with exit code {run_cmd_exit_code}:\n"
f"{run_cmd_output}")
if config.has_option(section_name, "kodi_jsonrpc_address"):
kodi_library_update(section_name, config)