#!/usr/bin/python
"""Play IPTV channels from an M3U list with mpv, a Qt OSD and a Flask app.

Configuration is read from the environment:

* ``IPMPV_M3U_URL`` -- URL of the M3U channel list (required).
* ``IPMPV_CORNER_RADIUS`` -- corner radius of the OSD panel (optional).
"""
import sys
import mpv
import requests
import flask
import re
import subprocess
import os
import time
import traceback
import multiprocessing
from flask import request
from flask import jsonify
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from multiprocessing import Queue

os.environ["LC_ALL"] = "C"
os.environ["LANG"] = "C"

is_wayland = "WAYLAND_DISPLAY" in os.environ
osd_corner_radius = os.environ.get("IPMPV_CORNER_RADIUS")

# Command queues between the main process and the Qt process.
to_qt_queue = Queue()
from_qt_queue = Queue()

M3U_URL = os.environ.get('IPMPV_M3U_URL')


class OsdWidget(QWidget):
    """Frameless on-screen display showing channel name, modes and codecs.

    On Wayland the widget covers the whole screen and paints the panel at
    an offset; on X11 the widget itself is panel-sized and positioned at
    the top centre of the screen.
    """

    def __init__(self, channel_info, width=600, height=165, close_time=5, corner_radius=int(osd_corner_radius) if osd_corner_radius is not None else 15):
        """Build the OSD window.

        Parameters:
            channel_info (dict): must provide the keys "name", "logo",
                "deinterlace" and "low_latency".
            width (int): width of the OSD panel in pixels.
            height (int): height of the OSD panel in pixels.
            close_time (int): stored on the instance as ``close_time``.
            corner_radius (int): radius of the panel's rounded corners.
        """
        QFontDatabase.addApplicationFont('FiraSans-Regular.ttf')
        QFontDatabase.addApplicationFont('FiraSans-Bold.ttf')
        super().__init__()

        self.channel_info = channel_info
        self.orig_width = width
        self.orig_height = height
        self.close_time = close_time
        self.corner_radius = corner_radius

        # Filled in later through update_codecs().
        self.video_codec = None
        self.audio_codec = None
        self.video_res = None
        self.interlaced = None

        # Setup window
        self.setWindowTitle("OSD")

        # Check if we're running on Wayland
        self.is_wayland = is_wayland

        # Set appropriate window flags and size
        if self.is_wayland:
            # For Wayland, use fullscreen transparent approach
            self.setWindowFlags(
                Qt.FramelessWindowHint
                | Qt.WindowStaysOnTopHint
                | Qt.WindowDoesNotAcceptFocus
            )
            # Set fullscreen size
            self.screen_geometry = QApplication.desktop().screenGeometry()
            self.setFixedSize(self.screen_geometry.width(), self.screen_geometry.height())
            # Calculate content positioning
            self.content_x = (self.screen_geometry.width() - self.orig_width) // 2
            self.content_y = 20  # 20px from top
        else:
            # For X11, the widget is exactly as large as the panel
            self.setWindowFlags(
                Qt.FramelessWindowHint
                | Qt.WindowStaysOnTopHint
                | Qt.X11BypassWindowManagerHint
                | Qt.Tool
                | Qt.ToolTip
            )
            self.setFixedSize(width, height)
            self.content_x = 0
            self.content_y = 0

        # Enable transparency
        self.setAttribute(Qt.WA_TranslucentBackground)

        # Position window at the top center of the screen
        self.position_window()

        # Load logo if available
        self.logo_pixmap = None
        if channel_info["logo"]:
            self.load_logo()

        if self.is_wayland:
            self.setAttribute(Qt.WA_TransparentForMouseEvents)

    def position_window(self):
        """Place the window and start the timer that keeps it raised."""
        if self.is_wayland:
            # For Wayland, we just position at 0,0 (fullscreen)
            self.move(0, 0)
            raise_interval_ms = 100
        else:
            # For X11, center at top
            screen_geometry = QApplication.desktop().screenGeometry()
            x = (screen_geometry.width() - self.orig_width) // 2
            y = 20  # 20px from top
            self.setGeometry(x, y, self.orig_width, self.orig_height)

            # X11 specific window hints
            self.setAttribute(Qt.WA_X11NetWmWindowTypeNotification)
            # Re-apply the position twice shortly after creation.
            QTimer.singleShot(100, lambda: self.move(x, y))
            QTimer.singleShot(500, lambda: self.move(x, y))
            raise_interval_ms = 1000

        # Periodically ensure window stays on top
        # (every 100 ms on Wayland, every second on X11).
        self.stay_on_top_timer = QTimer(self)
        self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
        self.stay_on_top_timer.start(raise_interval_ms)

    def load_logo(self):
        """Download the channel logo and keep an 80x80 scaled pixmap.

        Any failure is printed and otherwise ignored; the OSD is then
        drawn without a logo.
        """
        try:
            # This runs synchronously in the widget constructor, so bound
            # the wait instead of blocking forever on a stalled server.
            response = requests.get(self.channel_info["logo"], timeout=5)
            if response.ok:
                pixmap = QPixmap()
                pixmap.loadFromData(response.content)
                if not pixmap.isNull():
                    self.logo_pixmap = pixmap.scaled(80, 80, Qt.KeepAspectRatio, Qt.SmoothTransformation)
                    self.update()  # Trigger repaint
        except Exception as e:
            print(f"Failed to load logo: {e}")

    def paintEvent(self, a0):
        """Paint the OSD panel at the content offset."""
        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing)
        # content_x/content_y are (0, 0) on X11, where the widget itself is
        # already positioned, and the panel offset on the fullscreen
        # Wayland widget.
        self.draw_osd_content(painter, self.content_x, self.content_y)

    def draw_osd_content(self, painter, x_offset, y_offset):
        """Draw background, texts, codec badges and logo.

        Parameters:
            painter (QPainter): active painter for this widget.
            x_offset (int): x coordinate of the panel's top-left corner.
            y_offset (int): y coordinate of the panel's top-left corner.
        """
        # Create a path for rounded rectangle background
        path = QPainterPath()
        path.addRoundedRect(
            x_offset, y_offset,
            self.orig_width, self.orig_height,
            self.corner_radius, self.corner_radius
        )

        # Fill the rounded rectangle with semi-transparent background
        painter.setPen(Qt.NoPen)
        painter.setBrush(QColor(0, 50, 100, 200))  # RGBA
        painter.drawPath(path)

        # Setup text drawing
        painter.setPen(QColor(255, 255, 255))

        # Errors are reported but not propagated out of the paint path.
        try:
            font = QFont("Fira Sans", 18)
            font.setBold(True)
            painter.setFont(font)

            # Draw channel name
            painter.drawText(x_offset + 20, y_offset + 40, self.channel_info["name"])

            font.setPointSize(14)
            font.setBold(False)
            painter.setFont(font)

            # Draw deinterlace status
            painter.drawText(x_offset + 20, y_offset + 70, f"Deinterlacing {'on' if self.channel_info['deinterlace'] else 'off'}")

            # Draw latency mode
            painter.drawText(x_offset + 20, y_offset + 100, f"{'Low' if self.channel_info['low_latency'] else 'High'} latency")

            # Draw codec badges if available
            badge_y = y_offset + self.orig_height - 40
            if self.video_codec:
                self.draw_badge(painter, self.video_codec, x_offset + 80, badge_y)
            if self.audio_codec:
                self.draw_badge(painter, self.audio_codec, x_offset + 140, badge_y)
            if self.video_res:
                self.draw_badge(painter, f"{self.video_res}{self.interlaced if self.interlaced is not None else ''}", x_offset + 20, badge_y)

            # Draw logo if available
            if self.logo_pixmap:
                painter.drawPixmap(x_offset + self.orig_width - 100, y_offset + 20, self.logo_pixmap)
        except Exception as e:
            print(f"Error in painting: {e}")
            traceback.print_exc()

    def draw_badge(self, painter, text, x, y):
        """Draw a 48x20 outlined, rounded badge containing ``text``.

        Parameters:
            painter (QPainter): active painter; its state is saved and
                restored around the drawing.
            text (str): label to show inside the badge.
            x (int): x coordinate of the badge's top-left corner.
            y (int): y coordinate of the badge's top-left corner.
        """
        # Save current painter state
        painter.save()

        # Draw rounded badge
        painter.setPen(QPen(QColor(255, 255, 255, 255), 2))
        painter.setBrush(Qt.NoBrush)

        # Use QPainterPath for consistent rounded corners
        badge_path = QPainterPath()
        badge_path.addRoundedRect(x, y, 48, 20, 7, 7)
        painter.drawPath(badge_path)

        # Draw text
        painter.setPen(QColor(255, 255, 255))
        font = painter.font()
        font.setBold(True)
        font.setPointSize(8)
        painter.setFont(font)

        # Center text in badge
        font_metrics = painter.fontMetrics()
        text_width = font_metrics.width(text)
        text_height = font_metrics.height()

        # We need to use integer coordinates for drawText, not floats
        text_x = int(x + (48 - text_width) / 2)
        text_y = int(y + text_height)

        # Use the int, int, string version of drawText
        painter.drawText(text_x, text_y, text)

        # Restore painter state
        painter.restore()

    def update_codecs(self, video_codec, audio_codec, video_res, interlaced):
        """Store stream details for the badges and repaint.

        Falsy values (and ``interlaced=None``) leave the previously stored
        value untouched.
        """
        if video_codec:
            self.video_codec = video_codec
        if audio_codec:
            self.audio_codec = audio_codec
        if video_res:
            self.video_res = video_res
        if interlaced is not None:
            self.interlaced = 'i' if interlaced else 'p'
        self.update()  # Trigger repaint

    def close_widget(self):
        """Stop the stay-on-top timer and hide the widget."""
        # Stop any active timers
        if hasattr(self, 'stay_on_top_timer') and self.stay_on_top_timer.isActive():
            self.stay_on_top_timer.stop()
        # Close the widget
        self.hide()

    def start_close_timer(self, seconds=5):
        """
        Starts a timer to close the widget after the specified number of seconds.

        Parameters:
            seconds (int): Number of seconds before closing the widget (default: 5)
        """
        # Cancel any existing close timer
        if hasattr(self, 'close_timer') and self.close_timer.isActive():
            self.close_timer.stop()

        # Create and start a new timer
        self.close_timer = QTimer(self)
        self.close_timer.setSingleShot(True)
        self.close_timer.timeout.connect(self.close_widget)
        self.close_timer.start(seconds * 1000)  # Convert seconds to milliseconds


