kodi-nfo-feeder/kodi-nfo-feeder.py

475 lines
18 KiB
Python
Raw Normal View History

import json
2022-03-21 02:48:01 +01:00
import os
import logging
import subprocess
2022-03-21 02:57:54 +01:00
import sys
2022-03-21 03:32:49 +01:00
import time
2022-03-22 02:35:26 +01:00
import re
import shutil
2022-03-21 02:48:01 +01:00
from rich.logging import RichHandler
from rich.traceback import install
import configparser
2022-03-21 02:57:54 +01:00
import inflect
from inotify_simple import INotify, flags
2022-03-22 02:35:26 +01:00
import lxml.etree
import lxml.builder
2022-03-23 13:13:14 +01:00
import requests
from requests.auth import HTTPBasicAuth
2022-03-22 02:35:26 +01:00
2022-03-21 02:57:54 +01:00
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
2022-03-22 02:35:26 +01:00
# 3: Watch directory does not exist and unable to create
# 4: Unable to create output directory
2022-03-21 02:48:01 +01:00
class CONST(object):
__slots__ = ()
LOG_FORMAT = "%(message)s"
CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
CFG_DEFAULT_FILENAME = "config.ini"
CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
CFG_KNOWN_DEFAULTS = [
{"key": "self_name", "value": "kodi-nfo-feeder"},
2022-03-22 02:33:57 +01:00
{"key": "ignored_target_file_exts", "value": ".jpg, .jpeg, .png, .nfo"},
{"key": "title_regex_search", "value": ""},
2022-03-22 19:37:40 +01:00
{"key": "title_regex_replace", "value": ""},
{"key": "do_seasons", "value": "yes"}
2022-03-21 02:48:01 +01:00
]
CFG_KNOWN_SECTION = [
{"key": "watch_dir", "is_mandatory": True},
2022-03-22 02:33:57 +01:00
{"key": "output_dir", "is_mandatory": True},
{"key": "title_regex_search", "is_mandatory": False},
2022-03-22 19:37:40 +01:00
{"key": "title_regex_replace", "is_mandatory": False},
2022-03-23 13:11:09 +01:00
{"key": "do_seasons", "is_mandatory": False},
{"key": "run_cmd", "is_mandatory": False},
{"key": "kodi_jsonrpc_address", "is_mandatory": False},
{"key": "kodi_jsonrpc_username", "is_mandatory": False},
{"key": "kodi_jsonrpc_password", "is_mandatory": False}
2022-03-21 02:48:01 +01:00
]
CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
CONST = CONST()
logging.basicConfig(
# Default for all modules in NOTSET so log everything
level="NOTSET",
format=CONST.LOG_FORMAT,
datefmt="[%X]",
handlers=[RichHandler(
show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True,
rich_tracebacks=True
)]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)
2022-03-23 13:13:14 +01:00
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
2022-03-21 02:48:01 +01:00
install(show_locals=True)
class ConfigParser(
configparser.ConfigParser):
"""Can get options() without defaults
Taken from https://stackoverflow.com/a/12600066.
"""
def options(self, section, no_defaults=False, **kwargs):
if no_defaults:
try:
return list(self._sections[section].keys())
except KeyError:
raise configparser.NoSectionError(section)
else:
return super().options(section)
2022-03-21 02:57:54 +01:00
p = inflect.engine()
2022-03-21 02:48:01 +01:00
ini_defaults = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
config = ConfigParser(defaults=internal_defaults)
config.read(CONST.CFG_DEFAULT_ABS_PATH)
2022-03-22 02:33:57 +01:00
ignored_target_file_exts_str = config.get(config.default_section, "ignored_target_file_exts")
ignored_target_file_exts = re.split(r""",\s?|\s""", ignored_target_file_exts_str)
2022-03-21 02:57:54 +01:00
def print_section_header(
header: str) -> str:
return f"Loading config section '[{header}]' ..."
def validate_default_section(
config_obj: configparser.ConfigParser()) -> None:
log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
if not config_obj.sections():
log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
sys.exit(1)
if config.defaults():
log.debug(f"Symbol legend:\n"
f"* Global default from section '[{config_obj.default_section}]'\n"
f"~ Local option, doesn't exist in '[{config_obj.default_section}]'\n"
f"+ Local override of a value from '[{config_obj.default_section}]'\n"
f"= Local override, same value as in '[{config_obj.default_section}]'")
log.debug(print_section_header(config_obj.default_section))
for default in config_obj.defaults():
ini_defaults.append({default: config_obj[config_obj.default_section][default]})
log.debug(f"* {default} = {config_obj[config_obj.default_section][default]}")
else:
log.debug(f"No defaults defined")
def config_has_valid_section(
config_obj: configparser.ConfigParser()) -> bool:
has_valid_section = False
for config_obj_section in config_obj.sections():
if set(CONST.CFG_MANDATORY).issubset(config_obj.options(config_obj_section)):
has_valid_section = True
break
return has_valid_section
def is_default(
config_key: str) -> bool:
return any(config_key in ini_default for ini_default in ini_defaults)
def is_same_as_default(
config_kv_pair: dict) -> bool:
return config_kv_pair in ini_defaults
def validate_config_sections(
config_obj: configparser.ConfigParser()) -> None:
for this_section in config_obj.sections():
log.debug(print_section_header(this_section))
if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)):
log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
f"{CONST.CFG_MANDATORY} set, skipping section ...")
config_obj.remove_section(this_section)
else:
for key in config_obj.options(this_section, no_defaults=True):
kv_prefix = "~"
if is_default(key):
kv_prefix = "+"
if is_same_as_default({key: config_obj[this_section][key]}):
kv_prefix = "="
log.debug(f"{kv_prefix} {key} = {config_obj[this_section][key]}")
2022-03-21 03:32:49 +01:00
def setup_watch(
2022-03-22 19:37:40 +01:00
csection_name: str,
config_obj: configparser.ConfigParser(),
inotify_obj: INotify) -> bool:
global wds
watch_this = config_obj.get(csection, "watch_dir")
2022-03-21 03:32:49 +01:00
if not os.path.exists(watch_this):
os.makedirs(watch_this, exist_ok=False)
watch_flags = flags.MOVED_TO
try:
2022-03-22 19:37:40 +01:00
log.debug(f"Watching for '[{csection_name}]' files moved to '{watch_this}' ...")
wd_obj = inotify_obj.add_watch(watch_this, watch_flags)
2022-03-21 03:32:49 +01:00
except FileNotFoundError:
2022-03-22 19:37:40 +01:00
log.error(f"Section '[{csection_name}]' watch directory '{watch_this}' does not exist. Please create it. "
f"Exiting 3 ...")
2022-03-22 02:35:26 +01:00
sys.exit(3)
2022-03-21 03:32:49 +01:00
else:
2022-03-22 19:37:40 +01:00
log.debug(f"Created watch descriptor ID {wd_obj} for '[{csection_name}]' watch directory '{watch_this}'.")
wds[wd_obj] = {
"watch_dir": watch_this,
"output_dir": config.get(csection, "output_dir"),
"section": csection_name
}
return True
2022-03-21 03:32:49 +01:00
2022-03-22 02:36:10 +01:00
def generate_nfo(
title: str,
raw_file_name: str) -> lxml.builder.ElementMaker:
season_ep_str = raw_file_name.split(" - ")[0]
data = lxml.builder.ElementMaker()
ep_details_tag = data.episodedetails
title_tag = data.title
id_tag = data.id
nfo_data = ep_details_tag(
title_tag(title),
id_tag(f"{season_ep_str} - {title}")
)
return nfo_data
def get_basic_cleaned_title(
2022-03-22 19:37:40 +01:00
csection_name: str,
2022-03-22 02:36:10 +01:00
config_obj: configparser.ConfigParser(),
dirty_title: str) -> str:
2022-03-22 19:37:40 +01:00
regex_search_pattern = config_obj.get(csection_name, "title_regex_search")
regex_replace_pattern = config_obj.get(csection_name, "title_regex_replace")
2022-03-22 02:36:10 +01:00
if regex_search_pattern:
log.debug(f"Doing basic title cleaning ...")
pattern = re.compile(regex_search_pattern)
clean_title = re.sub(pattern, regex_replace_pattern, dirty_title)
log.debug(f"""Title's now "{clean_title}".""")
return clean_title
else:
return dirty_title
def get_season_and_episode(
2022-03-22 19:37:40 +01:00
csection_name: str,
2022-03-22 02:36:10 +01:00
config_obj: configparser.ConfigParser(),
raw_file_name: str) -> dict:
file_name_ext_split = os.path.splitext(raw_file_name)
season_ep_str = file_name_ext_split[0].split(" - ", 1)
ext = file_name_ext_split[1]
season_episode = re.split("[S|E]", season_ep_str[0])
season = f"Season {season_episode[1]}"
2022-03-23 23:42:22 +01:00
try:
title = season_ep_str[1]
except IndexError:
log.warning(f"File name '{raw_file_name}' is not following expected format. The excepted format "
f"is a season-and-episode string followed by space-slash-space and and arbitrary sequence "
f"of characters suffixed with a file extension e.g. "
f"'S2022E2022032001 - This is a Title.mp4'. Skipping further file processing ...")
return {}
else:
basic_cleaned_title = get_basic_cleaned_title(csection_name, config_obj, title)
got_season_and_episode = {
"season_str": season,
"title_str": basic_cleaned_title,
"season_ep_list": season_ep_str,
"ext": ext
}
log.debug(f"""Identified {got_season_and_episode["season_str"]}, """
f"""title "{got_season_and_episode["title_str"]}" """
f"and episode object {season_ep_str} "
f"with extension '{ext}'.")
return got_season_and_episode
2022-03-22 02:36:10 +01:00
def get_target_file_list(
target_dir: str) -> list:
log.debug(f"Generating list of files in '{target_dir}' ...")
onlyfiles = [f for f in os.listdir(target_dir) if os.path.isfile(os.path.join(target_dir, f))]
filtered_files = [f for f in onlyfiles if not f.endswith(tuple(ignored_target_file_exts))]
log.debug(f"Files in '{target_dir}' filtered for extensions we're ignoring "
f"{ignored_target_file_exts}: {filtered_files}")
return filtered_files
def move_file_to_target_dir(
2022-03-22 19:37:40 +01:00
csection_name: str,
2022-03-22 02:36:10 +01:00
config_obj: configparser.ConfigParser(),
raw_file_name: str,
2022-03-22 19:37:40 +01:00
season_ep_str: dict,
output_dir_name: str) -> str:
2022-03-22 02:36:10 +01:00
2022-03-22 19:37:40 +01:00
this_watch_dir = config_obj.get(csection_name, "watch_dir")
2022-03-22 02:36:10 +01:00
source_abs_path = os.path.join(this_watch_dir, raw_file_name)
2022-03-22 19:37:40 +01:00
target_dir = output_dir_name
2022-03-22 02:36:10 +01:00
target_file_list = get_target_file_list(target_dir)
target_file_name = season_ep_str["season_ep_list"][0]
target_ext = season_ep_str["ext"]
target_file_name_plus_ext = f"{target_file_name}{target_ext}"
while target_file_name_plus_ext in target_file_list:
log.debug(f"Intended file name already exists in target dir, incrementing counter suffix ...")
2022-03-22 02:36:10 +01:00
episode_minus_counter = target_file_name[:-2]
counter = target_file_name[-2:]
counter_length = len(counter)
counter_stripped = int(counter.lstrip("0"))
counter_stripped += 1
target_file_name = f"{episode_minus_counter}{str(counter_stripped).zfill(counter_length)}"
target_file_name_plus_ext = f"{target_file_name}{target_ext}"
2022-03-22 02:36:10 +01:00
target_abs_path = os.path.join(target_dir, f"{target_file_name}{target_ext}")
try:
log.debug(f"Moving '{source_abs_path}' to '{target_abs_path}' ...")
2022-03-22 19:37:40 +01:00
shutil.move(source_abs_path, target_abs_path)
except OSError as move_ose:
2022-03-22 02:36:10 +01:00
log.error(f"Failed moving file with an OSError:\n"
2022-03-22 19:37:40 +01:00
f"{move_ose}\n"
2022-03-22 02:36:10 +01:00
f"Continuing file watch ...")
return ""
else:
return target_file_name
def write_nfo_to_disk(
nfo_data: lxml.builder.ElementMaker,
target_file_name: str,
output_dir_name: str) -> bool:
target_dir = output_dir_name
target_nfo_name = f"{target_file_name}.nfo"
target_abs_path = os.path.join(target_dir, target_nfo_name)
nfo_str = lxml.etree.tostring(
nfo_data,
pretty_print=True,
xml_declaration=True,
standalone=True,
encoding="UTF-8")
try:
with open(target_abs_path, 'wb') as nfo_file:
log.debug(f"Writing NFO data to '{target_abs_path}':\n"
f"""{nfo_str.decode("UTF-8")}""")
nfo_file.write(nfo_str)
2022-03-22 19:37:40 +01:00
except OSError as nfo_ose:
2022-03-22 02:36:10 +01:00
log.error(f"Failed writing NFO file '{target_abs_path}' with an OSError:\n"
2022-03-22 19:37:40 +01:00
f"{nfo_ose}\n"
2022-03-22 02:36:10 +01:00
f"Continuing file watch ...")
return False
else:
return True
2022-03-23 13:13:14 +01:00
def kodi_library_update(
csection_name: str,
config_obj: configparser.ConfigParser()) -> bool:
kodi_jsonrpc_address = config_obj.get(csection_name, "kodi_jsonrpc_address")
kodi_jsonrpc_username = config_obj.get(csection_name, "kodi_jsonrpc_username") if \
config.has_option(csection_name, "kodi_jsonrpc_username") else None
kodi_jsonrpc_password = config_obj.get(csection_name, "kodi_jsonrpc_password") if \
config.has_option(csection_name, "kodi_jsonrpc_password") else None
require_auth = False
if (kodi_jsonrpc_username and not kodi_jsonrpc_password) or \
(not kodi_jsonrpc_username and kodi_jsonrpc_password):
log.warning(f"Please make sure that both a Kodi username /and/ and password are set via "
f"the 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' config options, respectively. "
f"If Kodi's web interface is configured to not require authentication please set "
f"both 'kodi_jsonrpc_username' and 'kodi_jsonrpc_password' to empty values (or simply remove "
f"both lines from the config file).\n"
f"Skipping Kodi video library reload ...")
return False
elif kodi_jsonrpc_username and kodi_jsonrpc_password:
require_auth = True
json_payload_str = {
"jsonrpc": "2.0",
"method": "VideoLibrary.Scan",
"id": f"""{config_obj.get(csection_name, "self_name")}_trigger-vid-lib-scan"""
}
json_payload = json.dumps(json_payload_str)
req_header = {"content-type": "application/json"}
s = requests.Session()
req = requests.Request(
"POST",
kodi_jsonrpc_address,
data=json_payload,
auth=requests.auth.HTTPBasicAuth(
kodi_jsonrpc_username,
kodi_jsonrpc_password
) if require_auth else None,
headers=req_header)
prepped = req.prepare()
newline = "\n"
log.debug(f"Triggering Kodi library update ...")
log.debug(f"Request method: {req.method}\n"
f"URL: {req.url}\n"
f"""{newline.join(f"Header '{header}': '{value}'" for header, value in list(req.headers.items()))}\n"""
f"Payload: {json_payload}")
try:
with s.send(prepped) as s:
got_json_response = s.content
if s.status_code == requests.codes.ok:
log.debug(f"Kodi library update successful")
return True
else:
log.error(f"Request failed, response code was {s.status_code}:\n"
f"{json.loads(got_json_response)}")
return False
except requests.exceptions.ConnectionError:
log.info(f"Kodi JSON-RPC endpoint {kodi_jsonrpc_address} is not currently connectable.")
return False
2022-03-21 02:57:54 +01:00
if __name__ == '__main__':
validate_default_section(config)
if config_has_valid_section(config):
validate_config_sections(config)
else:
log.error(f"No valid config section found. A valid config section has at least the mandatory "
f"""{p.plural("option", len(CONST.CFG_MANDATORY))} """
2022-03-22 02:35:26 +01:00
f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
sys.exit(2)
2022-03-21 02:57:54 +01:00
2022-03-22 19:37:40 +01:00
inotify = INotify()
wds = {}
2022-03-21 02:57:54 +01:00
log.debug(f"Iterating over config sections ...")
2022-03-22 19:37:40 +01:00
for csection in config.sections():
log.debug(f"Processing section '[{csection}]' ...")
setup_watch(csection, config, inotify)
while True:
time.sleep(0.2)
for event in inotify.read():
events = [str(flags) for flags in flags.from_mask(event.mask)]
if "flags.MOVED_TO" in events:
file_name = event.name
watch_dir_config = wds[event.wd]
watch_dir = watch_dir_config["watch_dir"]
output_dir = watch_dir_config["output_dir"]
section_name = watch_dir_config["section"]
log.info(f"File '{file_name}' was moved to watch directory "
f"""'{watch_dir}', processing ...""")
season_and_episode = get_season_and_episode(section_name, config, file_name)
2022-03-23 23:42:22 +01:00
if not season_and_episode:
break
2022-03-22 19:37:40 +01:00
if config.getboolean(section_name, "do_seasons"):
season_str = season_and_episode["season_str"]
log.debug(f"Changing output to season-specific dir '{season_str}' ...")
output_dir = os.path.join(output_dir, season_str)
try:
os.makedirs(output_dir, exist_ok=True)
except OSError as ose:
log.error(f"Unable to create section '[{section_name}]' output dir "
f"'{output_dir}' with an OSError:\n"
f"{ose}\n"
f"Exiting 4 ...")
sys.exit(4)
nfo = generate_nfo(season_and_episode["title_str"], file_name)
file_moved_to_target_dir = move_file_to_target_dir(
section_name,
config,
file_name,
season_and_episode,
output_dir)
if file_moved_to_target_dir:
write_nfo_to_disk(
nfo,
file_moved_to_target_dir,
output_dir)
run_cmd = config.get(section_name, "run_cmd") if config.has_option(section_name, "run_cmd") else False
if run_cmd:
log.debug(f"Executing post-move command: {run_cmd} ...")
run_cmd_result = subprocess.getstatusoutput(run_cmd)
run_cmd_exit_code = run_cmd_result[0]
run_cmd_output = run_cmd_result[1]
if run_cmd_exit_code != 0:
log.warning(f"Post-move command failed with exit code {run_cmd_exit_code}:\n"
f"{run_cmd_output}")
if config.has_option(section_name, "kodi_jsonrpc_address"):
kodi_library_update(section_name, config)