import os import re import signal import subprocess import sys import time import cec import logging from rich import traceback from rich.console import Console from rich.logging import RichHandler from inotify_simple import INotify, flags traceback.install() console = Console() FORMAT = "%(message)s" logging.basicConfig( level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler( show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True )] ) log = logging.getLogger("rich") log.setLevel("DEBUG") # Vars xauth_dir = "/var/run/sddm".rstrip("/") xauth_abs_path = subprocess.getoutput("find '" + xauth_dir + "' -type f").strip() react_to_cec = True if xauth_abs_path: log.info(f".Xauthority file '{xauth_abs_path}' located") else: log.info(f"No .Xauthority file at '{xauth_dir}', waiting ...") react_to_cec = False xauth_pattern = re.compile(r"""^{[^}]+}$""") inotify = INotify() watch_flags = flags.CREATE | flags.DELETE | flags.MODIFY | flags.DELETE_SELF wd = inotify.add_watch(xauth_dir, watch_flags) adapters = cec.list_adapters() if not adapters: log.warning(f"No adapters found. Exiting 1 ...") sys.exit(1) log.debug(f"Found adapters: {adapters}") use_adapter = adapters[0] log.info(f"Using adapter '{use_adapter}'") log.debug(f"Printing environment vars ...") log.debug(os.environ) def x_focus_window(window_id: str) -> bool: log.debug(f"Focusing window ID {window_id} ...") try: subprocess.run( ["xdotool", "windowfocus", window_id], check=True, stdout=subprocess.PIPE, encoding="utf-8") log.info(f"Two 'sddm-greeter' windows detected one of which had focus, changed focus to the other one.") return True except subprocess.CalledProcessError as x_focus_window_error: log.warning(f"Failed to focus window ID {window_id}, we'll not try again") log.warning(x_focus_window_error) return False def sddm_greeter_window_ids() -> list: log.debug(f"Checking if an 'sddm-greeter' window exists ...") try: dm_windows = subprocess.run( ["xdotool", "search", "--onlyvisible", "--classname", "sddm-greeter"], check=True, stdout=subprocess.PIPE, encoding="utf-8") return dm_windows.stdout.splitlines() except subprocess.CalledProcessError as xdotool_error: log.debug(f"No 'sddm-greeter' window exists, doing nothing") log.debug(xdotool_error) return [] def has_sddm_focus() -> bool: dm_windows = sddm_greeter_window_ids() if not dm_windows: return False else: log.debug(f"Checking if 'sddm-greeter' window has focus ...") try: x_window_focus = subprocess.run( ["xdotool", "getwindowfocus"], check=True, stdout=subprocess.PIPE, encoding="utf-8") x_window_id_has_focus = x_window_focus.stdout.splitlines()[0] if x_window_id_has_focus in dm_windows: log.debug(f"An 'sddm-greeter' window has focus") if len(dm_windows) > 1: if len(dm_windows) == 2: log.debug(f"We're assuming we have to focus the other one") log.debug(f"From list of 'sddm-greeter' window IDs ({dm_windows}) remove the one that has " f"focus ({x_window_id_has_focus})") dm_windows.remove(x_window_id_has_focus) x_focus_window(dm_windows[0]) else: log.warning(f"Number of 'sddm-greeter' windows ({len(dm_windows)}) is unexpected, not doing " f"anything") else: log.debug(f"No other 'sddm-greeter' windows exist, no need to focus anything") except subprocess.CalledProcessError as xdotool_error: log.warning(f"Unable to get ID of focused window") log.warning(xdotool_error) return False def propagate_keypress(cec_key_id: int) -> None: log.info(f"""Key press '{xdo_map[cec_key_id]["human_readable"]}' detected (CEC key ID '{cec_key_id}')""") subprocess.run(["xdotool", "key", xdo_map[cec_key_id]["xdo"]], check=True) def keypress_handler(cec_event: int, cec_key_id: int, state: int) -> None: global react_to_cec if not react_to_cec: log.info(f"""Skipping CEC event, no .Xauthority file at '{xauth_dir}'""") log.debug(f"""We need an .Xauthority file to exist at '{xauth_dir}' in order to send CEC events as 'xdotool' commands to the X server. Without an .Xauthority file 'xdotool' doesn't have permission to send commands.""") return None if state == 0: if cec_key_id in xdo_map: propagate_keypress(cec_key_id) else: log.debug(f"CEC key ID '{cec_key_id}' is irrelevant to us, doing nothing") def log_handler(cec_event: int, log_level: int, timestamp: int, msg: str) -> None: if log_level == 16: if msg == "command 'PING' was not acked by the controller": log.warning(f"CEC adapter '{use_adapter}' unresponsive") elif "closing the connection" in msg: log.warning(f"Closing connection to adapter '{use_adapter}' ...") elif msg == "communication thread ended": log.warning(f"Communication thread with adapter '{use_adapter}' closed") os.kill(os.getpid(), signal.SIGUSR1) def signal_handler(rx_signal: int, frame) -> None: exit_code = 0 accepted_signals = [2, 10, 15] if rx_signal in accepted_signals: if rx_signal == 10: exit_code = 2 log.info(f"Signal '{rx_signal}' received, cleaning up ...") cec.remove_callback(log_handler, cec.EVENT_LOG) cec.remove_callback(keypress_handler, cec.EVENT_KEYPRESS) log.info(f"Clean-up done, exiting {exit_code} ...") sys.exit(exit_code) else: log.debug(f"Signal '{rx_signal}' doesn't trigger anything special, continuing ...") def set_xauth_env(file_abs_path: str) -> None: os.environ["XAUTHORITY"] = file_abs_path log.debug(f"""Env variable 'XAUTHORITY' set to '{os.environ["XAUTHORITY"]}'""") signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGUSR1, signal_handler) log.debug(f"Initializing adapter '{use_adapter}' ...") cec.init(use_adapter) log.debug(f"Adapter '{use_adapter}' initialized") # Map human-readable TV remote control button names to 'xdotool' names from # https://cgit.freedesktop.org/xorg/proto/x11proto/plain/keysymdef.h xdo_map = { 0: {"human_readable": "OK", "xdo": "Return"}, 1: {"human_readable": "Up", "xdo": "Up"}, 2: {"human_readable": "Down", "xdo": "Down"}, 3: {"human_readable": "Left", "xdo": "Left"}, 4: {"human_readable": "Right", "xdo": "Right"}, 13: {"human_readable": "Back", "xdo": "BackSpace"}, } set_xauth_env(xauth_abs_path) os.environ["DISPLAY"] = ":0" log.debug(f"""Env variable 'DISPLAY' set to '{os.environ["DISPLAY"]}'""") log.debug(f"Activating event handlers ...") cec.add_callback(keypress_handler, cec.EVENT_KEYPRESS) cec.add_callback(log_handler, cec.EVENT_LOG) log.debug(f"Event handlers active") log.info(f"Open for business on adapter '{use_adapter}'!") has_sddm_focus() while True: time.sleep(0.2) for event in inotify.read(): events = [str(flags) for flags in flags.from_mask(event.mask)] is_xauth = xauth_pattern.findall(event.name) if is_xauth and "flags.CREATE" in events: log.info(f"New .Xauthority file '{event.name}' detected") react_to_cec = True xauth_abs_path = xauth_dir + "/" + event.name set_xauth_env(xauth_abs_path) elif is_xauth and "flags.DELETE" in events: log.info(f".Xauthority file '{event.name}' was deleted") react_to_cec = False