osd = None


def qt_process():
    """Run Qt application in a separate process"""
    from PyQt5.QtWidgets import QApplication
    from PyQt5.QtCore import QTimer

    app = QApplication(sys.argv)

    # Check the queue periodically for commands
    def check_queue():
        # The current widget lives in the module-level ``osd``.
        global osd
        if not to_qt_queue.empty():
            command = to_qt_queue.get()
            action = command['action']
            if action == 'show_osd':
                if osd is not None:
                    osd.close_widget()
                # Create new OSD
                osd = OsdWidget(command['channel_info'])
                osd.show()
            elif action == 'start_close':
                if osd is not None:
                    osd.start_close_timer()
            elif action == 'close_osd':
                if osd is not None:
                    osd.close_widget()
                    osd = None
            elif action == 'update_codecs':
                if osd is not None:
                    osd.update_codecs(command['vcodec'], command['acodec'], command['video_res'], command['interlaced'])
        # Schedule next check
        QTimer.singleShot(100, check_queue)

    # Start the queue check
    check_queue()

    # Run Qt event loop
    app.exec_()


def get_channels():
    """Download the M3U list from ``M3U_URL`` and parse its channels.

    Returns:
        list[dict]: one dict per ``#EXTINF`` entry with the keys "name",
        "url", "logo" and "group". "logo" defaults to "" and "group" to
        "Other" when the attributes are not found.

    Exits the process with status 1 when ``IPMPV_M3U_URL`` is not set.
    """
    if not M3U_URL:
        print("Error: IPMPV_M3U_URL not set. Please set this environment variable to the URL of your IPTV list, in M3U format.")
        sys.exit(1)

    response = requests.get(M3U_URL)
    lines = response.text.splitlines()
    channels = []
    regex = re.compile(r'tvg-logo="(.*?)".*?group-title="(.*?)"', re.IGNORECASE)

    for i, line in enumerate(lines):
        if not line.startswith("#EXTINF"):
            continue
        if i + 1 >= len(lines):
            # A trailing #EXTINF has no URL line after it; skip it instead
            # of raising IndexError.
            print("Warning: ignoring #EXTINF entry without a URL at end of playlist")
            break
        match = regex.search(line)
        logo = match.group(1) if match else ""
        group = match.group(2) if match else "Other"
        name = line.split(",")[-1]
        url = lines[i + 1]
        channels.append({"name": name, "url": url, "logo": logo, "group": group})

    return channels


