# mvw-dl/mvw-dl.py
import configparser
import datetime as d
import json
import logging
import os
import pathlib
import re
import shutil
import sys
import time
import filelock
import humanize
import requests
import inflect
from rich.logging import RichHandler
from rich.traceback import install
import typing as t
from rich.console import Console
from filelock import Timeout, FileLock
import uuid
import type_def.mvw_json_response
from type_def.mvw_json_request import MVWJSONRequest
from type_def.mvw_json_response import MVWJSONResponse
# Downloading
from concurrent.futures import ThreadPoolExecutor
import signal
from functools import partial
from threading import Event
from typing import Iterable
from urllib.request import urlopen
from rich.progress import (
    BarColumn,
    DownloadColumn,
    Progress,
    TaskID,
    TextColumn,
    TimeRemainingColumn,
    TransferSpeedColumn,
)
# TODO set locale for datetime and others to globally stick to en_US
# TODO thread log messages display timestamp in systemd journal
# TODO Increment file name suffix more than once if needed
# TODO [23:15:14] DEBUG [thread]
# TODO Clean mvw-dl.timer
# TODO Reset maus-query.json
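# Module-level state shared across download threads: wall-clock start/update times and
# running byte counters for progress/speed reporting, plus state-file locking defaults.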
download_start_time = 0
download_last_update_time = 0
total_content_length = 0
size_downloaded_for_progress_tracking = 0
size_downloaded_for_speed_tracking = 0
file_lock_timeout = 1
state_lock_file_ext = ".lock"
progress = Progress(
    TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
    BarColumn(bar_width=None),
    "[progress.percentage]{task.percentage:>3.1f}%",
    "",
    DownloadColumn(),
    "",
    TransferSpeedColumn(),
    "",
    TimeRemainingColumn(),
)
# Downloading
console = Console(width=180)
p = inflect.engine()
# We use Python 3.5+ type hints; we're working with JSON objects; we're following a 2016 suggestion from
# Python's "typing" GitHub issue tracker on how to create a "JSONType" hint since such a thing does not yet
# officially exist: https://github.com/python/typing/issues/182#issuecomment-186684288
# JSONType = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any]]
JSONType = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any]]
# Exit codes
# 1: Config file invalid, it has no sections
# 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
# 3: No search results to download
# 4: State file already exists, has more than 0 bytes size but doesn't contain usable JSON
# 5: State file lock cannot be acquired within file_lock_timeout
# 6: Unable to create state directory
class CONST(object):
    __slots__ = ()
    LOG_FORMAT = "%(message)s"
    CFG_THIS_FILE_DIRNAME = os.path.dirname(__file__)
    CFG_DEFAULT_FILENAME = "config.ini"
    CFG_DEFAULT_ABS_PATH = os.path.join(CFG_THIS_FILE_DIRNAME, CFG_DEFAULT_FILENAME)
    CFG_KNOWN_DEFAULTS = [
        {"key": "self_name", "value": "mvw-dl"},
        {"key": "tmp_base_dir", "value": os.path.join(CFG_THIS_FILE_DIRNAME, "data/tmp/%(self_name)s")},
        {"key": "state_base_dir", "value": os.path.join(CFG_THIS_FILE_DIRNAME, "data/var/lib/%(self_name)s")},
        {"key": "state_files_dir", "value": "%(state_base_dir)s/state"},
        {"key": "state_file_retention", "value": "50"},
        {"key": "state_file_name_prefix", "value": "state-"},
        {"key": "state_file_name_suffix", "value": ".log"},
        {"key": "mvw_endpoint", "value": "http://localhost:8000/api/query"},
        {"key": "title_dedup_winner", "value": "first"},
        {"key": "dl_progress_update_interval", "value": "10"},
        {"key": "dl_threads", "value": "2"},
        {"key": "dl_filename_pattern", "value": "&(channel)s - &(publish_date)s - &(topic)s - &(title)s"},
        {"key": "publish_date_srtftime_pattern", "value": "%%Y%%m%%d"},
        {"key": "dl_filename_replace_spaces_with", "value": "_"},
        {"key": "dl_filename_all_lowercase", "value": "yes"}
    ]
    CFG_KNOWN_SECTION = [
        {"key": "min_duration", "is_mandatory": False},
        {"key": "max_duration", "is_mandatory": False},
        {"key": "title_not_regex", "is_mandatory": False},
        {"key": "query", "is_mandatory": True},
        {"key": "dl_dir", "is_mandatory": True}
    ]
    CFG_MANDATORY = [section_cfg["key"] for section_cfg in CFG_KNOWN_SECTION if section_cfg["is_mandatory"]]
CONST = CONST()
logging.basicConfig(
    # Default for all modules is NOTSET so log everything
    level="NOTSET",
    format=CONST.LOG_FORMAT,
    datefmt="[%X]",
    handlers=[RichHandler(
        show_time=False,
        rich_tracebacks=True
    )]
)
log = logging.getLogger("rich")
# Our own code logs with this level
log.setLevel(logging.DEBUG)
# Limit connectionpool and filelock to WARNING, we don't need their verbosity
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
logging.getLogger("filelock").setLevel(logging.WARNING)
install(show_locals=True)
class ConfigParser(
        configparser.ConfigParser):
    """Can get options() without defaults
    Taken from https://stackoverflow.com/a/12600066.
    """
    def options(self, section, no_defaults=False, **kwargs):
        if no_defaults:
            try:
                return list(self._sections[section].keys())
            except KeyError:
                raise configparser.NoSectionError(section)
        else:
            return super().options(section)
ini_defaults = []
internal_defaults = {default["key"]: default["value"] for default in CONST.CFG_KNOWN_DEFAULTS}
config = ConfigParser(defaults=internal_defaults)
config.read(CONST.CFG_DEFAULT_ABS_PATH)
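# Illustrative config.ini sketch (section name and values are hypothetical; only "query"
# and "dl_dir" are mandatory per CFG_KNOWN_SECTION, everything else falls back to
# CFG_KNOWN_DEFAULTS; a query starting with "@" is read from that JSON file):
#
#   [maus]
#   query = @maus-query.json
#   dl_dir = ~/Videos/maus
#   min_duration = 1200
#   title_not_regex = some|excluded|words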
def print_section_header(
        header: str) -> str:
    return f"Loading config section '[{header}]' ..."
def validate_default_section(
        config_obj: configparser.ConfigParser) -> None:
    log.debug(f"Loading config from file '{CONST.CFG_DEFAULT_ABS_PATH}' ...")
    if not config_obj.sections():
        log.error(f"No config sections found in '{CONST.CFG_DEFAULT_ABS_PATH}'. Exiting 1 ...")
        sys.exit(1)
    if config_obj.defaults():
        log.debug(f"Symbol legend:\n"
                  f"* Global default from section '[{config_obj.default_section}]'\n"
                  f"~ Local option, doesn't exist in '[{config_obj.default_section}]'\n"
                  f"+ Local override of a value from '[{config_obj.default_section}]'\n"
                  f"= Local override, same value as in '[{config_obj.default_section}]'")
        log.debug(print_section_header(config_obj.default_section))
        for default in config_obj.defaults():
            ini_defaults.append({default: config_obj[config_obj.default_section][default]})
            log.debug(f"* {default} = {config_obj[config_obj.default_section][default]}")
    else:
        log.debug(f"No defaults defined")
def config_has_valid_section(
        config_obj: configparser.ConfigParser) -> bool:
    has_valid_section = False
    for config_obj_section in config_obj.sections():
        if set(CONST.CFG_MANDATORY).issubset(config_obj.options(config_obj_section)):
            has_valid_section = True
            break
    return has_valid_section
def is_default(
        config_key: str) -> bool:
    return any(config_key in ini_default for ini_default in ini_defaults)
def is_same_as_default(
        config_kv_pair: dict) -> bool:
    return config_kv_pair in ini_defaults
def validate_config_sections(
        config_obj: configparser.ConfigParser) -> None:
    for this_section in config_obj.sections():
        log.debug(print_section_header(this_section))
        if not set(CONST.CFG_MANDATORY).issubset(config_obj.options(this_section, no_defaults=True)):
            log.warning(f"Config section '[{this_section}]' does not have all mandatory options "
                        f"{CONST.CFG_MANDATORY} set, skipping section ...")
            config_obj.remove_section(this_section)
        else:
            for key in config_obj.options(this_section, no_defaults=True):
                kv_prefix = "~"
                if is_default(key):
                    kv_prefix = "+"
                if is_same_as_default({key: config_obj[this_section][key]}):
                    kv_prefix = "="
                log.debug(f"{kv_prefix} {key} = {config_obj[this_section][key]}")
def query_string_from_file(
        filename: str) -> str:
    filename_abs_path = os.path.join(CONST.CFG_THIS_FILE_DIRNAME, filename)
    with open(filename_abs_path, "r", encoding="utf-8") as jsonfile:
        query_string = jsonfile.read()
    return query_string
def get_query_payload(
        section_name: str,
        config_obj: configparser.ConfigParser) -> MVWJSONRequest:
    log.debug(f"Generating HTTP POST JSON payload ...")
    query = config_obj.get(section_name, "query")
    if query[0] == "@":
        query = query.split("@", 1)[1]
        query = query_string_from_file(query)
    got_query_payload = MVWJSONRequest(**json.loads(query))
    return got_query_payload
def get_json_response(
        section_name: str,
        config_obj: configparser.ConfigParser,
        payload: MVWJSONRequest) -> MVWJSONResponse:
    log.debug(f"Downloading JSON list of Mediathek files that match search criteria")
    serialized_payload = payload.json()
    url = config_obj.get(section_name, "mvw_endpoint")
    req_header = {"Content-Type": "text/plain"}
    s = requests.Session()
    req = requests.Request("POST", url, data=serialized_payload, headers=req_header)
    prepped = req.prepare()
    newline = "\n"
    log.debug(f"Request method: {req.method}\n"
              f"URL: {req.url}\n"
              f"""{newline.join(f"Header '{header}': '{value}'" for header, value in list(req.headers.items()))}\n"""
              f"Payload: {payload}")
    with s.send(prepped) as response:
        got_json_response = MVWJSONResponse(**json.loads(response.content))
        return got_json_response
def no_downloads_needed() -> None:
    log.info(f"No search results to download, exiting 3 ...")
    sys.exit(3)
def remove_result(
        json_obj: MVWJSONResponse,
        result_obj: type_def.mvw_json_response.Show) -> MVWJSONResponse:
    json_obj.result.results.remove(result_obj)
    json_obj.result.queryInfo.resultCount -= 1
    if json_obj.result.queryInfo.resultCount:
        return json_obj
    else:
        no_downloads_needed()
def log_result_count(result_count: int, pre_filter: bool = True) -> None:
    if pre_filter:
        log.debug(f"""Search result contains {result_count} {p.plural("show", result_count)} going in""")
    else:
        log.debug(f"""Search result now contains {result_count} {p.plural("show", result_count)}""")
def filter_json_by_duration(
        section_name: str,
        config_obj: configparser.ConfigParser,
        json_obj: MVWJSONResponse) -> MVWJSONResponse:
    # A fallback of -1 disables the respective bound if a section sets only one of the two options
    min_duration = config_obj.getint(section_name, "min_duration", fallback=-1)
    max_duration = config_obj.getint(section_name, "max_duration", fallback=-1)
    log_result_count(json_obj.result.queryInfo.resultCount)
    if min_duration >= 0:
        log.debug(f"Filtering '[{section_name}]' JSON for minimum length of {min_duration} "
                  f"""{p.plural("second", min_duration)} ...""")
        for result in json_obj.result.results.copy():
            if not result.duration >= min_duration:
                remove_result(json_obj, result)
    if max_duration >= 0:
        log.debug(f"Filtering '[{section_name}]' JSON for maximum length of {max_duration} "
                  f"""{p.plural("second", max_duration)} ...""")
        for result in json_obj.result.results.copy():
            if not result.duration <= max_duration:
                remove_result(json_obj, result)
    log_result_count(json_obj.result.queryInfo.resultCount, False)
    return json_obj
def filter_json_by_title_regex(
        section_name: str,
        config_obj: configparser.ConfigParser,
        json_obj: MVWJSONResponse) -> MVWJSONResponse:
    title_not_regex = re.compile(config_obj.get(section_name, "title_not_regex"), re.IGNORECASE)
    log_result_count(json_obj.result.queryInfo.resultCount)
    log.debug(f"Filtering '[{section_name}]' JSON by title regular expression")
    for result in json_obj.result.results.copy():
        if title_not_regex.search(result.title):
            remove_result(json_obj, result)
    log_result_count(json_obj.result.queryInfo.resultCount, False)
    return json_obj
def dedup_json_titles(
        section_name: str,
        config_obj: configparser.ConfigParser,
        json_obj: MVWJSONResponse) -> MVWJSONResponse:
    title_dedup_winner = config_obj.get(section_name, "title_dedup_winner")
    titles_list = {}
    log_result_count(json_obj.result.queryInfo.resultCount)
    # Collect every show id (and its timestamp) per title
    for result in json_obj.result.results.copy():
        if result.title not in titles_list:
            titles_list[result.title] = {}
        if result.id not in titles_list[result.title]:
            titles_list[result.title][result.id] = result.timestamp
    # Per title keep a single winning id, chosen by lexicographic id order
    for result in titles_list.copy():
        if title_dedup_winner == "first":
            dd_winner = min(titles_list[result], key=str)
        else:
            dd_winner = max(titles_list[result], key=str)
        titles_list[result] = dd_winner
    # Drop every result whose id lost the deduplication for its title
    for result in json_obj.result.results.copy():
        if result.title in titles_list:
            if result.id != titles_list[result.title]:
                log.debug(f"""Deduplicating '[{section_name}]' result "{result.title}" ...""")
                remove_result(json_obj, result)
    log_result_count(json_obj.result.queryInfo.resultCount, False)
    return json_obj
def expanded_dest_dir(
        raw_dest_dir: str) -> str:
    user_expanded_dest_dir = os.path.expanduser(raw_dest_dir)
    all_expanded_dest_dir = os.path.expandvars(user_expanded_dest_dir)
    return all_expanded_dest_dir
def filename_replace_pattern(
        section_name: str,
        config_obj: configparser.ConfigParser,
        show: type_def.mvw_json_response.Show,
        max_quality_url: str,
        shorthand_uuid: str) -> str:
    filename = config_obj.get(section_name, "dl_filename_pattern")
    ext = pathlib.Path(max_quality_url).suffix.lstrip(".")
    publish_date = d.datetime.utcfromtimestamp(show.timestamp).strftime(
        config_obj.get(section_name, "publish_date_srtftime_pattern"))
    show_extended = {"ext": ext, "publish_date": publish_date}
    show_attrs = [attr for attr in dir(show) if not attr.startswith('_') and not callable(getattr(show, attr))]
    for attr in show_attrs:
        attr_re = re.compile(r"&\(" + re.escape(attr) + r"\)s")
        if re.search(attr_re, filename):
            log.debug(f"{shorthand_uuid} Replacing filename pattern '&({attr})s' ...")
            filename = re.sub(attr_re, str(getattr(show, attr)), filename)
            log.debug(f"{shorthand_uuid} New filename: '{filename}'")
    for extended_attr in show_extended:
        extended_attr_re = re.compile(r"&\(" + re.escape(extended_attr) + r"\)s")
        if re.search(extended_attr_re, filename):
            log.debug(f"{shorthand_uuid} Replacing filename pattern '&({extended_attr})s' ...")
            filename = re.sub(extended_attr_re, show_extended[extended_attr], filename)
            log.debug(f"{shorthand_uuid} New filename: '{filename}'")
    return filename
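# Illustrative expansion of the default dl_filename_pattern (show values are hypothetical):
#   "&(channel)s - &(publish_date)s - &(topic)s - &(title)s"
#   -> "WDR - 20220306 - Some Topic - Some Title"
# get_safe_filename() and the space/lowercase post-processing below then clean this up further.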
def get_safe_filename(
        dirty_filename: str,
        shorthand_uuid: str) -> str:
    """https://stackoverflow.com/a/71199182"""
    log.debug(f"{shorthand_uuid} Removing question marks from file name ...")
    clean_filename = re.sub(r"""[?]""", "", dirty_filename)
    log.debug(f"{shorthand_uuid} Replacing unsafe characters in filename with dashes ...")
    clean_filename = re.sub(r"""[/\\?%*:|"<>\x7F\x00-\x1F]""", "-", clean_filename)
    log.debug(f"{shorthand_uuid} New filename: '{clean_filename}'")
    return clean_filename
def filename_replace_spaces_with_underscores(
        section_name: str,
        config_obj: configparser.ConfigParser,
        filename: str,
        shorthand_uuid: str) -> str:
    space_replace_string = config_obj.get(section_name, "dl_filename_replace_spaces_with")
    log.debug(f"{shorthand_uuid} Replacing space characters with '{space_replace_string}' ...")
    underscored_filename = re.sub(
        r"\s",
        space_replace_string,
        filename)
    log.debug(f"{shorthand_uuid} New filename: '{underscored_filename}'")
    return underscored_filename
def get_filename(
        section_name: str,
        config_obj: configparser.ConfigParser,
        show: type_def.mvw_json_response.Show,
        max_quality_url: str,
        shorthand_uuid: str) -> str:
    log.debug(f"{shorthand_uuid} Generating final filename ...")
    filename_replaced_patterns = filename_replace_pattern(
        section_name,
        config_obj,
        show,
        max_quality_url,
        shorthand_uuid)
    filename_safe = get_safe_filename(
        filename_replaced_patterns,
        shorthand_uuid)
    if config_obj.get(section_name, "dl_filename_replace_spaces_with"):
        filename_safe = filename_replace_spaces_with_underscores(
            section_name,
            config_obj,
            filename_safe,
            shorthand_uuid)
    if config_obj.getboolean(section_name, "dl_filename_all_lowercase"):
        log.debug(f"{shorthand_uuid} Lowercasing all filename characters ...")
        filename_safe = filename_safe.lower()
        log.debug(f"{shorthand_uuid} New filename: '{filename_safe}'")
    return filename_safe
def get_state_file_abs_path(
        section_name: str,
        config_obj: configparser.ConfigParser) -> str:
    state_dir = config_obj.get(section_name, "state_files_dir")
    try:
        os.makedirs(state_dir, exist_ok=True)
    except OSError:
        log.error(f"Unable to create '[{section_name}]' state directory '{state_dir}'. "
                  f"We're not going to be able to log state information. Exiting 6 ...")
        sys.exit(6)
    else:
        state_file = \
            config_obj.get(section_name, "state_file_name_prefix") + \
            section_name + \
            config_obj.get(section_name, "state_file_name_suffix")
        state_file_abs_path = os.path.join(state_dir, state_file)
        return state_file_abs_path
def state_file_none_or_valid_json(
        state_file_abs_path: str) -> bool:
    if os.path.exists(state_file_abs_path):
        if os.path.getsize(state_file_abs_path) > 0:
            with open(state_file_abs_path, "r", encoding="utf-8") as state_file:
                try:
                    json.loads(state_file.read())
                    return True
                except json.JSONDecodeError:
                    log.warning(f"State file '{state_file_abs_path}' does not contain valid JSON. We're not going to "
                                f"be able to log anything into it. Exiting 4 ...")
                    sys.exit(4)
        else:
            return True
    else:
        return True
def truncate_log(
        json_data: list,
        max_log_entries: int) -> list:
    # Drop the oldest entries until at most max_log_entries remain
    while len(json_data) > max_log_entries:
        del json_data[0]
    return json_data
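# For example (entries are hypothetical), truncate_log([{"a": 1}, {"b": 2}, {"c": 3}], 2)
# returns [{"b": 2}, {"c": 3}].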
def get_state_file_lock(
        state_lock_file: str) -> filelock.BaseFileLock:
    global file_lock_timeout
    try:
        lock = FileLock(state_lock_file, timeout=file_lock_timeout)
        return lock
    except filelock.Timeout:
        log.error(f"Unable to acquire lock on state lock file '{state_lock_file}' "
                  f"""within {file_lock_timeout} {p.plural("second", file_lock_timeout)}, exiting 5 ...""")
        sys.exit(5)
def log_successful_download(
        section_name: str,
        config_obj: configparser.ConfigParser,
        show: type_def.mvw_json_response.Show,
        state_file_abs_path: str,
        job_uuid: str,
        shorthand_uuid: str) -> None:
    timestamp_now = int(time.time())
    os.makedirs(os.path.dirname(state_file_abs_path), exist_ok=True)
    state_lock_file = state_file_abs_path + state_lock_file_ext
    state_body = show.dict(include={"topic", "title"})
    state_body["dl_complete_timestamp_epoch"] = timestamp_now
    state_body["dl_complete_timestamp_human"] = \
        d.datetime.utcfromtimestamp(timestamp_now).strftime("%Y-%m-%d %H%M%S UTC")
    state_entry = {job_uuid: state_body}
    json_state = None
    lock = get_state_file_lock(state_lock_file)
    with lock:
        state_file_none_or_valid_json(state_file_abs_path)
        state_file_open_mode = "r+" if os.path.exists(state_file_abs_path) else "w+"
        with open(state_file_abs_path, state_file_open_mode, encoding="utf-8") as state_file:
            try:
                json_state = json.load(state_file)
            except json.JSONDecodeError:
                if json_state is None:
                    state_file.truncate()
                    json_state = []
        log.debug(f"{shorthand_uuid} Writing log entry to '{state_file_abs_path}' ...")
        with open(state_file_abs_path, "w", encoding="utf-8") as state_file:
            json_state.append(state_entry)
            max_log_entries = config_obj.getint(section_name, "state_file_retention")
            if len(json_state) > max_log_entries:
                json_state = truncate_log(json_state, max_log_entries)
            json.dump(json_state, state_file, indent=4, sort_keys=True, ensure_ascii=False)
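# Each state file ends up as a JSON list of one-key objects keyed by job UUID, e.g.
# (values are hypothetical):
#   [{"<job_uuid>": {"topic": "...", "title": "...",
#                    "dl_complete_timestamp_epoch": 1646604000,
#                    "dl_complete_timestamp_human": "2022-03-06 221200 UTC"}}]
# is_already_downloaded() below matches on the "topic"/"title" pair.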
def copy_url(
        section_name: str,
        config_obj: configparser.ConfigParser,
        show: type_def.mvw_json_response.Show,
        video_metadata: dict,
        state_file_abs_path: str,
        show_name: str,
        job_uuid: str,
        shorthand_uuid: str,
        tmp_dir: str,
        dest_dir: str) -> None:
    """Copy data from a url to a local file."""
    global download_start_time
    global download_last_update_time
    global size_downloaded_for_progress_tracking
    global size_downloaded_for_speed_tracking
    update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
    max_quality_url = video_metadata["url"]
    filename = get_filename(section_name, config_obj, show, max_quality_url, shorthand_uuid)
    resume_header = {}
    tmp_file_open_mode = "wb"
    tmp_file_size = 0
    tmp_path = os.path.join(tmp_dir, filename)
    dest_path = os.path.join(dest_dir, filename)
    os.makedirs(os.path.dirname(tmp_path), exist_ok=True)
    log.info(f"{shorthand_uuid} Download location resolved to '{tmp_path}'")
    if os.path.exists(tmp_path):
        tmp_file_size = os.path.getsize(tmp_path)
        log.debug(f"{shorthand_uuid} Temporary file '{tmp_path}' exists, likely from a previous incomplete "
                  f"download attempt, size is {humanize.naturalsize(tmp_file_size, binary=True)}. Resuming ...")
        tmp_file_open_mode = "ab"
    try:
        with open(tmp_path, tmp_file_open_mode) as tmp_file:
            log.info(f"""{shorthand_uuid} Downloading "{show_name}" ...""")
            if tmp_file_size > 0:
                resume_header = {"range": f"bytes={tmp_file_size}-"}
                log.debug(f"resume_header: {resume_header}")
                size_downloaded_for_progress_tracking += tmp_file_size
            r = requests.get(max_quality_url, headers=resume_header, stream=True)
            for chunk in r.iter_content(32768):
                size_downloaded_for_progress_tracking += len(chunk)
                size_downloaded_for_speed_tracking += len(chunk)
                tmp_file.write(chunk)
                if time.time() - download_last_update_time >= update_interval:
                    download_last_update_time = time.time()
                    time_in_progress = download_last_update_time - download_start_time
                    dl_speed_so_far = size_downloaded_for_speed_tracking / time_in_progress
                    human_dl_speed_so_far = f"{humanize.naturalsize(dl_speed_so_far, binary=True)}/s"
                    data_missing = total_content_length - size_downloaded_for_progress_tracking
                    time_til_completion = 1 / dl_speed_so_far * data_missing
                    human_time_til_completion = humanize.naturaldelta(d.timedelta(seconds=time_til_completion))
                    percentage_done = size_downloaded_for_progress_tracking / total_content_length * 100
                    human_pct = "{:.1f}".format(percentage_done)
                    human_size_dl = humanize.naturalsize(size_downloaded_for_progress_tracking, binary=True)
                    human_total_dl = humanize.naturalsize(total_content_length, binary=True)
                    log.debug(f"[thread] Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} "
                              f"at an average {human_dl_speed_so_far}, approximately {human_time_til_completion} "
                              f"left til completion.)")
        log.info(f"""{shorthand_uuid} Download of "{show_name}" done""")
    except IOError:
        log.error(f"{shorthand_uuid} IOError during download. Aborting this download thread ...")
        return
    log.info(f"{shorthand_uuid} Moving file to final location '{dest_path}' ...")
    try:
        shutil.move(tmp_path, dest_path)
    except OSError as ose:
        log.error(f"{shorthand_uuid} Failed moving file with an OSError\n"
                  f"{ose}\n"
                  f"Other threads continue unhindered.")
    else:
        log_successful_download(section_name, config_obj, show, state_file_abs_path, job_uuid, shorthand_uuid)
        log.info(f"{shorthand_uuid} Done moving")
def get_max_quality_url(
        show: type_def.mvw_json_response.Show) -> str:
    if show.url_video_hd:
        max_quality_url = show.url_video_hd
    elif show.url_video:
        max_quality_url = show.url_video
    else:
        max_quality_url = show.url_video_low
    return max_quality_url
def get_content_length(
        video_url: str) -> int:
    r = requests.head(video_url)
    if r.status_code == requests.codes.ok:
        return int(r.headers["content-length"])
    else:
        return 0
def get_json_state(
        state_file_abs_path: str) -> list:
    try:
        with open(state_file_abs_path, "r", encoding="utf-8") as state_file:
            try:
                json_state = json.load(state_file)
            except json.JSONDecodeError:
                return []
            else:
                return json_state
    except FileNotFoundError:
        log.debug(f"State file does not exist (yet), assuming no previous downloads have ever happened ...")
        return []
def is_already_downloaded(
        show: type_def.mvw_json_response.Show,
        json_state: list,
        show_name: str) -> bool:
    for log_entry in json_state:
        for log_data in log_entry:
            if show.topic == log_entry[log_data]["topic"] and show.title == log_entry[log_data]["title"]:
                log.debug(f"""Show "{show_name}" already downloaded, won't queue""")
                return True
    return False
def download_media(
        section_name: str,
        config_obj: configparser.ConfigParser,
        json_obj: MVWJSONResponse) -> None:
    global download_start_time
    global download_last_update_time
    global total_content_length
    dl_threads = config_obj.getint(section_name, "dl_threads")
    state_file_abs_path = get_state_file_abs_path(section_name, config_obj)
    state_lock_file = state_file_abs_path + state_lock_file_ext
    video_metadata = {}
    tmp_dir = expanded_dest_dir(config_obj.get(section_name, "tmp_base_dir"))
    dest_dir = expanded_dest_dir(config_obj.get(section_name, "dl_dir"))
    log.info(f"""Download location is {tmp_dir}""")
    log.info(f"""Final location is {dest_dir}""")
    log.info(f"Limiting parallel downloads to {dl_threads} ...")
    lock = get_state_file_lock(state_lock_file)
    with lock:
        state_file_none_or_valid_json(state_file_abs_path)
        json_state = get_json_state(state_file_abs_path)
    with ThreadPoolExecutor(max_workers=dl_threads) as pool:
        download_last_update_time = time.time()
        download_start_time = download_last_update_time
        update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
        log.debug(f"""Will provide updates every {update_interval} {p.plural("second", update_interval)}""")
        for result in json_obj.result.results.copy():
            show_name = f"{result.topic} - {result.title}"
            future = None
            if not is_already_downloaded(result, json_state, show_name):
                max_quality_url = get_max_quality_url(result)
                content_length = get_content_length(max_quality_url)
                video_metadata[result.id] = {"url": max_quality_url, "content_length": content_length}
                total_content_length += video_metadata[result.id]["content_length"]
                log.debug(f"Total download size upped to "
                          f"{humanize.naturalsize(total_content_length, binary=True)}")
                job_uuid = str(uuid.uuid4())
                shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
                log.debug(f"{shorthand_uuid} Job UUID {job_uuid} generated, shorthand is {shorthand_uuid}")
                log.debug(f"""{shorthand_uuid} Queuing "{show_name}" for download ...""")
                future = pool.submit(
                    copy_url,
                    section_name,
                    config_obj,
                    result,
                    video_metadata[result.id],
                    state_file_abs_path,
                    show_name,
                    job_uuid,
                    shorthand_uuid,
                    tmp_dir,
                    dest_dir)
            if future is not None:
                future.result()
if __name__ == '__main__':
    validate_default_section(config)
    if config_has_valid_section(config):
        validate_config_sections(config)
    else:
        log.error(f"No valid config section found. A valid config section has at least the mandatory options "
                  f"{CONST.CFG_MANDATORY} set. Exiting 2 ...")
        sys.exit(2)
    log.debug(f"Iterating over config sections ...")
    for section in config.sections():
        log.debug(f"Processing section '[{section}]' ...")
        query_payload = get_query_payload(section, config)
        json_response = get_json_response(section, config, query_payload)
        log.debug(f"Filtering results by duration where applicable ...")
        if config.has_option(section, "min_duration") or config.has_option(section, "max_duration"):
            json_response = filter_json_by_duration(section, config, json_response)
        log.debug(f"Filtering results by title regular expression where applicable ...")
        if config.has_option(section, "title_not_regex"):
            json_response = filter_json_by_title_regex(section, config, json_response)
        log.debug(f"Deduplicating results by title where needed ...")
        if config.has_option(section, "title_dedup_winner"):
            json_response = dedup_json_titles(section, config, json_response)
        log.debug(f"Interested in {json_response.result.queryInfo.resultCount} "
                  f"""{p.plural("show", json_response.result.queryInfo.resultCount)} ...""")
        download_media(section, config, json_response)