Compare commits

...

6 Commits

5 changed files with 196 additions and 86 deletions

View File

@@ -20,8 +20,8 @@ min_duration = 1200
 max_duration = 2700
 query = @maus-query.json
 title_not_regex = audiodeskription|gebärdensprache
-dl_filename_pattern = &(publish_date)s.&(ext)s
-publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
+# dl_filename_pattern = &(publish_date)s.&(ext)s
+# publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
 # query = {"queries":[{"fields":["topic"],"query":"die sendung mit der maus"},{"fields":["channel"],"query":"ARD"}],"sortBy":"timestamp","sortOrder":"desc","future":false,"offset":0,"size":50}
 # state_file_name = maus
 # tmp_base_dir = %(tmp_base_dir)s/maus
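Note: with this change the per-section dl_filename_pattern and publish_date_srtftime_pattern are commented out, so whatever is configured elsewhere (presumably the config defaults) applies again. The &(name)s placeholders in such a pattern are expanded by filename_replace_pattern() in mvw-dl.py via re.sub; a minimal standalone sketch of that expansion, with made-up values:

    import re

    # "&(key)s"-style placeholders as used by dl_filename_pattern; values are made up.
    pattern = "&(publish_date)s.&(ext)s"
    values = {"publish_date": "S2022E2022040301", "ext": "mp4"}

    filename = pattern
    for key, value in values.items():
        filename = re.sub(r"&\(" + re.escape(key) + r"\)s", value, filename)

    print(filename)  # S2022E2022040301.mp4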

View File

@@ -17,5 +17,5 @@
   "sortOrder": "desc",
   "future": false,
   "offset": 0,
-  "size": 20
+  "size": 15
 }

mvw-dl.py
View File

@@ -7,6 +7,8 @@ import pathlib
 import re
 import sys
 import time
+import filelock
 import humanize
 import requests
 import inflect
@@ -14,6 +16,8 @@ from rich.logging import RichHandler
 from rich.traceback import install
 import typing as t
 from rich.console import Console
+from filelock import Timeout, FileLock
+import uuid

 import type_def.mvw_json_response
 from type_def.mvw_json_request import MVWJSONRequest
@@ -44,6 +48,9 @@ from rich.progress import (
 download_start_time = 0
 download_last_update_time = 0
 size_downloaded = 0
+
+file_lock_timeout = 1
+state_lock_file_ext = ".lock"

 progress = Progress(
     TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
     BarColumn(bar_width=None),
@@ -73,6 +80,7 @@ JSONType = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any
 # 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
 # 3: No search results to download
 # 4: State file already exists, has more than 0 bytes size but doesn't contain usable JSON
+# 5: State file lock cannot be acquired within file_lock_timeout

 class CONST(object):
@@ -122,9 +130,9 @@ logging.basicConfig(
 log = logging.getLogger("rich")
 # Our own code logs with this level
 log.setLevel(logging.DEBUG)
-# connectionpool logs with WARNING, we don't need its verbosity
-log_connectionpool = logging.getLogger("urllib3.connectionpool")
-log_connectionpool.setLevel(logging.WARNING)
+# connectionpool and filelock log with WARNING, we don't need its verbosity
+# logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
+logging.getLogger("filelock").setLevel(logging.WARNING)

 install(show_locals=True)
@@ -362,7 +370,9 @@ def filename_replace_pattern(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        max_quality_url: str) -> str:
+        max_quality_url: str,
+        shorthand_uuid: str) -> str:

     filename = config_obj.get(section_name, "dl_filename_pattern")
     ext = pathlib.Path(max_quality_url).suffix.lstrip(".")
     publish_date = d.datetime.utcfromtimestamp(show.timestamp).strftime(
@@ -371,37 +381,40 @@ def filename_replace_pattern(
     show_attrs = [attr for attr in dir(show) if not attr.startswith('_') and not callable(getattr(show, attr))]

     for attr in show_attrs:
-        log.debug(f"Replacing filename pattern '&({attr})s' ...")
+        # log.debug(f"{shorthand_uuid} Replacing filename pattern '&({attr})s' ...")
         filename = re.sub(r"&\(" + re.escape(attr) + r"\)s", str(getattr(show, attr)), filename)
-        log.debug(f"New filename: '{filename}'")
+        # log.debug(f"{shorthand_uuid} New filename: '{filename}'")
     for extended_attr in show_extended:
-        log.debug(f"Replacing filename pattern '&({extended_attr})s' ...")
+        # log.debug(f"{shorthand_uuid} Replacing filename pattern '&({extended_attr})s' ...")
         filename = re.sub(r"&\(" + re.escape(extended_attr) + r"\)s", show_extended[extended_attr], filename)
-        log.debug(f"New filename: '{filename}'")
+        # log.debug(f"{shorthand_uuid} New filename: '{filename}'")
     return filename


 def get_safe_filename(
-        dirty_filename: str) -> str:
+        dirty_filename: str,
+        shorthand_uuid: str) -> str:
     """https://stackoverflow.com/a/71199182"""

-    log.debug(f"Replacing unsafe characters in filename with dashes ...")
+    log.debug(f"{shorthand_uuid} Replacing unsafe characters in filename with dashes ...")
     clean_filename = re.sub(r"""[/\\?%*:|"<>\x7F\x00-\x1F]""", "-", dirty_filename)
-    log.debug(f"New filename: '{clean_filename}'")
+    log.debug(f"{shorthand_uuid} New filename: '{clean_filename}'")
     return clean_filename


 def filename_replace_spaces_with_underscores(
         section_name: str,
         config_obj: configparser.ConfigParser(),
-        filename: str) -> str:
+        filename: str,
+        shorthand_uuid: str) -> str:

     space_replace_string = config_obj.get(section_name, "dl_filename_replace_spaces_with")
-    log.debug(f"Replacing space characters with '{space_replace_string}' ...")
+    log.debug(f"{shorthand_uuid} Replacing space characters with '{space_replace_string}' ...")
     underscored_filename = re.sub(
         r"\s",
         space_replace_string,
         filename)
-    log.debug(f"New filename: '{underscored_filename}'")
+    log.debug(f"{shorthand_uuid} New filename: '{underscored_filename}'")
     return underscored_filename
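Note: get_safe_filename() still scrubs the characters from the linked Stack Overflow answer; only the log lines gained the shorthand_uuid prefix (and the per-attribute debug logging is now commented out). For reference, a quick standalone check of that scrub with a made-up title:

    import re

    dirty = 'Die Sendung mit der Maus: "Spezial" 03/04?'
    clean = re.sub(r"""[/\\?%*:|"<>\x7F\x00-\x1F]""", "-", dirty)
    print(clean)  # Die Sendung mit der Maus- -Spezial- 03-04-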
@@ -409,16 +422,35 @@ def get_filename(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        max_quality_url: str) -> str:
-    filename_replaced_patterns = filename_replace_pattern(section_name, config_obj, show, max_quality_url)
-    filename_safe = get_safe_filename(filename_replaced_patterns)
+        max_quality_url: str,
+        shorthand_uuid: str) -> str:
+
+    log.debug(f"{shorthand_uuid} Generating final filename ...")
+    filename_replaced_patterns = filename_replace_pattern(
+        section_name,
+        config_obj,
+        show,
+        max_quality_url,
+        shorthand_uuid)
+    filename_safe = get_safe_filename(
+        filename_replaced_patterns,
+        shorthand_uuid)
     if config.get(section_name, "dl_filename_replace_spaces_with"):
-        filename_safe = filename_replace_spaces_with_underscores(section_name, config_obj, filename_safe)
+        filename_safe = filename_replace_spaces_with_underscores(
+            section_name,
+            config_obj,
+            filename_safe,
+            shorthand_uuid)
     if config.getboolean(section_name, "dl_filename_all_lowercase"):
-        log.debug(f"Lowercasing all filename characters ...")
+        log.debug(f"{shorthand_uuid} Lowercasing all filename characters ...")
         filename_safe = filename_safe.lower()
-        log.debug(f"New filename: '{filename_safe}'")
-    log.debug(filename_safe)
+        log.debug(f"{shorthand_uuid} New filename: '{filename_safe}'")
+    log.debug(f"{shorthand_uuid} {filename_safe}")
     return filename_safe
@@ -465,38 +497,56 @@ def truncate_log(
     return json_data


+def get_state_file_lock(
+        state_lock_file: str) -> filelock.BaseFileLock:
+    global file_lock_timeout
+
+    try:
+        lock = FileLock(state_lock_file, timeout=file_lock_timeout)
+        return lock
+    except filelock.Timeout:
+        log.error(f"Unable to acquire lock on state lock file '{state_lock_file}' "
+                  f"""within {file_lock_timeout} {p.plural("second", file_lock_timeout)}, exiting 5 ...""")
+        sys.exit(5)
+
+
 def log_successful_download(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        job_uuid: str,
+        shorthand_uuid: str) -> None:

     timestamp_now = int(time.time())
-    state_file_none_or_valid_json(state_file_abs_path)
     os.makedirs(os.path.dirname(state_file_abs_path), exist_ok=True)
+    state_lock_file = state_file_abs_path + state_lock_file_ext

     state_body = show.dict(include={"topic", "title"})
     state_body["dl_complete_timestamp_epoch"] = timestamp_now
     state_body["dl_complete_timestamp_human"] = \
         d.datetime.utcfromtimestamp(timestamp_now).strftime("%Y-%m-%d %H%M%S UTC")
-    state_entry = {timestamp_now: state_body}
+    state_entry = {job_uuid: state_body}
     json_state = None
-    log.debug(f"Writing log entry to '{state_file_abs_path}' ...")
-    with open(state_file_abs_path, "r+") as state_file:
-        try:
-            json_state = json.load(state_file)
-        except json.JSONDecodeError:
-            if json_state is None:
-                state_file.truncate()
-                json_state = []
-    with open(state_file_abs_path, "w") as state_file:
-        json_state.append(state_entry)
-        max_log_entries = config_obj.getint(section_name, "state_file_retention")
-        if len(json_state) > max_log_entries:
-            json_state = truncate_log(json_state, max_log_entries)
-        json.dump(json_state, state_file, indent=4, sort_keys=True)
+    lock = get_state_file_lock(state_lock_file)
+
+    with lock:
+        state_file_none_or_valid_json(state_file_abs_path)
+        with open(state_file_abs_path, "r+") as state_file:
+            try:
+                json_state = json.load(state_file)
+            except json.JSONDecodeError:
+                if json_state is None:
+                    state_file.truncate()
+                    json_state = []
+        log.debug(f"{shorthand_uuid} Writing log entry to '{state_file_abs_path}' ...")
+        with open(state_file_abs_path, "w") as state_file:
+            json_state.append(state_entry)
+            max_log_entries = config_obj.getint(section_name, "state_file_retention")
+            if len(json_state) > max_log_entries:
+                json_state = truncate_log(json_state, max_log_entries)
+            json.dump(json_state, state_file, indent=4, sort_keys=True)


 def copy_url(
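Note: state file writes are now serialized through a .lock file next to the state file. One caveat about the library: filelock only raises Timeout when the lock is actually acquired (i.e. on entering the with block), not when the FileLock object is constructed, so the except in get_state_file_lock() as written will likely never fire and the timeout would surface at the caller's with lock: instead. A minimal sketch of the intended pattern, with placeholder paths:

    import os
    import sys
    from filelock import FileLock, Timeout

    state_file = "/tmp/mvw-dl-example/maus"      # placeholder path
    os.makedirs(os.path.dirname(state_file), exist_ok=True)
    lock = FileLock(state_file + ".lock", timeout=1)

    try:
        with lock:                               # Timeout is raised here if not acquired in time
            with open(state_file, "a") as f:     # state read-modify-write happens inside the lock
                f.write("{}\n")
    except Timeout:
        sys.exit(5)                              # matches the new exit code 5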
@@ -505,7 +555,12 @@ def copy_url(
         show: type_def.mvw_json_response.Show,
         video_metadata: dict,
         total_content_length: int,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        show_name: str,
+        job_uuid: str,
+        shorthand_uuid: str,
+        tmp_dir: str,
+        dest_dir: str) -> None:
     """Copy data from a url to a local file."""

     global download_start_time
@@ -514,37 +569,54 @@ def copy_url(
     update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
     max_quality_url = video_metadata["url"]
-    filename = get_filename(section_name, config_obj, show, max_quality_url)
-    dest_dir = expanded_dest_dir(config_obj.get(section_name, "dl_dir"))
+    filename = get_filename(section_name, config_obj, show, max_quality_url, shorthand_uuid)
+    resume_header = {}
+    tmp_file_size = 0
+    tmp_path = os.path.join(tmp_dir, filename)
     dest_path = os.path.join(dest_dir, filename)
-    show_name = f"{show.topic} - {show.title}"
-    os.makedirs(os.path.dirname(dest_path), exist_ok=True)
-    # TODO quit
-    log_successful_download(section_name, config_obj, show, state_file_abs_path)
-    quit()
-    with open(dest_path, "wb") as dest_file:
-        log.info(f"""Downloading "{show_name}" ...""")
-        log.info(f"Download location resolved to {dest_path}")
-        r = requests.get(max_quality_url, stream=True)
-        for chunk in r.iter_content(32768):
-            size_downloaded += len(chunk)
-            dest_file.write(chunk)
-            if time.time() - download_last_update_time >= update_interval:
-                download_last_update_time = time.time()
-                dl_speed_so_far = size_downloaded / (download_last_update_time - download_start_time)
-                human_dl_speed_so_far = f"{humanize.naturalsize(dl_speed_so_far, binary=True)}/s"
-                percentage_done = size_downloaded / total_content_length * 100
-                human_pct = "{:.1f}".format(percentage_done)
-                human_size_dl = humanize.naturalsize(size_downloaded, binary=True)
-                human_total_dl = humanize.naturalsize(total_content_length, binary=True)
-                log.debug(f"Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} at an average "
-                          f"{human_dl_speed_so_far})")
-            if done_event.is_set():
-                log.info(f"""Download of "{show_name}" interrupted""")
-                return
-    log.info(f"""Download of "{show_name}" done""")
-    # log_successful_download(show)
+    os.makedirs(os.path.dirname(tmp_path), exist_ok=True)
+    log.info(f"{shorthand_uuid} Download location resolved to '{tmp_path}'")
+    if os.path.exists(tmp_path):
+        tmp_file_size = os.path.getsize(tmp_path)
+        log.debug(f"{shorthand_uuid} Temporary file '{tmp_path}' exists likely from a previous incomplete "
+                  f"download attempt, size is {humanize.naturalsize(tmp_file_size, binary=True)}. Resuming ...")
+    try:
+        with open(tmp_path, "wb") as tmp_file:
+            log.info(f"""{shorthand_uuid} Downloading "{show_name}" ...""")
+            if tmp_file_size > 0:
+                resume_header = {"range": f"bytes={tmp_file_size}-"}
+                log.info(f"resume_header: {resume_header}")
+            r = requests.get(max_quality_url, headers=resume_header, stream=True)
+            for chunk in r.iter_content(32768):
+                size_downloaded += len(chunk)
+                tmp_file.write(chunk)
+                if time.time() - download_last_update_time >= update_interval:
+                    download_last_update_time = time.time()
+                    dl_speed_so_far = size_downloaded / (download_last_update_time - download_start_time)
+                    human_dl_speed_so_far = f"{humanize.naturalsize(dl_speed_so_far, binary=True)}/s"
+                    percentage_done = size_downloaded / total_content_length * 100
+                    human_pct = "{:.1f}".format(percentage_done)
+                    human_size_dl = humanize.naturalsize(size_downloaded, binary=True)
+                    human_total_dl = humanize.naturalsize(total_content_length, binary=True)
+                    log.debug(f"[thread] Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} "
+                              f"at an average {human_dl_speed_so_far})")
+                if done_event.is_set():
+                    log.info(f"""{shorthand_uuid} Download of "{show_name}" interrupted""")
+                    return
+            log.info(f"""{shorthand_uuid} Download of "{show_name}" done""")
+    except IOError:
+        log.error(f"{shorthand_uuid} IOError during download. Aborting this download thread ...")
+        return
+
+    log.info(f"{shorthand_uuid} Moving file to final location '{dest_path}' ...")
+    try:
+        os.rename(tmp_path, dest_path)
+        log_successful_download(section_name, config_obj, show, state_file_abs_path, job_uuid, shorthand_uuid)
+        log.info(f"{shorthand_uuid} Done moving")
+    except Exception:
+        console.print_exception(show_locals=True)
+        log.error(f"{shorthand_uuid} Failed moving file")


 def get_max_quality_url(
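Note: downloads now land in a temporary file first; if that file already exists from an earlier attempt, the request carries a Range header so the server can resume. A stripped-down version of that flow with placeholder URL and paths — the sketch appends to the partial file when the server answers 206 Partial Content, since reopening it with "wb" would throw away the bytes already on disk:

    import os
    import requests

    url = "https://example.org/video.mp4"        # placeholder
    tmp_path = "/tmp/video.mp4.part"             # placeholder

    offset = os.path.getsize(tmp_path) if os.path.exists(tmp_path) else 0
    headers = {"Range": f"bytes={offset}-"} if offset else {}

    with requests.get(url, headers=headers, stream=True, timeout=30) as r:
        # 206 means the server honoured the range; 200 means it sent the whole file again.
        mode = "ab" if r.status_code == 206 else "wb"
        with open(tmp_path, mode) as f:
            for chunk in r.iter_content(32768):
                f.write(chunk)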
@@ -567,6 +639,24 @@ def get_content_length(
     return 0


+def is_already_downloaded(
+        show: type_def.mvw_json_response.Show,
+        state_file_abs_path: str,
+        show_name: str) -> bool:
+
+    with open(state_file_abs_path, "r") as state_file:
+        try:
+            json_state = json.load(state_file)
+            for log_entry in json_state:
+                for log_data in [key for key in log_entry]:
+                    if show.topic == log_entry[log_data]["topic"] and show.title == log_entry[log_data]["title"]:
+                        log.debug(f"""Show "{show_name}" already downloaded, won't queue""")
+                        return True
+        except json.JSONDecodeError:
+            return False
+
+    return False
+
+
 def download_media(
         section_name: str,
         config_obj: configparser.ConfigParser(),
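Note: is_already_downloaded() walks the state file, where each entry is a single-key dict now keyed by the job UUID instead of a timestamp; the value holds at least topic and title. A self-contained illustration of that lookup against a hypothetical state list:

    import uuid

    # Hypothetical state entry; real ones also carry the dl_complete_* timestamps.
    example_state = [
        {str(uuid.uuid4()): {"topic": "Die Sendung mit der Maus", "title": "Beispielfolge"}},
    ]

    def already_downloaded(topic: str, title: str, state: list) -> bool:
        # Same walk as is_already_downloaded(): every entry is a one-key dict.
        return any(
            body["topic"] == topic and body["title"] == title
            for entry in state
            for body in entry.values()
        )

    print(already_downloaded("Die Sendung mit der Maus", "Beispielfolge", example_state))  # True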
@@ -577,7 +667,7 @@ def download_media(
     dl_threads = config_obj.getint(section_name, "dl_threads")
     state_file_abs_path = get_state_file_abs_path(section_name, config_obj)
-    state_file_none_or_valid_json(state_file_abs_path)
+    state_lock_file = state_file_abs_path + state_lock_file_ext
     video_metadata = {}
     for result in json_obj.result.results.copy():
@@ -588,23 +678,40 @@ def download_media(
     for video in video_metadata:
         total_content_length += video_metadata[video]["content_length"]
     video_metadata["total_content_length"] = total_content_length
-    log.info(f"""Download location is {config_obj.get(section_name, "dl_dir")}""")
+    tmp_dir = expanded_dest_dir(config_obj.get(section_name, "tmp_base_dir"))
+    dest_dir = expanded_dest_dir(config_obj.get(section_name, "dl_dir"))
+    log.info(f"""Download location is {tmp_dir}""")
+    log.info(f"""Final location is {dest_dir}""")
     log.info(f"Limiting parallel downloads to {dl_threads} ...")
-    # TODO prior to download check state file
+    lock = get_state_file_lock(state_lock_file)

     with ThreadPoolExecutor(max_workers=dl_threads) as pool:
         download_last_update_time = time.time()
         download_start_time = download_last_update_time
         update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
         log.debug(f"""Will provide updates every {update_interval} {p.plural("second", update_interval)}""")
-        for result in json_obj.result.results.copy():
-            pool.submit(
-                copy_url,
-                section_name,
-                config_obj,
-                result,
-                video_metadata[result.id],
-                video_metadata["total_content_length"],
-                state_file_abs_path)
+        with lock:
+            state_file_none_or_valid_json(state_file_abs_path)
+            for result in json_obj.result.results.copy():
+                show_name = f"{result.topic} - {result.title}"
+                if not is_already_downloaded(result, state_file_abs_path, show_name):
+                    job_uuid = str(uuid.uuid4())
+                    shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
+                    log.debug(f"""Queuing "{show_name}" for download ...""")
+                    pool.submit(
+                        copy_url,
+                        section_name,
+                        config_obj,
+                        result,
+                        video_metadata[result.id],
+                        video_metadata["total_content_length"],
+                        state_file_abs_path,
+                        show_name,
+                        job_uuid,
+                        shorthand_uuid,
+                        tmp_dir,
+                        dest_dir)


 if __name__ == '__main__':
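Note: every queued show gets a UUID4 job id; its log prefix is just the first and last two characters in brackets, e.g. [d0..aa]. Quick illustration:

    import uuid

    job_uuid = str(uuid.uuid4())
    shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
    print(job_uuid, "->", shorthand_uuid)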

View File

@@ -2,4 +2,5 @@ rich
 requests
 inflect
 pydantic
 humanize
+filelock

View File

@@ -10,6 +10,8 @@ charset-normalizer==2.0.12
     # via requests
 commonmark==0.9.1
     # via rich
+filelock==3.6.0
+    # via -r requirements.in
 humanize==4.0.0
     # via -r requirements.in
 idna==3.3
@@ -26,5 +28,5 @@ rich==12.0.0
     # via -r requirements.in
 typing-extensions==4.1.1
     # via pydantic
-urllib3==1.26.8
+urllib3==1.26.9
     # via requests