Check state file for previous downloads, improve logging per thread

parent c0a271d0eb
commit facfe4e7d3

@@ -20,8 +20,8 @@ min_duration = 1200
 max_duration = 2700
 query = @maus-query.json
 title_not_regex = audiodeskription|gebärdensprache
-dl_filename_pattern = &(publish_date)s.&(ext)s
-publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
+# dl_filename_pattern = &(publish_date)s.&(ext)s
+# publish_date_srtftime_pattern = S%%YE%%Y%%m%%d01
 # query = {"queries":[{"fields":["topic"],"query":"die sendung mit der maus"},{"fields":["channel"],"query":"ARD"}],"sortBy":"timestamp","sortOrder":"desc","future":false,"offset":0,"size":50}
 # state_file_name = maus
 # tmp_base_dir = %(tmp_base_dir)s/maus

@@ -17,5 +17,5 @@
   "sortOrder": "desc",
   "future": false,
   "offset": 0,
-  "size": 20
+  "size": 15
 }

mvw-dl.py (182 lines changed)
@@ -7,6 +7,8 @@ import pathlib
 import re
 import sys
 import time
+
+import filelock
 import humanize
 import requests
 import inflect
@@ -14,6 +16,8 @@ from rich.logging import RichHandler
 from rich.traceback import install
 import typing as t
 from rich.console import Console
+from filelock import Timeout, FileLock
+import uuid
 
 import type_def.mvw_json_response
 from type_def.mvw_json_request import MVWJSONRequest
@@ -44,6 +48,9 @@ from rich.progress import (
 download_start_time = 0
 download_last_update_time = 0
 size_downloaded = 0
+file_lock_timeout = 1
+state_lock_file_ext = ".lock"
+
 progress = Progress(
     TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
     BarColumn(bar_width=None),
@@ -73,6 +80,7 @@ JSONType = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any
 # 2: Config file invalid, sections must define at least CONST.CFG_MANDATORY
 # 3: No search results to download
 # 4: State file already exists, has more than 0 bytes size but doesn't contain usable JSON
+# 5: State file lock cannot be acquired within file_lock_timeout
 
 
 class CONST(object):
@@ -122,9 +130,9 @@ logging.basicConfig(
 log = logging.getLogger("rich")
 # Our own code logs with this level
 log.setLevel(logging.DEBUG)
-# connectionpool logs with WARNING, we don't need its verbosity
-log_connectionpool = logging.getLogger("urllib3.connectionpool")
-log_connectionpool.setLevel(logging.WARNING)
+# connectionpool and filelock log with WARNING, we don't need its verbosity
+logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
+logging.getLogger("filelock").setLevel(logging.WARNING)
 install(show_locals=True)
 
 
@@ -362,7 +370,9 @@ def filename_replace_pattern(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        max_quality_url: str) -> str:
+        max_quality_url: str,
+        shorthand_uuid: str) -> str:
+
     filename = config_obj.get(section_name, "dl_filename_pattern")
     ext = pathlib.Path(max_quality_url).suffix.lstrip(".")
     publish_date = d.datetime.utcfromtimestamp(show.timestamp).strftime(
@@ -371,37 +381,40 @@ def filename_replace_pattern(
     show_attrs = [attr for attr in dir(show) if not attr.startswith('_') and not callable(getattr(show, attr))]
 
     for attr in show_attrs:
-        log.debug(f"Replacing filename pattern '&({attr})s' ...")
+        log.debug(f"{shorthand_uuid} Replacing filename pattern '&({attr})s' ...")
         filename = re.sub(r"&\(" + re.escape(attr) + r"\)s", str(getattr(show, attr)), filename)
-        log.debug(f"New filename: '{filename}'")
+        log.debug(f"{shorthand_uuid} New filename: '{filename}'")
     for extended_attr in show_extended:
-        log.debug(f"Replacing filename pattern '&({extended_attr})s' ...")
+        log.debug(f"{shorthand_uuid} Replacing filename pattern '&({extended_attr})s' ...")
         filename = re.sub(r"&\(" + re.escape(extended_attr) + r"\)s", show_extended[extended_attr], filename)
-        log.debug(f"New filename: '{filename}'")
+        log.debug(f"{shorthand_uuid} New filename: '{filename}'")
     return filename
 
 
 def get_safe_filename(
-        dirty_filename: str) -> str:
+        dirty_filename: str,
+        shorthand_uuid: str) -> str:
     """https://stackoverflow.com/a/71199182"""
 
-    log.debug(f"Replacing unsafe characters in filename with dashes ...")
+    log.debug(f"{shorthand_uuid} Replacing unsafe characters in filename with dashes ...")
     clean_filename = re.sub(r"""[/\\?%*:|"<>\x7F\x00-\x1F]""", "-", dirty_filename)
-    log.debug(f"New filename: '{clean_filename}'")
+    log.debug(f"{shorthand_uuid} New filename: '{clean_filename}'")
     return clean_filename
 
 
 def filename_replace_spaces_with_underscores(
         section_name: str,
         config_obj: configparser.ConfigParser(),
-        filename: str) -> str:
+        filename: str,
+        shorthand_uuid: str) -> str:
+
     space_replace_string = config_obj.get(section_name, "dl_filename_replace_spaces_with")
-    log.debug(f"Replacing space characters with '{space_replace_string}' ...")
+    log.debug(f"{shorthand_uuid} Replacing space characters with '{space_replace_string}' ...")
     underscored_filename = re.sub(
         r"\s",
         space_replace_string,
         filename)
-    log.debug(f"New filename: '{underscored_filename}'")
+    log.debug(f"{shorthand_uuid} New filename: '{underscored_filename}'")
     return underscored_filename
 
 
@@ -409,16 +422,35 @@ def get_filename(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        max_quality_url: str) -> str:
-    filename_replaced_patterns = filename_replace_pattern(section_name, config_obj, show, max_quality_url)
-    filename_safe = get_safe_filename(filename_replaced_patterns)
+        max_quality_url: str,
+        shorthand_uuid: str) -> str:
+
+    log.debug(f"{shorthand_uuid} Generating final filename ...")
+
+    filename_replaced_patterns = filename_replace_pattern(
+        section_name,
+        config_obj,
+        show,
+        max_quality_url,
+        shorthand_uuid)
+
+    filename_safe = get_safe_filename(
+        filename_replaced_patterns,
+        shorthand_uuid)
+
     if config.get(section_name, "dl_filename_replace_spaces_with"):
-        filename_safe = filename_replace_spaces_with_underscores(section_name, config_obj, filename_safe)
+        filename_safe = filename_replace_spaces_with_underscores(
+            section_name,
+            config_obj,
+            filename_safe,
+            shorthand_uuid)
+
     if config.getboolean(section_name, "dl_filename_all_lowercase"):
-        log.debug(f"Lowercasing all filename characters ...")
+        log.debug(f"{shorthand_uuid} Lowercasing all filename characters ...")
         filename_safe = filename_safe.lower()
-        log.debug(f"New filename: '{filename_safe}'")
-    log.debug(filename_safe)
+        log.debug(f"{shorthand_uuid} New filename: '{filename_safe}'")
+
+    log.debug(f"{shorthand_uuid} {filename_safe}")
     return filename_safe
 
 
@@ -465,38 +497,56 @@ def truncate_log(
     return json_data
 
 
+def get_state_file_lock(
+        state_lock_file: str) -> filelock.BaseFileLock:
+    global file_lock_timeout
+    try:
+        lock = FileLock(state_lock_file, timeout=file_lock_timeout)
+        return lock
+    except filelock.Timeout:
+        log.error(f"Unable to acquire lock on state lock file '{state_lock_file}' "
+                  f"""within {file_lock_timeout} {p.plural("second", file_lock_timeout)}, exiting 5 ...""")
+        sys.exit(5)
+
+
 def log_successful_download(
         section_name: str,
         config_obj: configparser.ConfigParser(),
         show: type_def.mvw_json_response.Show,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        job_uuid: str,
+        shorthand_uuid: str) -> None:
 
     timestamp_now = int(time.time())
-    state_file_none_or_valid_json(state_file_abs_path)
     os.makedirs(os.path.dirname(state_file_abs_path), exist_ok=True)
+    state_lock_file = state_file_abs_path + state_lock_file_ext
 
     state_body = show.dict(include={"topic", "title"})
     state_body["dl_complete_timestamp_epoch"] = timestamp_now
     state_body["dl_complete_timestamp_human"] = \
         d.datetime.utcfromtimestamp(timestamp_now).strftime("%Y-%m-%d %H%M%S UTC")
-    state_entry = {timestamp_now: state_body}
+    state_entry = {job_uuid: state_body}
     json_state = None
 
-    log.debug(f"Writing log entry to '{state_file_abs_path}' ...")
-    with open(state_file_abs_path, "r+") as state_file:
-        try:
-            json_state = json.load(state_file)
-        except json.JSONDecodeError:
-            if json_state is None:
-                state_file.truncate()
-                json_state = []
+    lock = get_state_file_lock(state_lock_file)
 
-    with open(state_file_abs_path, "w") as state_file:
-        json_state.append(state_entry)
-        max_log_entries = config_obj.getint(section_name, "state_file_retention")
-        if len(json_state) > max_log_entries:
-            json_state = truncate_log(json_state, max_log_entries)
-        json.dump(json_state, state_file, indent=4, sort_keys=True)
+    with lock:
+        state_file_none_or_valid_json(state_file_abs_path)
+        with open(state_file_abs_path, "r+") as state_file:
+            try:
+                json_state = json.load(state_file)
+            except json.JSONDecodeError:
+                if json_state is None:
+                    state_file.truncate()
+                    json_state = []
+
+        log.debug(f"{shorthand_uuid} Writing log entry to '{state_file_abs_path}' ...")
+        with open(state_file_abs_path, "w") as state_file:
+            json_state.append(state_entry)
+            max_log_entries = config_obj.getint(section_name, "state_file_retention")
+            if len(json_state) > max_log_entries:
+                json_state = truncate_log(json_state, max_log_entries)
+            json.dump(json_state, state_file, indent=4, sort_keys=True)
 
 
 def copy_url(
@@ -505,7 +555,10 @@ def copy_url(
         show: type_def.mvw_json_response.Show,
         video_metadata: dict,
         total_content_length: int,
-        state_file_abs_path: str) -> None:
+        state_file_abs_path: str,
+        show_name: str,
+        job_uuid: str,
+        shorthand_uuid: str) -> None:
     """Copy data from a url to a local file."""
 
     global download_start_time
@@ -514,18 +567,17 @@ def copy_url(
 
     update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
     max_quality_url = video_metadata["url"]
-    filename = get_filename(section_name, config_obj, show, max_quality_url)
-    dest_dir = expanded_dest_dir(config_obj.get(section_name, "dl_dir"))
+    filename = get_filename(section_name, config_obj, show, max_quality_url, shorthand_uuid)
+    dest_dir = expanded_dest_dir(config_obj.get(section_name, "tmp_base_dir"))
     dest_path = os.path.join(dest_dir, filename)
-    show_name = f"{show.topic} - {show.title}"
 
     os.makedirs(os.path.dirname(dest_path), exist_ok=True)
     # TODO quit
-    log_successful_download(section_name, config_obj, show, state_file_abs_path)
+    log_successful_download(section_name, config_obj, show, state_file_abs_path, job_uuid, shorthand_uuid)
     quit()
     with open(dest_path, "wb") as dest_file:
-        log.info(f"""Downloading "{show_name}" ...""")
-        log.info(f"Download location resolved to {dest_path}")
+        log.info(f"""{shorthand_uuid} Downloading "{show_name}" ...""")
+        log.info(f"{shorthand_uuid} Download location resolved to {dest_path}")
         r = requests.get(max_quality_url, stream=True)
         for chunk in r.iter_content(32768):
             size_downloaded += len(chunk)
@@ -538,12 +590,12 @@ def copy_url(
             human_pct = "{:.1f}".format(percentage_done)
             human_size_dl = humanize.naturalsize(size_downloaded, binary=True)
             human_total_dl = humanize.naturalsize(total_content_length, binary=True)
-            log.debug(f"Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} at an average "
+            log.debug(f"{shorthand_uuid} Downloaded {human_pct}% ({human_size_dl}/{human_total_dl} at an average "
                       f"{human_dl_speed_so_far})")
             if done_event.is_set():
-                log.info(f"""Download of "{show_name}" interrupted""")
+                log.info(f"""{shorthand_uuid} Download of "{show_name}" interrupted""")
                 return
-    log.info(f"""Download of "{show_name}" done""")
+    log.info(f"""{shorthand_uuid} Download of "{show_name}" done""")
     # log_successful_download(show)
 
 
@@ -577,7 +629,7 @@ def download_media(
 
     dl_threads = config_obj.getint(section_name, "dl_threads")
     state_file_abs_path = get_state_file_abs_path(section_name, config_obj)
-    state_file_none_or_valid_json(state_file_abs_path)
+    state_lock_file = state_file_abs_path + state_lock_file_ext
     video_metadata = {}
 
     for result in json_obj.result.results.copy():
@@ -590,21 +642,33 @@ def download_media(
     video_metadata["total_content_length"] = total_content_length
     log.info(f"""Download location is {config_obj.get(section_name, "dl_dir")}""")
     log.info(f"Limiting parallel downloads to {dl_threads} ...")
-    # TODO prior to download check state file
+
+    lock = get_state_file_lock(state_lock_file)
+
     with ThreadPoolExecutor(max_workers=dl_threads) as pool:
         download_last_update_time = time.time()
         download_start_time = download_last_update_time
         update_interval = config_obj.getint(section_name, "dl_progress_update_interval")
         log.debug(f"""Will provide updates every {update_interval} {p.plural("second", update_interval)}""")
-        for result in json_obj.result.results.copy():
-            pool.submit(
-                copy_url,
-                section_name,
-                config_obj,
-                result,
-                video_metadata[result.id],
-                video_metadata["total_content_length"],
-                state_file_abs_path)
+        with lock:
+            state_file_none_or_valid_json(state_file_abs_path)
+            for result in json_obj.result.results.copy():
+                show_name = f"{result.topic} - {result.title}"
+                if not is_already_downloaded(result, state_file_abs_path, show_name):
+                    job_uuid = str(uuid.uuid4())
+                    shorthand_uuid = f"[{job_uuid[:2]}..{job_uuid[-2:]}]"
+                    log.debug(f"""Queuing "{show_name}" for download ...""")
+                    pool.submit(
+                        copy_url,
+                        section_name,
+                        config_obj,
+                        result,
+                        video_metadata[result.id],
+                        video_metadata["total_content_length"],
+                        state_file_abs_path,
+                        show_name,
+                        job_uuid,
+                        shorthand_uuid)
 
 
 if __name__ == '__main__':
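
For reference, a minimal standalone sketch of the locking pattern this commit adopts: every reader and writer of the state file first acquires a sibling lock file built from the state file path plus state_lock_file_ext. The file path and JSON shape below are assumptions for illustration; note also that the filelock package raises Timeout at acquisition time (inside the with statement), not when the FileLock object is constructed.

import json
import sys

from filelock import FileLock, Timeout

state_file = "maus"  # assumed state file path
# timeout=1 mirrors the file_lock_timeout the commit introduces
lock = FileLock(state_file + ".lock", timeout=1)

try:
    with lock:  # Timeout is raised here if the lock isn't free within 1 s
        # Critical section: read-modify-write of the state file is
        # race-free against other threads/processes holding the lock.
        try:
            with open(state_file) as f:
                json_state = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            json_state = []
        json_state.append({"dl_complete_timestamp_epoch": 0})
        with open(state_file, "w") as f:
            json.dump(json_state, f, indent=4, sort_keys=True)
except Timeout:
    sys.exit(5)  # the exit code this commit reserves for lock timeouts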
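
The queuing loop gates each job on is_already_downloaded(result, state_file_abs_path, show_name), whose definition lies outside the hunks shown here. Judging from the entries log_successful_download() writes (a JSON list of {job_uuid: {"topic": ..., "title": ...}} objects), it plausibly reads something like the following hypothetical sketch; every detail of the body is an assumption.

import json


def is_already_downloaded(
        result,
        state_file_abs_path: str,
        show_name: str) -> bool:
    # Hypothetical sketch, not the commit's actual implementation.
    # show_name is presumably used for logging in the real helper.
    try:
        with open(state_file_abs_path) as state_file:
            json_state = json.load(state_file)
    except (FileNotFoundError, json.JSONDecodeError):
        return False
    for state_entry in json_state:          # [{job_uuid: state_body}, ...]
        for state_body in state_entry.values():
            if (state_body.get("topic"), state_body.get("title")) == \
                    (result.topic, result.title):
                return True                 # logged as downloaded before
    return False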