# Mode strings looked for in the randr output, with the label reported for
# each; checked in this order.
_COMPOSITE_MODES = (
    ("720x480", "480i"),
    ("720x240", "240p"),
    ("720x576", "576i"),
    ("720x288", "288p"),
)


def _composite_mode_label(line):
    """Return the label of the first known mode found in ``line``, else None."""
    for mode, label in _COMPOSITE_MODES:
        if mode in line:
            return label
    return None


def get_current_resolution():
    """Detect the current mode of the "Composite-1" output.

    Uses ``wlr-randr`` on Wayland and ``xrandr`` otherwise.

    Returns:
        str: "480i", "240p", "576i" or "288p", or "UNK" when no known
        mode is found or the tool cannot be run.
    """
    try:
        if is_wayland:
            output = subprocess.check_output(["wlr-randr"], universal_newlines=True)
            output_lines = output.split("\n")
            # Only the first line is inspected for the output name.
            if "Composite-1" in output_lines[0]:
                for line in output_lines:
                    if "current" in line:
                        label = _composite_mode_label(line)
                        if label is not None:
                            return label
        else:
            output = subprocess.check_output(["xrandr"], universal_newlines=True)
            for line in output.split("\n"):
                if "Composite-1" in line:
                    label = _composite_mode_label(line)
                    if label is not None:
                        return label
    except subprocess.CalledProcessError:
        if is_wayland:
            print("Error: Cannot get display resolution. Is this a wl-roots compatible compositor?")
        else:
            print("Error: Cannot get display resolution. Is an X session running?")
    except FileNotFoundError:
        if is_wayland:
            print("Error: Could not find wlr-randr, resolution will be unknown")
        else:
            print("Error: Could not find xrandr, resolution will be unknown")
    return "UNK"


