import os import re import signal import subprocess import sys import time import cec import logging import inflect from rich import traceback from rich.console import Console from rich.logging import RichHandler from inotify_simple import INotify, flags traceback.install() console = Console() p = inflect.engine() FORMAT = "%(message)s" logging.basicConfig( level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler( show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True )] ) log = logging.getLogger("rich") log.setLevel("DEBUG") # Vars xauth_dir = "/var/run/sddm".rstrip("/") xauth_abs_path = subprocess.getoutput("find '" + xauth_dir + "' -type f").strip() react_to_cec = True if xauth_abs_path: log.info(f".Xauthority file '{xauth_abs_path}' located") else: log.info(f"No .Xauthority file at '{xauth_dir}', waiting ...") react_to_cec = False xauth_pattern = re.compile(r"""^{[^}]+}$""") inotify = INotify() watch_flags = flags.CREATE | flags.DELETE | flags.MODIFY | flags.DELETE_SELF wd = inotify.add_watch(xauth_dir, watch_flags) adapters = cec.list_adapters() if not adapters: log.warning(f"No adapters found. Exiting 1 ...") sys.exit(1) log.debug(f"Found adapters: {adapters}") use_adapter = adapters[0] log.info(f"Using adapter '{use_adapter}'") log.debug(f"Printing environment vars ...") log.debug(os.environ) def x_focus_window(window_id: str) -> bool: log.debug(f"Focusing window ID {window_id} ...") try: subprocess.run( ["xdotool", "windowfocus", window_id], check=True, stdout=subprocess.PIPE, encoding="utf-8") log.info(f"Two 'sddm-greeter' windows detected one of which had focus, changed focus to the other one.") return True except subprocess.CalledProcessError as x_focus_window_error: log.warning(f"Failed to focus window ID {window_id}, we'll not try again") log.warning(x_focus_window_error) return False def sddm_greeter_window_ids() -> list: sddm_check_attempt = 1 sddm_check_attempts_max = 2 sddm_check_sleep = 2 while sddm_check_attempt <= sddm_check_attempts_max: log.debug(f"Checking if 'sddm-greeter' window exists " f"({sddm_check_attempt} of {sddm_check_attempts_max}) ...") try: dm_windows = subprocess.run( ["xdotool", "search", "--onlyvisible", "--classname", "sddm-greeter"], check=True, capture_output=True, encoding="utf-8") return dm_windows.stdout.splitlines() except subprocess.CalledProcessError as xdotool_error: if sddm_check_attempt < sddm_check_attempts_max: log.debug(f"No 'sddm-greeter' window, checking again in " f"""{sddm_check_sleep} {p.plural("second", sddm_check_sleep)} ...""") sddm_check_attempt += 1 time.sleep(sddm_check_sleep) else: log.debug(f"No 'sddm-greeter' window exists, doing nothing") log.debug(xdotool_error) return [] def sddm_change_focus_if_two_windows() -> bool: dm_windows = sddm_greeter_window_ids() if not dm_windows: return False else: sddm_focus_attempt = 1 sddm_focus_attempt_max = 2 sddm_focus_sleep = 2 sddm_window_count_attempt = 1 sddm_window_count_attempt_max = 2 while sddm_focus_attempt <= sddm_focus_attempt_max: log.debug(f"Checking if 'sddm-greeter' window has focus " f"({sddm_focus_attempt} of {sddm_focus_attempt_max}) ...") try: x_window_focus = subprocess.run( ["xdotool", "getwindowfocus"], check=True, capture_output=True, encoding="utf-8") x_window_id_has_focus = x_window_focus.stdout.splitlines()[0] if x_window_id_has_focus in dm_windows: log.debug(f"An 'sddm-greeter' window has focus") if len(dm_windows) > 1: if len(dm_windows) == 2: log.debug(f"We're assuming we have to focus the other one") log.debug(f"From list of 'sddm-greeter' window IDs ({dm_windows}) remove the one that has " f"focus ({x_window_id_has_focus})") dm_windows.remove(x_window_id_has_focus) x_focus_window(dm_windows[0]) return True else: log.warning(f"Number of 'sddm-greeter' windows ({len(dm_windows)}) unexpected, " f"not doing anything") return False else: log.debug(f"No other 'sddm-greeter' windows exist, checking again ...") sddm_window_count_attempt += 1 if sddm_window_count_attempt == sddm_window_count_attempt_max: log.debug(f"No other 'sddm-greeter' windows exist, no need to focus anything") return False except subprocess.CalledProcessError as xdotool_error: if sddm_focus_attempt < sddm_focus_attempt_max: log.debug(f"Unable to get ID of focused window, checking again in " f"""{sddm_focus_sleep} {p.plural("second", sddm_focus_sleep)} ...""") sddm_focus_attempt += 1 time.sleep(sddm_focus_sleep) else: log.warning(f"Unable to get ID of focused window") log.warning(xdotool_error) return False def propagate_keypress(cec_key_id: int) -> None: log.info(f"""Key press '{xdo_map[cec_key_id]["human_readable"]}' detected (CEC key ID '{cec_key_id}')""") subprocess.run(["xdotool", "key", xdo_map[cec_key_id]["xdo"]], check=True) def keypress_handler(cec_event: int, cec_key_id: int, state: int) -> None: global react_to_cec if not react_to_cec: log.info(f"""Skipping CEC event, no .Xauthority file at '{xauth_dir}'""") log.debug(f"""We need an .Xauthority file to exist at '{xauth_dir}' in order to send CEC events as 'xdotool' commands to the X server. Without an .Xauthority file 'xdotool' doesn't have permission to send commands.""") return None if state == 0: if cec_key_id in xdo_map: propagate_keypress(cec_key_id) else: log.debug(f"CEC key ID '{cec_key_id}' is irrelevant to us, doing nothing") def log_handler(cec_event: int, log_level: int, timestamp: int, msg: str) -> None: if log_level == 16: if msg == "command 'PING' was not acked by the controller": log.warning(f"CEC adapter '{use_adapter}' unresponsive") elif "closing the connection" in msg: log.warning(f"Closing connection to adapter '{use_adapter}' ...") elif msg == "communication thread ended": log.warning(f"Communication thread with adapter '{use_adapter}' closed") os.kill(os.getpid(), signal.SIGUSR1) def signal_handler(rx_signal: int, frame) -> None: exit_code = 0 accepted_signals = [2, 10, 15] if rx_signal in accepted_signals: if rx_signal == 10: exit_code = 2 log.info(f"Signal '{rx_signal}' received, cleaning up ...") cec.remove_callback(log_handler, cec.EVENT_LOG) cec.remove_callback(keypress_handler, cec.EVENT_KEYPRESS) log.info(f"Clean-up done, exiting {exit_code} ...") sys.exit(exit_code) else: log.debug(f"Signal '{rx_signal}' doesn't trigger anything special, continuing ...") def set_xauth_env(file_abs_path: str) -> None: os.environ["XAUTHORITY"] = file_abs_path log.debug(f"""Env variable 'XAUTHORITY' set to '{os.environ["XAUTHORITY"]}'""") sddm_change_focus_if_two_windows() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGUSR1, signal_handler) log.debug(f"Initializing adapter '{use_adapter}' ...") cec.init(use_adapter) log.debug(f"Adapter '{use_adapter}' initialized") # Map human-readable TV remote control button names to 'xdotool' names from # https://cgit.freedesktop.org/xorg/proto/x11proto/plain/keysymdef.h xdo_map = { 0: {"human_readable": "OK", "xdo": "Return"}, 1: {"human_readable": "Up", "xdo": "Up"}, 2: {"human_readable": "Down", "xdo": "Down"}, 3: {"human_readable": "Left", "xdo": "Left"}, 4: {"human_readable": "Right", "xdo": "Right"}, 13: {"human_readable": "Back", "xdo": "BackSpace"}, } os.environ["DISPLAY"] = ":0" log.debug(f"""Env variable 'DISPLAY' set to '{os.environ["DISPLAY"]}'""") set_xauth_env(xauth_abs_path) log.debug(f"Activating event handlers ...") cec.add_callback(keypress_handler, cec.EVENT_KEYPRESS) cec.add_callback(log_handler, cec.EVENT_LOG) log.debug(f"Event handlers active") log.info(f"Open for business on adapter '{use_adapter}'!") while True: time.sleep(0.2) for event in inotify.read(): events = [str(flags) for flags in flags.from_mask(event.mask)] is_xauth = xauth_pattern.findall(event.name) if is_xauth and "flags.CREATE" in events: log.info(f"New .Xauthority file '{event.name}' detected") react_to_cec = True xauth_abs_path = xauth_dir + "/" + event.name set_xauth_env(xauth_abs_path) elif is_xauth and "flags.DELETE" in events: log.info(f".Xauthority file '{event.name}' was deleted") react_to_cec = False