From facfe4e7d3d520dea0e0e653d3693dfed7892cc9 Mon Sep 17 00:00:00 2001
From: hygienic-books
Date: Sat, 19 Mar 2022 16:08:12 +0100
Subject: [PATCH] Check state file for previous downloads, improve logging per thread

---
 config.ini | 4 +-
 maus-query.json | 2 +-
 mvw-dl.py | 187 ++++++++++++++++++++++++++++++++----------------
 3 files changed, 131 insertions(+), 62 deletions(-)

diff --git a/config.ini b/config.ini
index 263bb3c..8a31745 100644
--- a/config.ini
+++ b/config.ini
@@ -20,8 +20,8 @@ min_duration = 1200
 max_duration = 2700
 query = @maus-query.json
 title_not_regex = audiodeskription|gebärdensprache
-dl_filename_pattern = &(publish_date)s.&(ext)s
-publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
+# dl_filename_pattern = &(publish_date)s.&(ext)s
+# publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
 # query = {"queries":[{"fields":["topic"],"query":"die sendung mit der maus"},{"fields":["channel"],"query":"ARD"}],"sortBy":"timestamp","sortOrder":"desc","future":false,"offset":0,"size":50}
 # state_file_name = maus
 # tmp_base_dir = %(tmp_base_dir)s/maus
diff --git a/maus-query.json b/maus-query.json
index 7703479..6bd94b4 100644
--- a/maus-query.json
+++ b/maus-query.json
@@ -17,5 +17,5 @@
   "sortOrder": "desc",
   "future": false,
   "offset": 0,
-  "size": 20
+  "size": 15
 }
diff --git a/mvw-dl.py b/mvw-dl.py
index da67cb8..ffa23dc 100644
--- a/mvw-dl.py
+++ b/mvw-dl.py
@@ -7,6 +7,8 @@ import pathlib
 import re
 import sys
 import time
+
+import filelock
 import humanize
 import requests
 import inflect
@@ -14,6 +16,8 @@ from rich.logging import RichHandler
 from rich.traceback import install
 import typing as t
 from rich.console import Console
+from filelock import Timeout, FileLock
+import uuid
 
 import type_def.mvw_json_response
 from type_def.mvw_json_request import MVWJSONRequest
@@ -44,6 +48,9 @@ from rich.progress import (
 download_start_time = 0
 download_last_update_time = 0
 size_downloaded = 0
+file_lock_timeout = 1
+state_lock_file_ext = ".lock"
+
 progress = Progress(
     TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
     BarColumn(bar_width=None),
@@ -73,6 +80,7 @@ JSONType = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any]
 # 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
 # 3: No search results to download
 # 4: State file already exists, has more than 0 bytes size but doesn't contain usable JSON
+# 5: State file lock cannot be acquired within file_lock_timeout
 
 
 class CONST(object):
@@ -122,9 +130,9 @@ logging.basicConfig(
 log = logging.getLogger("rich")
 # Our own code logs with this level
 log.setLevel(logging.DEBUG)
-# connectionpool logs with WARNING, we don't need its verbosity
-log_connectionpool = logging.getLogger("urllib3.connectionpool")
-log_connectionpool.setLevel(logging.WARNING)
+# connectionpool and filelock are verbose, we don't need anything below WARNING
+logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
+logging.getLogger("filelock").setLevel(logging.WARNING)
 
 install(show_locals=True)
 
@@ -362,7 +370,9 @@ def filename_replace_pattern(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        max_quality_url: str) -> str:
+        max_quality_url: str,
+        shorthand_uuid: str) -> str:
+
     filename = config_obj.get(section_name, "dl_filename_pattern")
     ext = pathlib.Path(max_quality_url).suffix.lstrip(".")
     publish_date = d.datetime.utcfromtimestamp(show.timestamp).strftime(
@@ -371,37 +381,40 @@ def filename_replace_pattern(
     show_attrs = [attr for attr in dir(show) if not
attr.startswith('_') and not callable(getattr(show, attr))] for attr in show_attrs: - log.debug(f"Replacing filename pattern '&({attr})s' ...") + log.debug(f"{shorthand_uuid} Replacing filename pattern '&({attr})s' ...") filename = re.sub(r"&\(" + re.escape(attr) + r"\)s", str(getattr(show, attr)), filename) - log.debug(f"New filename: '{filename}'") + log.debug(f"{shorthand_uuid} New filename: '{filename}'") for extended_attr in show_extended: - log.debug(f"Replacing filename pattern '&({extended_attr})s' ...") + log.debug(f"{shorthand_uuid} Replacing filename pattern '&({extended_attr})s' ...") filename = re.sub(r"&\(" + re.escape(extended_attr) + r"\)s", show_extended[extended_attr], filename) - log.debug(f"New filename: '{filename}'") + log.debug(f"{shorthand_uuid} New filename: '{filename}'") return filename def get_safe_filename( - dirty_filename: str) -> str: + dirty_filename: str, + shorthand_uuid: str) -> str: """https://stackoverflow.com/a/71199182""" - log.debug(f"Replacing unsafe characters in filename with dashes ...") + log.debug(f"{shorthand_uuid} Replacing unsafe characters in filename with dashes ...") clean_filename = re.sub(r"""[/\\?%*:|"<>\x7F\x00-\x1F]""", "-", dirty_filename) - log.debug(f"New filename: '{clean_filename}'") + log.debug(f"{shorthand_uuid} New filename: '{clean_filename}'") return clean_filename def filename_replace_spaces_with_underscores( section_name: str, config_obj: configparser.ConfigParser(), - filename: str) -> str: + filename: str, + shorthand_uuid: str) -> str: + space_replace_string = config_obj.get(section_name, "dl_filename_replace_spaces_with") - log.debug(f"Replacing space characters with '{space_replace_string}' ...") + log.debug(f"{shorthand_uuid} Replacing space characters with '{space_replace_string}' ...") underscored_filename = re.sub( r"\s", space_replace_string, filename) - log.debug(f"New filename: '{underscored_filename}'") + log.debug(f"{shorthand_uuid} New filename: '{underscored_filename}'") return underscored_filename @@ -409,16 +422,35 @@ def get_filename( section_name: str, config_obj: configparser.ConfigParser(), show: type_def.mvw_json_response.Show, - max_quality_url: str) -> str: - filename_replaced_patterns = filename_replace_pattern(section_name, config_obj, show, max_quality_url) - filename_safe = get_safe_filename(filename_replaced_patterns) + max_quality_url: str, + shorthand_uuid: str) -> str: + + log.debug(f"{shorthand_uuid} Generating final filename ...") + + filename_replaced_patterns = filename_replace_pattern( + section_name, + config_obj, + show, + max_quality_url, + shorthand_uuid) + + filename_safe = get_safe_filename( + filename_replaced_patterns, + shorthand_uuid) + if config.get(section_name, "dl_filename_replace_spaces_with"): - filename_safe = filename_replace_spaces_with_underscores(section_name, config_obj, filename_safe) + filename_safe = filename_replace_spaces_with_underscores( + section_name, + config_obj, + filename_safe, + shorthand_uuid) + if config.getboolean(section_name, "dl_filename_all_lowercase"): - log.debug(f"Lowercasing all filename characters ...") + log.debug(f"{shorthand_uuid} Lowercasing all filename characters ...") filename_safe = filename_safe.lower() - log.debug(f"New filename: '{filename_safe}'") - log.debug(filename_safe) + log.debug(f"{shorthand_uuid} New filename: '{filename_safe}'") + + log.debug(f"{shorthand_uuid} {filename_safe}") return filename_safe @@ -465,38 +497,56 @@ def truncate_log( return json_data +def get_state_file_lock( + state_lock_file: str) -> 
filelock.BaseFileLock:
+    # filelock.Timeout is raised when the lock is acquired, i.e. on entering
+    # a "with lock:" block, not by this constructor, so callers catch it there
+    return FileLock(state_lock_file, timeout=file_lock_timeout)
+
+
 def log_successful_download(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        job_uuid: str,
+        shorthand_uuid: str) -> None:
+
     timestamp_now = int(time.time())
-    state_file_none_or_valid_json(state_file_abs_path)
     os.makedirs(os.path.dirname(state_file_abs_path), exist_ok=True)
+    state_lock_file = state_file_abs_path + state_lock_file_ext
 
     state_body = show.dict(include={"topic", "title"})
     state_body["dl_complete_timestamp_epoch"] = timestamp_now
     state_body["dl_complete_timestamp_human"] = \
         d.datetime.utcfromtimestamp(timestamp_now).strftime("%Y-%m-%d %H%M%S UTC")
-    state_entry = {timestamp_now: state_body}
+    state_entry = {job_uuid: state_body}
     json_state = None
 
-    log.debug(f"Writing log entry to '{state_file_abs_path}' ...")
-    with open(state_file_abs_path, "r+") as state_file:
-        try:
-            json_state = json.load(state_file)
-        except json.JSONDecodeError:
-            if json_state is None:
-                state_file.truncate()
-                json_state = []
+    lock = get_state_file_lock(state_lock_file)
 
-    with open(state_file_abs_path, "w") as state_file:
-        json_state.append(state_entry)
-        max_log_entries = config_obj.getint(section_name, "state_file_retention")
-        if len(json_state) > max_log_entries:
-            json_state = truncate_log(json_state, max_log_entries)
-        json.dump(json_state, state_file, indent=4, sort_keys=True)
+    try:
+        with lock:
+            state_file_none_or_valid_json(state_file_abs_path)
+            with open(state_file_abs_path, "r+") as state_file:
+                try:
+                    json_state = json.load(state_file)
+                except json.JSONDecodeError:
+                    if json_state is None:
+                        state_file.truncate()
+                        json_state = []
+
+            log.debug(f"{shorthand_uuid} Writing log entry to '{state_file_abs_path}' ...")
+            with open(state_file_abs_path, "w") as state_file:
+                json_state.append(state_entry)
+                max_log_entries = config_obj.getint(section_name, "state_file_retention")
+                if len(json_state) > max_log_entries:
+                    json_state = truncate_log(json_state, max_log_entries)
+                json.dump(json_state, state_file, indent=4, sort_keys=True)
+    except Timeout:
+        log.error(f"{shorthand_uuid} Unable to acquire lock on state lock file '{state_lock_file}' "
+                  f"""within {file_lock_timeout} {p.plural("second", file_lock_timeout)}, exiting 5 ...""")
+        sys.exit(5)
 
 
 def copy_url(
@@ -505,7 +555,10 @@ def copy_url(
         show: type_def.mvw_json_response.Show,
         video_metadata: dict,
         total_content_length: int,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        show_name: str,
+        job_uuid: str,
+        shorthand_uuid: str) -> None:
     """Copy data from a url to a local file."""
 
     global download_start_time
@@ -514,18 +567,17 @@
     update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
     max_quality_url = video_metadata["url"]
-    filename = get_filename(section_name, config_obj, show, max_quality_url)
-    dest_dir = expanded_dest_dir(config_obj.get(section_name, "dl_dir"))
+    filename = get_filename(section_name, config_obj, show, max_quality_url, shorthand_uuid)
+    dest_dir = expanded_dest_dir(config_obj.get(section_name, "tmp_base_dir"))
     dest_path = os.path.join(dest_dir, filename)
-    show_name = f"{show.topic} - {show.title}"
     os.makedirs(os.path.dirname(dest_path), exist_ok=True)
 
     # TODO quit
-    log_successful_download(section_name, config_obj, show, state_file_abs_path)
+    log_successful_download(section_name, config_obj, show, state_file_abs_path,
+                            job_uuid, shorthand_uuid)
     quit()
 
     with open(dest_path, "wb") as dest_file:
-        log.info(f"""Downloading "{show_name}" ...""")
-        log.info(f"Download location resolved to {dest_path}")
+        log.info(f"""{shorthand_uuid} Downloading "{show_name}" ...""")
+        log.info(f"{shorthand_uuid} Download location resolved to {dest_path}")
         r = requests.get(max_quality_url, stream=True)
         for chunk in r.iter_content(32768):
             size_downloaded += len(chunk)
@@ -538,12 +590,12 @@
                 human_pct = "{:.1f}".format(percentage_done)
                 human_size_dl = humanize.naturalsize(size_downloaded, binary=True)
                 human_total_dl = humanize.naturalsize(total_content_length, binary=True)
-                log.debug(f"Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} at an average "
+                log.debug(f"{shorthand_uuid} Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} at an average "
                           f"{human_dl_speed_so_far})")
         if done_event.is_set():
-            log.info(f"""Download of "{show_name}" interrupted""")
+            log.info(f"""{shorthand_uuid} Download of "{show_name}" interrupted""")
             return
-    log.info(f"""Download of "{show_name}" done""")
+    log.info(f"""{shorthand_uuid} Download of "{show_name}" done""")
     # log_successful_download(show)
 
 
@@ -577,7 +629,7 @@ def download_media(
 
     dl_threads = config_obj.getint(section_name, "dl_threads")
     state_file_abs_path = get_state_file_abs_path(section_name, config_obj)
-    state_file_none_or_valid_json(state_file_abs_path)
+    state_lock_file = state_file_abs_path + state_lock_file_ext
 
     video_metadata = {}
     for result in json_obj.result.results.copy():
@@ -590,21 +642,38 @@ def download_media(
         video_metadata["total_content_length"] = total_content_length
     log.info(f"""Download location is {config_obj.get(section_name, "dl_dir")}""")
     log.info(f"Limiting parallel downloads to {dl_threads} ...")
-    # TODO prior to download check state file
+
+    lock = get_state_file_lock(state_lock_file)
+
     with ThreadPoolExecutor(max_workers=dl_threads) as pool:
         download_last_update_time = time.time()
         download_start_time = download_last_update_time
         update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
         log.debug(f"""Will provide updates every {update_interval} {p.plural("second", update_interval)}""")
-        for result in json_obj.result.results.copy():
-            pool.submit(
-                copy_url,
-                section_name,
-                config_obj,
-                result,
-                video_metadata[result.id],
-                video_metadata["total_content_length"],
-                state_file_abs_path)
+        try:
+            with lock:
+                state_file_none_or_valid_json(state_file_abs_path)
+                for result in json_obj.result.results.copy():
+                    show_name = f"{result.topic} - {result.title}"
+                    if not is_already_downloaded(result, state_file_abs_path, show_name):
+                        job_uuid = str(uuid.uuid4())
+                        shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
+                        log.debug(f"""Queuing "{show_name}" for download ...""")
+                        pool.submit(
+                            copy_url,
+                            section_name,
+                            config_obj,
+                            result,
+                            video_metadata[result.id],
+                            video_metadata["total_content_length"],
+                            state_file_abs_path,
+                            show_name,
+                            job_uuid,
+                            shorthand_uuid)
+        except Timeout:
+            log.error(f"Unable to acquire lock on state lock file '{state_lock_file}' "
+                      f"""within {file_lock_timeout} {p.plural("second", file_lock_timeout)}, exiting 5 ...""")
+            sys.exit(5)
 
 
 if __name__ == '__main__':
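
Note on the dl_filename_pattern option this patch comments out: the &(attr)s placeholders appear to mirror configparser's %(attr)s interpolation syntax with a different sigil, presumably so configparser expands its own %(...)s references while filename_replace_pattern() expands the &(...)s ones itself. A standalone sketch of that substitution; the sample values are illustrative only, a real publish_date comes from publish_date_srtftime_pattern (S%YE%Y%m%d01 once configparser collapses the doubled percent signs) applied to the show timestamp:

    import re

    filename = "&(publish_date)s.&(ext)s"
    # Illustrative values; S%YE%Y%m%d01 rendered for 2022-03-19 as an example
    values = {"publish_date": "S2022E2022031901", "ext": "mp4"}
    for attr, value in values.items():
        filename = re.sub(r"&\(" + re.escape(attr) + r"\)s", value, filename)
    print(filename)  # S2022E2022031901.mp4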
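
Note on get_state_file_lock(): filelock raises Timeout from acquire(), which runs when a "with lock:" block is entered, never from the FileLock constructor, which is why the except blocks sit around the "with lock:" sections rather than around the constructor. A minimal standalone sketch of that behavior; the lock file name is made up:

    import sys
    from filelock import FileLock, Timeout

    lock = FileLock("example.state.lock", timeout=1)  # constructing never raises
    try:
        with lock:  # acquire() runs here and raises Timeout after 1 second
            pass    # critical section: read, modify and rewrite the state file
    except Timeout:
        sys.exit(5)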
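
Note on the shorthand UUIDs: each queued download gets a full uuid4 as its state file key, while log lines carry a compressed form built from the first and last two characters, which is usually enough to tell concurrent threads apart by eye. The printed values below are illustrative only, real ones are random:

    import uuid

    job_uuid = str(uuid.uuid4())
    shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
    print(shorthand_uuid)  # e.g. "[d4..8f]"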
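
Note on download_media(): the queuing loop calls is_already_downloaded(), but no hunk in this patch adds that function; unless it already exists elsewhere in mvw-dl.py the new code raises a NameError at runtime. A hypothetical sketch of what it could look like, assuming mvw-dl.py module scope (json, log, type_def) and the state file layout log_successful_download() writes, a list of {job_uuid: state_body} entries carrying topic and title; the function name and call signature come from the call site above, the body is illustrative only:

    def is_already_downloaded(
            show: type_def.mvw_json_response.Show,
            state_file_abs_path: str,
            show_name: str) -> bool:
        """Check the state file for a previous download of this show."""
        try:
            with open(state_file_abs_path, "r") as state_file:
                json_state = json.load(state_file)
        except (FileNotFoundError, json.JSONDecodeError):
            return False
        for state_entry in json_state:
            for state_body in state_entry.values():
                # topic and title are the two show fields the state file persists
                if (state_body.get("topic") == show.topic
                        and state_body.get("title") == show.title):
                    log.debug(f"""Skipping "{show_name}", already downloaded ...""")
                    return True
        return False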