def error_check(loglevel, component, message):
    """mpv log handler: print every message and react to load failures.

    When an error from ``ffmpeg`` or ``cplayer`` contains 'Failed', the
    "no signal" image is loaded and the OSD close timer is requested.
    """
    print(f"[{loglevel}] {component}: {message}")
    if loglevel == 'error' and component in ('ffmpeg', 'cplayer') and 'Failed' in message:
        player.loadfile("./nosignal.png")
        to_qt_queue.put({
            'action': 'start_close'
        })


player = mpv.MPV(
    # it's a bit weird that i have to use the logs to get errors,
    # but catch_errors is apparently broken
    log_handler = error_check,
    vo = 'gpu',
    hwdec = 'auto-safe',
    demuxer_lavf_o = 'reconnect=1',
    deinterlace = 'no',
    keepaspect = 'no',
    geometry = '100%:100%',
    fullscreen = 'yes',
    loop_playlist = 'inf'
)

deinterlace = False
channels = get_channels()
current_index = None
retroarch_p = None
resolution = get_current_resolution()
low_latency = False
vcodec = None
acodec = None
video_res = None
interlaced = None


@player.property_observer('video-format')
def video_codec_observer(name, value):
    """Remember the upper-cased video format reported by mpv."""
    global vcodec
    if value:
        vcodec = value.upper()


@player.property_observer('audio-codec-name')
def audio_codec_observer(name, value):
    """Remember the upper-cased audio codec name reported by mpv."""
    global acodec
    if value:
        acodec = value.upper()


def play_channel(index):
    """Switch playback to ``channels[index % len(channels)]``.

    Closes the current OSD, shows a placeholder image, shows a new OSD for
    the channel, loads the stream and then sends the detected codec and
    resolution details to the OSD before starting its close timer.
    Errors during playback are printed with a traceback, not raised.

    Parameters:
        index (int): channel index; wrapped around the channel count.
    """
    global current_index, vcodec, acodec, video_res, interlaced
    print(f"\n=== Starting channel change to index {index} ===")

    if not channels:
        # Nothing to play; also avoids a modulo by zero below.
        print("No channels available, ignoring channel change")
        return

    to_qt_queue.put({
        'action': 'close_osd'
    })
    print("Closed OSD")

    # Forget the previous channel's stream details so stale values are
    # never sent to the new OSD.
    vcodec = None
    acodec = None
    video_res = None
    interlaced = None

    current_index = index % len(channels)
    channel = channels[current_index]
    print(f"Playing channel: {channels[current_index]['name']} ({channels[current_index]['url']})")

    try:
        player.loadfile("./novideo.png")
        player.wait_until_playing()

        channel_info = {
            "name": channel["name"],
            "deinterlace": deinterlace,
            "low_latency": low_latency,
            "logo": channel["logo"]
        }
        to_qt_queue.put({
            'action': 'show_osd',
            'channel_info': channel_info
        })

        player.loadfile(channel['url'])
        time.sleep(0.5)
        player.wait_until_playing()

        video_params = player.video_params
        video_frame_info = player.video_frame_info
        if video_params and video_frame_info:
            video_res = video_params.get('h')
            interlaced = video_frame_info.get('interlaced')

        to_qt_queue.put({
            'action': 'update_codecs',
            'vcodec': vcodec,
            'acodec': acodec,
            'video_res': video_res,
            'interlaced': interlaced
        })
        to_qt_queue.put({
            'action': 'start_close',
        })
    except Exception as e:
        print(f"\033[91mError in play_channel: {str(e)}\033[0m")
        traceback.print_exc()


app = flask.Flask(__name__)


# Flask routes here
@app.route("/")
def index():
    # NOTE(review): the rest of this function, including its HTML template,
    # is truncated in the reviewed text; everything from the template
    # onwards is left exactly as found.
    grouped_channels = {}
    for channel in channels:
        grouped_channels.setdefault(channel["group"], []).append(channel)
    flat_channel_list = [channel for channel in channels]
    html = f"""
Current Channel: {channels[current_index]['name'] if current_index is not None else "None"}
""" # # global deinterlace global low_latency deinterlace_state = "ON" if deinterlace else "OFF" retroarch_state = "ON" if retroarch_p and retroarch_p.poll() is None else "OFF" html += f"""