Initial commit
This commit is contained in:
parent
2d5d40fc34
commit
ffa32bf098
152
main.py
Normal file
152
main.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
import os
|
||||||
|
import re
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import cec
|
||||||
|
import logging
|
||||||
|
from rich import traceback
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.logging import RichHandler
|
||||||
|
from inotify_simple import INotify, flags
|
||||||
|
traceback.install()
|
||||||
|
console = Console()
|
||||||
|
|
||||||
|
|
||||||
|
FORMAT = "%(message)s"
|
||||||
|
logging.basicConfig(
|
||||||
|
level="NOTSET", format=FORMAT, datefmt="[%X]", handlers=[RichHandler(
|
||||||
|
show_time=False if "SYSTEMD_EXEC_PID" in os.environ else True
|
||||||
|
)]
|
||||||
|
)
|
||||||
|
log = logging.getLogger("rich")
|
||||||
|
|
||||||
|
|
||||||
|
# Vars
|
||||||
|
xauth_dir = "/var/run/sddm".rstrip("/")
|
||||||
|
xauth_abs_path = subprocess.getoutput("find '" + xauth_dir + "' -type f").strip()
|
||||||
|
react_to_cec = True
|
||||||
|
if xauth_abs_path:
|
||||||
|
log.info(f".Xauthority file '{xauth_abs_path}' located")
|
||||||
|
else:
|
||||||
|
log.info(f"No .Xauthority file at '{xauth_dir}', waiting ...")
|
||||||
|
react_to_cec = False
|
||||||
|
xauth_pattern = re.compile(r"""^{[^}]+}$""")
|
||||||
|
|
||||||
|
|
||||||
|
inotify = INotify()
|
||||||
|
watch_flags = flags.CREATE | flags.DELETE | flags.MODIFY | flags.DELETE_SELF
|
||||||
|
wd = inotify.add_watch(xauth_dir, watch_flags)
|
||||||
|
|
||||||
|
|
||||||
|
adapters = cec.list_adapters()
|
||||||
|
if not adapters:
|
||||||
|
log.warning(f"No adapters found. Exiting 1 ...")
|
||||||
|
sys.exit(1)
|
||||||
|
log.debug(f"Found adapters: {adapters}")
|
||||||
|
use_adapter = adapters[0]
|
||||||
|
log.info(f"Using adapter '{use_adapter}'")
|
||||||
|
|
||||||
|
|
||||||
|
log.debug(f"Printing environment vars ...")
|
||||||
|
log.debug(os.environ)
|
||||||
|
|
||||||
|
|
||||||
|
def propagate_keypress(cec_key_id: int) -> None:
|
||||||
|
log.info(f"""Key press '{xdo_map[cec_key_id]["human_readable"]}' detected (CEC key ID '{cec_key_id}')""")
|
||||||
|
subprocess.run(["xdotool", "key", xdo_map[cec_key_id]["xdo"]], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def keypress_handler(cec_event: int, cec_key_id: int, state: int) -> None:
|
||||||
|
global react_to_cec
|
||||||
|
if not react_to_cec:
|
||||||
|
log.info(f"""Skipping CEC event, no .Xauthority file at '{xauth_dir}'""")
|
||||||
|
log.debug(f"""We need an .Xauthority file to exist at '{xauth_dir}' in order to send CEC events as 'xdotool'
|
||||||
|
commands to the X server. Without an .Xauthority file 'xdotool' doesn't have permission to send commands.""")
|
||||||
|
return None
|
||||||
|
if state == 0:
|
||||||
|
if cec_key_id in xdo_map:
|
||||||
|
propagate_keypress(cec_key_id)
|
||||||
|
else:
|
||||||
|
log.debug(f"CEC key ID '{cec_key_id}' is irrelevant to us, doing nothing")
|
||||||
|
|
||||||
|
|
||||||
|
def log_handler(cec_event: int, log_level: int, timestamp: int, msg: str) -> None:
|
||||||
|
if log_level == 16:
|
||||||
|
if msg == "command 'PING' was not acked by the controller":
|
||||||
|
log.warning(f"CEC adapter '{use_adapter}' unresponsive")
|
||||||
|
elif "closing the connection" in msg:
|
||||||
|
log.warning(f"Closing connection to adapter '{use_adapter}' ...")
|
||||||
|
elif msg == "communication thread ended":
|
||||||
|
log.warning(f"Communication thread with adapter '{use_adapter}' closed")
|
||||||
|
os.kill(os.getpid(), signal.SIGUSR1)
|
||||||
|
|
||||||
|
|
||||||
|
def signal_handler(rx_signal: int, frame) -> None:
|
||||||
|
exit_code = 0
|
||||||
|
accepted_signals = [2, 10, 15]
|
||||||
|
if rx_signal in accepted_signals:
|
||||||
|
if rx_signal == 10:
|
||||||
|
exit_code = 2
|
||||||
|
log.info(f"Signal '{rx_signal}' received, cleaning up ...")
|
||||||
|
cec.remove_callback(log_handler, cec.EVENT_LOG)
|
||||||
|
cec.remove_callback(keypress_handler, cec.EVENT_KEYPRESS)
|
||||||
|
log.info(f"Clean-up done, exiting {exit_code} ...")
|
||||||
|
sys.exit(exit_code)
|
||||||
|
else:
|
||||||
|
log.debug(f"Signal '{rx_signal}' doesn't trigger anything special, continuing ...")
|
||||||
|
|
||||||
|
|
||||||
|
def set_xauth_env(file_abs_path: str) -> None:
|
||||||
|
os.environ["XAUTHORITY"] = file_abs_path
|
||||||
|
log.debug(f"""Env variable 'XAUTHORITY' set to '{os.environ["XAUTHORITY"]}'""")
|
||||||
|
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, signal_handler)
|
||||||
|
signal.signal(signal.SIGTERM, signal_handler)
|
||||||
|
signal.signal(signal.SIGUSR1, signal_handler)
|
||||||
|
|
||||||
|
|
||||||
|
log.debug(f"Initializing adapter '{use_adapter}' ...")
|
||||||
|
cec.init(use_adapter)
|
||||||
|
log.debug(f"Adapter '{use_adapter}' initialized")
|
||||||
|
|
||||||
|
|
||||||
|
# Map human-readable TV remote control button names to 'xdotool' names from
|
||||||
|
# https://cgit.freedesktop.org/xorg/proto/x11proto/plain/keysymdef.h
|
||||||
|
xdo_map = {
|
||||||
|
0: {"human_readable": "OK", "xdo": "Return"},
|
||||||
|
1: {"human_readable": "Up", "xdo": "Up"},
|
||||||
|
2: {"human_readable": "Down", "xdo": "Down"},
|
||||||
|
3: {"human_readable": "Left", "xdo": "Left"},
|
||||||
|
4: {"human_readable": "Right", "xdo": "Right"},
|
||||||
|
13: {"human_readable": "Back", "xdo": "BackSpace"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
set_xauth_env(xauth_abs_path)
|
||||||
|
os.environ["DISPLAY"] = ":0"
|
||||||
|
log.debug(f"""Env variable 'DISPLAY' set to '{os.environ["DISPLAY"]}'""")
|
||||||
|
|
||||||
|
|
||||||
|
log.debug(f"Activating event handlers ...")
|
||||||
|
cec.add_callback(keypress_handler, cec.EVENT_KEYPRESS)
|
||||||
|
cec.add_callback(log_handler, cec.EVENT_LOG)
|
||||||
|
log.debug(f"Event handlers active")
|
||||||
|
log.info(f"Open for business on adapter '{use_adapter}'!")
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
time.sleep(0.2)
|
||||||
|
for event in inotify.read():
|
||||||
|
events = [str(flags) for flags in flags.from_mask(event.mask)]
|
||||||
|
is_xauth = xauth_pattern.findall(event.name)
|
||||||
|
if is_xauth and "flags.CREATE" in events:
|
||||||
|
log.info(f"New .Xauthority file '{event.name}' detected")
|
||||||
|
react_to_cec = True
|
||||||
|
xauth_abs_path = xauth_dir + "/" + event.name
|
||||||
|
set_xauth_env(xauth_abs_path)
|
||||||
|
elif is_xauth and "flags.DELETE" in events:
|
||||||
|
log.info(f".Xauthority file '{event.name}' was deleted")
|
||||||
|
react_to_cec = False
|
3
requirements.in
Normal file
3
requirements.in
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
cec
|
||||||
|
inotify_simple
|
||||||
|
rich
|
16
requirements.txt
Normal file
16
requirements.txt
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
#
|
||||||
|
# This file is autogenerated by pip-compile with python 3.10
|
||||||
|
# To update, run:
|
||||||
|
#
|
||||||
|
# pip-compile
|
||||||
|
#
|
||||||
|
cec==0.2.8
|
||||||
|
# via -r requirements.in
|
||||||
|
colorama==0.4.4
|
||||||
|
# via rich
|
||||||
|
commonmark==0.9.1
|
||||||
|
# via rich
|
||||||
|
pygments==2.11.2
|
||||||
|
# via rich
|
||||||
|
rich==11.2.0
|
||||||
|
# via -r requirements.in
|
Loading…
x
Reference in New Issue
Block a user