Major refactoring of code, added volume controls

This commit is contained in:
Ignacio Rivero 2025-03-07 19:11:27 -03:00
parent 9a14492e47
commit ddeed788c3
19 changed files with 1445 additions and 682 deletions

3
__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""IPMPV - IPTV Player with MPV"""
__version__ = "0.1.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

57
channels.py Normal file
View File

@ -0,0 +1,57 @@
#!/usr/bin/python
"""Channel management for IPMPV."""
import re
import requests
import sys
from utils import m3u_url
def get_channels():
"""
Get a list of channels from the M3U playlist.
Returns:
list: A list of channel dictionaries with name, url, logo, and group.
"""
if m3u_url:
try:
response = requests.get(m3u_url)
response.raise_for_status() # Raise exception for HTTP errors
except requests.RequestException as e:
print(f"Error fetching M3U playlist: {e}")
return []
else:
print("Error: IPMPV_M3U_URL not set. Please set this environment variable to the URL of your IPTV list, in M3U format.")
sys.exit(1)
lines = response.text.splitlines()
channels = []
regex = re.compile(r'tvg-logo="(.*?)".*?group-title="(.*?)"', re.IGNORECASE)
for i in range(len(lines)):
if lines[i].startswith("#EXTINF"):
match = regex.search(lines[i])
logo = match.group(1) if match else ""
group = match.group(2) if match else "Other"
name = lines[i].split(",")[-1]
url = lines[i + 1]
channels.append({"name": name, "url": url, "logo": logo, "group": group})
return channels
def group_channels(channels):
"""
Group channels by their group title.
Args:
channels (list): List of channel dictionaries.
Returns:
dict: Dictionary of channel groups.
"""
grouped_channels = {}
for channel in channels:
grouped_channels.setdefault(channel["group"], []).append(channel)
return grouped_channels

734
main.py Executable file → Normal file
View File

@ -1,687 +1,71 @@
#!/usr/bin/python
import sys
import mpv
import requests
import flask
import re
import subprocess
import os
import time
"""Entry point for IPMPV."""
import multiprocessing
from flask import request, jsonify, send_from_directory
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from multiprocessing import Queue
import sys
os.environ["LC_ALL"] = "C"
os.environ["LANG"] = "C"
# Set up utils first
from utils import setup_environment, get_current_resolution, ipmpv_retroarch_cmd
is_wayland = "WAYLAND_DISPLAY" in os.environ
osd_corner_radius = os.environ.get("IPMPV_CORNER_RADIUS")
ipmpv_retroarch_cmd = os.environ.get("IPMPV_RETROARCH_CMD")
# Initialize environment
setup_environment()
to_qt_queue = Queue()
from_qt_queue = Queue()
# Set up channel data
from channels import get_channels
M3U_URL = os.environ.get('IPMPV_M3U_URL')
# Import remaining modules
from player import Player
from server import IPMPVServer
from qt_process import qt_process
from volume import VolumeControl
class OsdWidget(QWidget):
def __init__(self, channel_info, width=600, height=165, close_time=5, corner_radius=int(osd_corner_radius) if osd_corner_radius is not None else 15):
def main():
"""Main entry point for IPMPV."""
# Create communication queues
to_qt_queue = Queue()
from_qt_queue = Queue()
# Get initial data
channels = get_channels()
resolution = get_current_resolution()
# Initialize player
player = Player(to_qt_queue)
QFontDatabase.addApplicationFont('FiraSans-Regular.ttf')
QFontDatabase.addApplicationFont('FiraSans-Bold.ttf')
global is_wayland
super().__init__()
self.channel_info = channel_info
self.orig_width = width
self.orig_height = height
self.close_time = close_time
self.corner_radius = corner_radius
self.video_codec = None
self.audio_codec = None
self.video_res = None
self.interlaced = None
# Setup window
self.setWindowTitle("OSD")
self.setFixedSize(width, height)
# Check if we're running on Wayland
self.is_wayland = is_wayland
# Set appropriate window flags and size
if self.is_wayland:
# For Wayland, use fullscreen transparent approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint |
Qt.WindowDoesNotAcceptFocus
)
# Set fullscreen size
self.screen_geometry = QApplication.desktop().screenGeometry()
self.setFixedSize(self.screen_geometry.width(), self.screen_geometry.height())
# Calculate content positioning
self.content_x = (self.screen_geometry.width() - self.orig_width) // 2
self.content_y = 20 # 20px from top
else:
# For X11, use the original approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint |
Qt.X11BypassWindowManagerHint |
Qt.Tool |
Qt.ToolTip
)
self.setFixedSize(width, height)
self.content_x = 0
self.content_y = 0
# Enable transparency
self.setAttribute(Qt.WA_TranslucentBackground)
# Position window at the top center of the screen
self.position_window()
# Load logo if available
self.logo_pixmap = None
if channel_info["logo"]:
self.load_logo()
if self.is_wayland:
self.setAttribute(Qt.WA_TransparentForMouseEvents)
def position_window(self):
if self.is_wayland:
# For Wayland, we just position at 0,0 (fullscreen)
self.move(0, 0)
# Ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(100) # Check every second
else:
# For X11, center at top
screen_geometry = QApplication.desktop().screenGeometry()
x = (screen_geometry.width() - self.orig_width) // 2
y = 20 # 20px from top
self.setGeometry(x, y, self.orig_width, self.orig_height)
# X11 specific window hints
self.setAttribute(Qt.WA_X11NetWmWindowTypeNotification)
QTimer.singleShot(100, lambda: self.move(x, y))
QTimer.singleShot(500, lambda: self.move(x, y))
# Periodically ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(1000) # Check every second
def load_logo(self):
try:
response = requests.get(self.channel_info["logo"])
if response.ok:
pixmap = QPixmap()
pixmap.loadFromData(response.content)
if not pixmap.isNull():
self.logo_pixmap = pixmap.scaled(80, 80, Qt.KeepAspectRatio, Qt.SmoothTransformation)
self.update() # Trigger repaint
except Exception as e:
print(f"Failed to load logo: {e}")
def paintEvent(self, a0):
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
if self.is_wayland:
# For Wayland, we're drawing the content in the right position on a fullscreen widget
self.draw_osd_content(painter, self.content_x, self.content_y)
else:
# For X11, we're drawing directly at (0,0) since the widget is already positioned
self.draw_osd_content(painter, 0, 0)
def draw_osd_content(self, painter, x_offset, y_offset):
# Create a path for rounded rectangle background
path = QPainterPath()
path.addRoundedRect(
x_offset, y_offset,
self.orig_width, self.orig_height,
self.corner_radius, self.corner_radius
)
# Fill the rounded rectangle with semi-transparent background
painter.setPen(Qt.NoPen)
painter.setBrush(QColor(0, 50, 100, 200)) # RGBA
painter.drawPath(path)
# Setup text drawing
painter.setPen(QColor(255, 255, 255))
try:
font = QFont("Fira Sans", 18)
font.setBold(True)
painter.setFont(font)
# Draw channel name
painter.drawText(x_offset + 20, y_offset + 40, self.channel_info["name"])
font.setPointSize(14)
font.setBold(False)
painter.setFont(font)
# Draw deinterlace status
painter.drawText(x_offset + 20, y_offset + 70, f"Deinterlacing {'on' if self.channel_info['deinterlace'] else 'off'}")
# Draw latency mode
painter.drawText(x_offset + 20, y_offset + 100, f"{'Low' if self.channel_info['low_latency'] else 'High'} latency")
# Draw codec badges if available
if self.video_codec:
self.draw_badge(painter, self.video_codec, x_offset + 80, y_offset + self.orig_height - 40)
if self.audio_codec:
self.draw_badge(painter, self.audio_codec, x_offset + 140, y_offset + self.orig_height - 40)
if self.video_res:
self.draw_badge(painter, f"{self.video_res}{self.interlaced if self.interlaced is not None else ''}", x_offset + 20, y_offset + self.orig_height - 40)
# Draw logo if available
if self.logo_pixmap:
painter.drawPixmap(x_offset + self.orig_width - 100, y_offset + 20, self.logo_pixmap)
except Exception as e:
print(f"Error in painting: {e}")
import traceback
traceback.print_exc()
def draw_badge(self, painter, text, x, y):
# Save current painter state
painter.save()
# Draw rounded badge
painter.setPen(QPen(QColor(255, 255, 255, 255), 2))
painter.setBrush(Qt.NoBrush)
# Use QPainterPath for consistent rounded corners
badge_path = QPainterPath()
badge_path.addRoundedRect(x, y, 48, 20, 7, 7)
painter.drawPath(badge_path)
# Draw text
painter.setPen(QColor(255, 255, 255))
font = painter.font()
font.setBold(True)
font.setPointSize(8)
painter.setFont(font)
# Center text in badge
font_metrics = painter.fontMetrics()
text_width = font_metrics.width(text)
text_height = font_metrics.height()
# We need to use integer coordinates for drawText, not floats
text_x = int(x + (48 - text_width) / 2)
text_y = int(y + text_height)
# Use the int, int, string version of drawText
painter.drawText(text_x, text_y, text)
# Restore painter state
painter.restore()
def update_codecs(self, video_codec, audio_codec, video_res, interlaced):
if video_codec:
self.video_codec = video_codec
if audio_codec:
self.audio_codec = audio_codec
if video_res:
self.video_res = video_res
if interlaced is not None:
self.interlaced = f"{'i' if interlaced else 'p'}"
self.update() # Trigger repaint
def close_widget(self):
# Stop any active timers
if hasattr(self, 'stay_on_top_timer') and self.stay_on_top_timer.isActive():
self.stay_on_top_timer.stop()
# Close the widget
self.hide()
def start_close_timer(self, seconds=5):
"""
Starts a timer to close the widget after the specified number of seconds.
Parameters:
seconds (int): Number of seconds before closing the widget (default: 3)
"""
# Cancel any existing close timer
if hasattr(self, 'close_timer') and self.close_timer.isActive():
self.close_timer.stop()
# Create and start a new timer
self.close_timer = QTimer(self)
self.close_timer.setSingleShot(True)
self.close_timer.timeout.connect(self.close_widget)
self.close_timer.start(seconds * 1000) # Convert seconds to milliseconds
osd = None
def qt_process():
"""Run Qt application in a separate process"""
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QTimer
app = QApplication(sys.argv)
osd = None
# Check the queue periodically for commands
def check_queue():
global osd
if not to_qt_queue.empty():
command = to_qt_queue.get()
if command['action'] == 'show_osd':
if osd is not None:
osd.close_widget()
# Create new OSD
osd = OsdWidget(command['channel_info'])
if is_wayland:
osd.showFullScreen()
else:
osd.show()
elif command['action'] == 'start_close':
if osd is not None:
osd.start_close_timer()
elif command['action'] == 'close_osd':
if osd is not None:
osd.close_widget()
osd = None
elif command['action'] == 'update_codecs':
if osd is not None:
osd.update_codecs(command['vcodec'], command['acodec'], command['video_res'], command['interlaced'])
# Schedule next check
QTimer.singleShot(100, check_queue)
# Start the queue check
check_queue()
# Run Qt event loop
app.exec_()
def get_channels():
if M3U_URL:
response = requests.get(M3U_URL)
else:
print("Error: IPMPV_M3U_URL not set. Please set this environment variable to the URL of your IPTV list, in M3U format.")
exit(1)
lines = response.text.splitlines()
channels = []
regex = re.compile(r'tvg-logo="(.*?)".*?group-title="(.*?)"', re.IGNORECASE)
for i in range(len(lines)):
if lines[i].startswith("#EXTINF"):
match = regex.search(lines[i])
logo = match.group(1) if match else ""
group = match.group(2) if match else "Other"
name = lines[i].split(",")[-1]
url = lines[i + 1]
channels.append({"name": name, "url": url, "logo": logo, "group": group})
return channels
def get_current_resolution():
global is_wayland
# Initialize volume control
volume_control = VolumeControl(to_qt_queue=to_qt_queue)
# Start Qt process
qt_proc = multiprocessing.Process(
target=qt_process,
args=(to_qt_queue, from_qt_queue),
daemon=True
)
qt_proc.start()
# Start Flask server
server = IPMPVServer(
channels=channels,
player=player,
to_qt_queue=to_qt_queue,
from_qt_queue=from_qt_queue,
resolution=resolution,
ipmpv_retroarch_cmd=ipmpv_retroarch_cmd,
volume_control=volume_control
)
try:
if is_wayland:
wlr_randr_env = os.environ.copy()
output = subprocess.check_output(["wlr-randr"], universal_newlines=True, env=wlr_randr_env)
if "Composite-1" in output.split("\n")[0]:
for line in output.split("\n"):
if "720x480" in line and "current" in line:
return "480i"
elif "720x240" in line and "current" in line:
return "240p"
elif "720x576" in line and "current" in line:
return "576i"
elif "720x288" in line and "current" in line:
return "288p"
else:
xrandr_env = os.environ.copy()
output = subprocess.check_output(["xrandr"], universal_newlines=True, env=xrandr_env)
for line in output.split("\n"):
if "Composite-1" in line:
if "720x480" in line:
return "480i"
elif "720x240" in line:
return "240p"
elif "720x576" in line:
return "576i"
elif "720x288" in line:
return "288p"
except subprocess.CalledProcessError:
if is_wayland:
print("Error: Cannot get display resolution. Is this a wl-roots compatible compositor?")
else:
print("Error: Cannot get display resolution. Is an X session running?")
except FileNotFoundError:
if is_wayland:
print("Error: Could not find wlr-randr, resolution will be unknown")
else:
print("Error: Could not find xrandr, resolution will be unknown")
return "UNK"
def error_check(loglevel, component, message):
global player
print(f"[{loglevel}] {component}: {message}")
if loglevel == 'error' and (component == 'ffmpeg' or component == 'cplayer') and 'Failed' in message:
player.loadfile("./nosignal.png")
to_qt_queue.put({
'action': 'start_close'
})
player = mpv.MPV(
# it's a bit weird that i have to use the logs to get errors,
# but catch_errors is apparently broken
log_handler = error_check,
vo = 'gpu',
hwdec = 'auto-safe',
demuxer_lavf_o = 'reconnect=1',
deinterlace = 'no',
keepaspect = 'no',
geometry = '100%:100%',
fullscreen = 'yes',
loop_playlist = 'inf'
)
deinterlace = False
channels = get_channels()
current_index = None
retroarch_p = None
resolution = get_current_resolution()
low_latency = False
vcodec = None
acodec = None
video_res = None
interlaced = None
@player.property_observer('video-format')
def video_codec_observer(name, value):
global vcodec, acodec
if value:
vcodec = value.upper()
@player.property_observer('audio-codec-name')
def audio_codec_observer(name, value):
global acodec, vcodec
if value:
acodec = value.upper()
def play_channel(index):
global current_index, vcodec, acodec, video_res, interlaced
print(f"\n=== Starting channel change to index {index} ===")
to_qt_queue.put({
'action': 'close_osd'
})
print("Closed OSD")
vcodec = None
acodec = None
current_index = index % len(channels)
print(f"Playing channel: {channels[current_index]['name']} ({channels[current_index]['url']})")
try:
player.loadfile("./novideo.png")
player.wait_until_playing()
channel_info = {
"name": channels[current_index]["name"],
"deinterlace": deinterlace,
"low_latency": low_latency,
"logo": channels[current_index]["logo"]
}
to_qt_queue.put({
'action': 'show_osd',
'channel_info': channel_info
})
player.loadfile(channels[current_index]['url'])
time.sleep(0.5)
player.wait_until_playing()
video_params = player.video_params
video_frame_info = player.video_frame_info
if video_params and video_frame_info:
video_res = video_params.get('h')
interlaced = video_frame_info.get('interlaced')
to_qt_queue.put({
'action': 'update_codecs',
'vcodec': vcodec,
'acodec': acodec,
'video_res': video_res,
'interlaced': interlaced
})
to_qt_queue.put({
'action': 'start_close',
})
except Exception as e:
print(f"\033[91mError in play_channel: {str(e)}\033[0m")
traceback.print_exc()
app = flask.Flask(__name__)
# Flask routes here
@app.route("/")
def index():
grouped_channels = {}
for channel in channels:
grouped_channels.setdefault(channel["group"], []).append(channel)
flat_channel_list = [channel for channel in channels]
# Create the channel groups HTML
channel_groups_html = ""
for group, ch_list in grouped_channels.items():
channel_groups_html += f'<div class="group">{group}'
for channel in ch_list:
index = flat_channel_list.index(channel) # Get correct global index
channel_groups_html += f'''
<div class="channel">
<img src="{channel['logo']}" onerror="this.style.display='none'">
<button onclick="changeChannel({index})">{channel['name']}</button>
</div>
'''
channel_groups_html += '</div>'
# Replace placeholders with actual values
html = open("templates/index.html").read()
html = html.replace("%CURRENT_CHANNEL%", channels[current_index]['name'] if current_index is not None else "None")
html = html.replace("%RETROARCH_STATE%", "ON" if retroarch_p and retroarch_p.poll() is None else "OFF")
html = html.replace("%RETROARCH_LABEL%", "Stop RetroArch" if retroarch_p and retroarch_p.poll() is None else "Start RetroArch")
html = html.replace("%DEINTERLACE_STATE%", "ON" if deinterlace else "OFF")
html = html.replace("%RESOLUTION%", resolution)
html = html.replace("%LATENCY_STATE%", "ON" if low_latency else "OFF")
html = html.replace("%LATENCY_LABEL%", "Lo" if low_latency else "Hi")
html = html.replace("%CHANNEL_GROUPS%", channel_groups_html)
return html
def is_valid_url(url):
return re.match(r"^(https?|rtmp|rtmps|udp|tcp):\/\/[\w\-]+(\.[\w\-]+)*(:\d+)?([\/?].*)?$", url) is not None
@app.route("/play_custom")
def play_custom():
global current_index
current_index = None
url = request.args.get("url")
if not url or not is_valid_url(url):
return jsonify(success=False, error="Invalid or unsupported URL")
player.loadfile(url)
return jsonify(success=True)
@app.route("/hide_osd")
def hide_osd():
to_qt_queue.put({
'action': 'close_osd',
})
return "", 204
@app.route("/show_osd")
def show_osd():
if current_index is not None:
channel_info = {
"name": channels[current_index]["name"],
"deinterlace": deinterlace,
"low_latency": low_latency,
"logo": channels[current_index]["logo"]
}
to_qt_queue.put({
'action': 'show_osd',
'channel_info': channel_info
})
to_qt_queue.put({
'action': 'update_codecs',
'vcodec': vcodec,
'acodec': acodec,
'video_res': video_res,
'interlaced': interlaced
})
return "", 204
@app.route("/channel")
def switch_channel():
index = int(request.args.get("index", current_index))
play_channel(index)
return "", 204
@app.route("/toggle_deinterlace")
def toggle_deinterlace():
global deinterlace
deinterlace = not deinterlace
if deinterlace:
player['vf'] = 'yadif=0'
else:
player['vf'] = ''
return jsonify(state=deinterlace)
@app.route("/stop_player")
def stop_player():
global current_index
global osd
current_index = None
to_qt_queue.put({
'action': 'close_osd',
})
player.stop()
return "", 204
@app.route("/toggle_retroarch")
def toggle_retroarch():
global retroarch_p
retroarch_pid = subprocess.run(["pgrep", "-fx", "retroarch"], stdout=subprocess.PIPE).stdout.strip()
if retroarch_pid:
print("Retroarch already open. Trying to close it.")
subprocess.run(["kill", retroarch_pid])
retroarch_p.terminate()
return jsonify(state=False)
else:
print("Launching RetroArch")
retroarch_env = os.environ.copy()
retroarch_env["MESA_GL_VERSION_OVERRIDE"] = "3.3"
retroarch_p = subprocess.Popen(re.split("\\s", ipmpv_retroarch_cmd if ipmpv_retroarch_cmd is not None else 'retroarch'), env=retroarch_env)
return jsonify(state=True)
@app.route("/toggle_latency")
def toggle_latency():
global low_latency
low_latency = not low_latency
player['audio-buffer'] = '0' if low_latency else '0.2'
player['vd-lavc-threads'] = '1' if low_latency else '0'
player['cache-pause'] = 'no' if low_latency else 'yes'
player['demuxer-lavf-o'] = 'reconnect=1,fflags=+nobuffer' if low_latency else 'reconnect=1'
player['demuxer-lavf-probe-info'] = 'nostreams' if low_latency else 'auto'
player['demuxer-lavf-analyzeduration'] = '0.1' if low_latency else '0'
player['video-sync'] = 'audio'
player['interpolation'] = 'no'
player['video-latency-hacks'] = 'yes' if low_latency else 'no'
player['stream-buffer-size'] = '4k' if low_latency else '128k'
print("JSON: ",jsonify(state=low_latency))
return jsonify(state=low_latency)
@app.route("/toggle_resolution")
def toggle_resolution():
global resolution,is_wayland
new_res = ""
if is_wayland:
if resolution == "480i":
new_res = "720x240"
elif resolution == "240p":
new_res = "720x480"
elif resolution == "576i":
new_res = "720x288"
elif resolution == "288p":
new_res = "720x576"
else:
if resolution == "480i":
new_res = "720x240"
elif resolution == "240p":
new_res = "720x480i"
elif resolution == "576i":
new_res = "720x288"
elif resolution == "288p":
new_res = "720x576i"
if new_res:
if is_wayland:
wlr_randr_env = os.environ.copy()
wlr_randr_env["DISPLAY"] = ":0"
subprocess.run(["wlr-randr", "--output", "Composite-1", "--mode", new_res], check=False, env=wlr_randr_env)
resolution = get_current_resolution()
else:
xrandr_env = os.environ.copy()
xrandr_env["DISPLAY"] = ":0"
subprocess.run(["xrandr", "--output", "Composite-1", "--mode", new_res], check=False, env=xrandr_env)
resolution = get_current_resolution()
return jsonify(res=get_current_resolution())
@app.route('/manifest.json')
def serve_manifest():
return send_from_directory("static", 'manifest.json',
mimetype='application/manifest+json')
# Serve icon files from root
@app.route('/icon512_rounded.png')
def serve_rounded_icon():
return send_from_directory("static", 'icon512_rounded.png',
mimetype='image/png')
@app.route('/icon512_maskable.png')
def serve_maskable_icon():
return send_from_directory("static", 'icon512_maskable.png',
mimetype='image/png')
@app.route('/screenshot1.png')
def serve_screenshot_1():
return send_from_directory("static", 'screenshot1.png',
mimetype='image/png')
# Run the Flask server (this will block)
server.run(host="0.0.0.0", port=5000)
except KeyboardInterrupt:
print("Shutting down...")
finally:
# Clean up
if qt_proc.is_alive():
qt_proc.terminate()
qt_proc.join(timeout=1)
sys.exit(0)
if __name__ == "__main__":
# Start Qt process
qt_proc = multiprocessing.Process(target=qt_process)
qt_proc.daemon = True
qt_proc.start()
# Start Flask in main thread
app.run(host="0.0.0.0", port=5000)
main()

250
osd.py Normal file
View File

@ -0,0 +1,250 @@
#!/usr/bin/python
"""On-screen display widget for IPMPV."""
import os
import requests
import traceback
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from utils import is_wayland, osd_corner_radius
class OsdWidget(QWidget):
"""Widget for the on-screen display."""
def __init__(self, channel_info, width=600, height=165, close_time=5, corner_radius=int(osd_corner_radius) if osd_corner_radius is not None else 15):
"""Initialize the OSD widget."""
QFontDatabase.addApplicationFont('FiraSans-Regular.ttf')
QFontDatabase.addApplicationFont('FiraSans-Bold.ttf')
super().__init__()
self.channel_info = channel_info
self.orig_width = width
self.orig_height = height
self.close_time = close_time
self.corner_radius = corner_radius
self.video_codec = None
self.audio_codec = None
self.video_res = None
self.interlaced = None
# Setup window
self.setWindowTitle("OSD")
self.setFixedSize(width, height)
# Check if we're running on Wayland
self.is_wayland = is_wayland
# Set appropriate window flags and size
if self.is_wayland:
# For Wayland, use fullscreen transparent approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint
)
# Set fullscreen size
self.screen_geometry = QApplication.desktop().screenGeometry()
self.setFixedSize(self.screen_geometry.width(), self.screen_geometry.height())
# Calculate content positioning
self.content_x = (self.screen_geometry.width() - self.orig_width) // 2
self.content_y = 20 # 20px from top
else:
# For X11, use the original approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint |
Qt.X11BypassWindowManagerHint |
Qt.Tool |
Qt.ToolTip
)
self.setFixedSize(width, height)
self.content_x = 0
self.content_y = 0
# Enable transparency
self.setAttribute(Qt.WA_TranslucentBackground)
# Position window at the top center of the screen
self.position_window()
# Load logo if available
self.logo_pixmap = None
if channel_info["logo"]:
self.load_logo()
if self.is_wayland:
self.setAttribute(Qt.WA_TransparentForMouseEvents)
def position_window(self):
"""Position the window on the screen."""
if self.is_wayland:
# For Wayland, we just position at 0,0 (fullscreen)
self.move(0, 0)
# Ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(100) # Check every 100ms
else:
# For X11, center at top
screen_geometry = QApplication.desktop().screenGeometry()
x = (screen_geometry.width() - self.orig_width) // 2
y = 20 # 20px from top
self.setGeometry(x, y, self.orig_width, self.orig_height)
# X11 specific window hints
self.setAttribute(Qt.WA_X11NetWmWindowTypeNotification)
QTimer.singleShot(100, lambda: self.move(x, y))
QTimer.singleShot(500, lambda: self.move(x, y))
# Periodically ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(1000) # Check every second
def load_logo(self):
"""Load the channel logo."""
try:
response = requests.get(self.channel_info["logo"])
if response.ok:
pixmap = QPixmap()
pixmap.loadFromData(response.content)
if not pixmap.isNull():
self.logo_pixmap = pixmap.scaled(80, 80, Qt.KeepAspectRatio, Qt.SmoothTransformation)
self.update() # Trigger repaint
except Exception as e:
print(f"Failed to load logo: {e}")
def paintEvent(self, a0):
"""Paint event handler."""
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
if self.is_wayland:
# For Wayland, we're drawing the content in the right position on a fullscreen widget
self.draw_osd_content(painter, self.content_x, self.content_y)
else:
# For X11, we're drawing directly at (0,0) since the widget is already positioned
self.draw_osd_content(painter, 0, 0)
def draw_osd_content(self, painter, x_offset, y_offset):
"""Draw the OSD content."""
# Create a path for rounded rectangle background
path = QPainterPath()
path.addRoundedRect(
x_offset, y_offset,
self.orig_width, self.orig_height,
self.corner_radius, self.corner_radius
)
# Fill the rounded rectangle with semi-transparent background
painter.setPen(Qt.NoPen)
painter.setBrush(QColor(0, 50, 100, 200)) # RGBA
painter.drawPath(path)
# Setup text drawing
painter.setPen(QColor(255, 255, 255))
try:
font = QFont("Fira Sans", 18)
font.setBold(True)
painter.setFont(font)
# Draw channel name
painter.drawText(x_offset + 20, y_offset + 40, self.channel_info["name"])
font.setPointSize(14)
font.setBold(False)
painter.setFont(font)
# Draw deinterlace status
painter.drawText(x_offset + 20, y_offset + 70, f"Deinterlacing {'on' if self.channel_info['deinterlace'] else 'off'}")
# Draw latency mode
painter.drawText(x_offset + 20, y_offset + 100, f"{'Low' if self.channel_info['low_latency'] else 'High'} latency")
# Draw codec badges if available
if self.video_codec:
self.draw_badge(painter, self.video_codec, x_offset + 80, y_offset + self.orig_height - 40)
if self.audio_codec:
self.draw_badge(painter, self.audio_codec, x_offset + 140, y_offset + self.orig_height - 40)
if self.video_res:
self.draw_badge(painter, f"{self.video_res}{self.interlaced if self.interlaced is not None else ''}", x_offset + 20, y_offset + self.orig_height - 40)
# Draw logo if available
if self.logo_pixmap:
painter.drawPixmap(x_offset + self.orig_width - 100, y_offset + 20, self.logo_pixmap)
except Exception as e:
print(f"Error in painting: {e}")
traceback.print_exc()
def draw_badge(self, painter, text, x, y):
"""Draw a badge with text."""
# Save current painter state
painter.save()
# Draw rounded badge
painter.setPen(QPen(QColor(255, 255, 255, 255), 2))
painter.setBrush(Qt.NoBrush)
# Use QPainterPath for consistent rounded corners
badge_path = QPainterPath()
badge_path.addRoundedRect(x, y, 48, 20, 7, 7)
painter.drawPath(badge_path)
# Draw text
painter.setPen(QColor(255, 255, 255))
font = painter.font()
font.setBold(True)
font.setPointSize(8)
painter.setFont(font)
# Center text in badge
font_metrics = painter.fontMetrics()
text_width = font_metrics.width(text)
text_height = font_metrics.height()
# We need to use integer coordinates for drawText, not floats
text_x = int(x + (48 - text_width) / 2)
text_y = int(y + text_height)
# Use the int, int, string version of drawText
painter.drawText(text_x, text_y, text)
# Restore painter state
painter.restore()
def update_codecs(self, video_codec, audio_codec, video_res, interlaced):
"""Update codec information."""
if video_codec:
self.video_codec = video_codec
if audio_codec:
self.audio_codec = audio_codec
if video_res:
self.video_res = video_res
if interlaced is not None:
self.interlaced = f"{'i' if interlaced else 'p'}"
self.update() # Trigger repaint
def close_widget(self):
"""Close the widget."""
# Stop any active timers
if hasattr(self, 'stay_on_top_timer') and self.stay_on_top_timer.isActive():
self.stay_on_top_timer.stop()
# Close the widget
self.hide()
def start_close_timer(self, seconds=5):
"""Start a timer to close the widget."""
# Cancel any existing close timer
if hasattr(self, 'close_timer') and self.close_timer.isActive():
self.close_timer.stop()
# Create and start a new timer
self.close_timer = QTimer(self)
self.close_timer.setSingleShot(True)
self.close_timer.timeout.connect(self.close_widget)
self.close_timer.start(seconds * 1000) # Convert seconds to milliseconds

151
player.py Normal file
View File

@ -0,0 +1,151 @@
#!/usr/bin/python
"""MPV player functionality for IPMPV."""
import mpv
import threading
import time
import traceback
class Player:
"""MPV player wrapper with IPMPV-specific functionality."""
def __init__(self, to_qt_queue):
"""Initialize the player."""
self.to_qt_queue = to_qt_queue
self.player = mpv.MPV(
log_handler=self.error_check,
vo='gpu',
hwdec='auto-safe',
demuxer_lavf_o='reconnect=1',
deinterlace='no',
keepaspect='no',
geometry='100%:100%',
fullscreen='yes',
loop_playlist='inf'
)
self.deinterlace = False
self.low_latency = False
self.current_index = None
self.vcodec = None
self.acodec = None
self.video_res = None
self.interlaced = None
# Set up property observers
self.player.observe_property('video-format', self.video_codec_observer)
self.player.observe_property('audio-codec-name', self.audio_codec_observer)
# Channel change management
self.channel_change_lock = threading.Lock()
self.current_channel_thread = None
self.channel_change_counter = 0 # To track the most recent channel change
def error_check(self, loglevel, component, message):
"""Check for errors in MPV logs."""
print(f"[{loglevel}] {component}: {message}")
if loglevel == 'error' and (component == 'ffmpeg' or component == 'cplayer') and 'Failed' in message:
self.player.loadfile("./nosignal.png")
self.to_qt_queue.put({
'action': 'start_close'
})
def video_codec_observer(self, name, value):
"""Observe changes to the video codec."""
if value:
self.vcodec = value.upper()
def audio_codec_observer(self, name, value):
"""Observe changes to the audio codec."""
if value:
self.acodec = value.upper()
def play_channel(self, index, channels):
"""
Play a channel by index.
Args:
index (int): Index of the channel to play.
channels (list): List of channel dictionaries.
"""
print(f"\n=== Changing channel to index {index} ===")
self.vcodec = None
self.acodec = None
self.current_index = index % len(channels)
print(f"Playing channel: {channels[self.current_index]['name']} ({channels[self.current_index]['url']})")
try:
self.player.loadfile("./novideo.png")
self.player.wait_until_playing()
channel_info = {
"name": channels[self.current_index]["name"],
"deinterlace": self.deinterlace,
"low_latency": self.low_latency,
"logo": channels[self.current_index]["logo"]
}
self.to_qt_queue.put({
'action': 'show_osd',
'channel_info': channel_info
})
self.player.loadfile(channels[self.current_index]['url'])
self.player.wait_until_playing()
video_params = self.player.video_params
video_frame_info = self.player.video_frame_info
if video_params and video_frame_info:
self.video_res = video_params.get('h')
self.interlaced = video_frame_info.get('interlaced')
self.to_qt_queue.put({
'action': 'update_codecs',
'vcodec': self.vcodec,
'acodec': self.acodec,
'video_res': self.video_res,
'interlaced': self.interlaced
})
self.to_qt_queue.put({
'action': 'start_close',
})
except Exception as e:
print(f"\033[91mError in play_channel: {str(e)}\033[0m")
traceback.print_exc()
return
def toggle_deinterlace(self):
"""Toggle deinterlacing."""
self.deinterlace = not self.deinterlace
if self.deinterlace:
self.player['vf'] = 'yadif=0'
else:
self.player['vf'] = ''
return self.deinterlace
def toggle_latency(self):
"""Toggle low latency mode."""
self.low_latency = not self.low_latency
self.player['audio-buffer'] = '0' if self.low_latency else '0.2'
self.player['vd-lavc-threads'] = '1' if self.low_latency else '0'
self.player['cache-pause'] = 'no' if self.low_latency else 'yes'
self.player['demuxer-lavf-o'] = 'reconnect=1,fflags=+nobuffer' if self.low_latency else 'reconnect=1'
self.player['demuxer-lavf-probe-info'] = 'nostreams' if self.low_latency else 'auto'
self.player['demuxer-lavf-analyzeduration'] = '0.1' if self.low_latency else '0'
self.player['video-sync'] = 'audio'
self.player['interpolation'] = 'no'
self.player['video-latency-hacks'] = 'yes' if self.low_latency else 'no'
self.player['stream-buffer-size'] = '4k' if self.low_latency else '128k'
return self.low_latency
def stop(self):
"""Stop the player."""
self.player.stop()
self.current_index = None

94
qt_process.py Normal file
View File

@ -0,0 +1,94 @@
#!/usr/bin/python
"""Qt process for IPMPV OSD."""
import sys
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QTimer
from osd import OsdWidget
from volume_osd import VolumeOsdWidget
from utils import is_wayland
def qt_process(to_qt_queue, from_qt_queue):
"""
Run Qt application in a separate process.
Args:
to_qt_queue: Queue for messages to Qt process
from_qt_queue: Queue for messages from Qt process
"""
app = QApplication(sys.argv)
osd = None
volume_osd = None
# Check the queue periodically for commands
def check_queue():
nonlocal osd, volume_osd
if not to_qt_queue.empty():
command = to_qt_queue.get()
# Channel OSD commands
if command['action'] == 'show_osd':
if osd is not None:
osd.close_widget()
# Create new OSD
osd = OsdWidget(command['channel_info'])
if is_wayland:
osd.showFullScreen()
else:
osd.show()
elif command['action'] == 'start_close':
if osd is not None:
osd.start_close_timer()
elif command['action'] == 'close_osd':
if osd is not None:
osd.close_widget()
osd = None
elif command['action'] == 'update_codecs':
if osd is not None:
osd.update_codecs(command['vcodec'], command['acodec'], command['video_res'], command['interlaced'])
# Volume OSD commands
elif command['action'] == 'show_volume_osd':
# Close existing volume OSD if present
if volume_osd is not None:
volume_osd.close_widget()
# Get volume level and mute state
volume_level = command.get('volume_level', 0)
is_muted = command.get('is_muted', False)
# If muted, override volume display to 0
display_volume = 0 if is_muted else volume_level
# Create new volume OSD
volume_osd = VolumeOsdWidget(display_volume)
if is_wayland:
volume_osd.showFullScreen()
else:
volume_osd.show()
# Start the close timer
volume_osd.start_close_timer()
elif command['action'] == 'update_volume_osd':
if volume_osd is not None:
volume_level = command.get('volume_level', 0)
is_muted = command.get('is_muted', False)
# If muted, override volume display to 0
display_volume = 0 if is_muted else volume_level
volume_osd.update_volume(display_volume)
volume_osd.start_close_timer()
elif command['action'] == 'close_volume_osd':
if volume_osd is not None:
volume_osd.close_widget()
volume_osd = None
# Schedule next check
QTimer.singleShot(100, check_queue)
# Start the queue check
check_queue()
# Run Qt event loop
app.exec_()

274
server.py Normal file
View File

@ -0,0 +1,274 @@
#!/usr/bin/python
"""Flask server for IPMPV."""
import os
import re
import subprocess
import threading
import flask
from flask import request, jsonify, send_from_directory
from utils import is_valid_url, change_resolution, get_current_resolution, is_wayland
class IPMPVServer:
"""Flask server for IPMPV web interface."""
def __init__(self, channels, player, to_qt_queue, from_qt_queue, resolution, ipmpv_retroarch_cmd, volume_control=None):
"""Initialize the server."""
self.app = flask.Flask(__name__,
static_folder='static',
template_folder='templates')
self.channels = channels
self.player = player
self.to_qt_queue = to_qt_queue
self.from_qt_queue = from_qt_queue
self.resolution = resolution
self.ipmpv_retroarch_cmd = ipmpv_retroarch_cmd
self.retroarch_p = None
self.volume_control = volume_control
# Register routes
self._register_routes()
def run(self, host="0.0.0.0", port=5000):
"""Run the Flask server."""
self.app.run(host=host, port=port)
def _register_routes(self):
"""Register Flask routes."""
@self.app.route("/")
def index():
return self._handle_index()
@self.app.route("/play_custom")
def play_custom():
return self._handle_play_custom()
@self.app.route("/hide_osd")
def hide_osd():
return self._handle_hide_osd()
@self.app.route("/show_osd")
def show_osd():
return self._handle_show_osd()
@self.app.route("/channel")
def switch_channel():
return self._handle_switch_channel()
@self.app.route("/toggle_deinterlace")
def toggle_deinterlace():
return self._handle_toggle_deinterlace()
@self.app.route("/stop_player")
def stop_player():
return self._handle_stop_player()
@self.app.route("/toggle_retroarch")
def toggle_retroarch():
return self._handle_toggle_retroarch()
@self.app.route("/toggle_latency")
def toggle_latency():
return self._handle_toggle_latency()
@self.app.route("/toggle_resolution")
def toggle_resolution():
return self._handle_toggle_resolution()
@self.app.route("/volume_up")
def volume_up():
return self._handle_volume_up()
@self.app.route("/volume_down")
def volume_down():
return self._handle_volume_down()
@self.app.route("/toggle_mute")
def toggle_mute():
return self._handle_toggle_mute()
def _handle_index(self):
"""Handle the index route."""
from channels import group_channels
grouped_channels = group_channels(self.channels)
flat_channel_list = [channel for channel in self.channels]
# Create the channel groups HTML
channel_groups_html = ""
for group, ch_list in grouped_channels.items():
channel_groups_html += f'<div class="group">{group}'
for channel in ch_list:
index = flat_channel_list.index(channel) # Get correct global index
channel_groups_html += f'''
<div class="channel">
<img src="{channel['logo']}" onerror="this.style.display='none'">
<button onclick="changeChannel({index})">{channel['name']}</button>
</div>
'''
channel_groups_html += '</div>'
# Replace placeholders with actual values
html = open("templates/index.html").read()
html = html.replace("%CURRENT_CHANNEL%",
self.channels[self.player.current_index]['name']
if self.player.current_index is not None else "None")
html = html.replace("%RETROARCH_STATE%",
"ON" if self.retroarch_p and self.retroarch_p.poll() is None else "OFF")
html = html.replace("%RETROARCH_LABEL%",
"Stop RetroArch" if self.retroarch_p and self.retroarch_p.poll() is None else "Start RetroArch")
html = html.replace("%DEINTERLACE_STATE%", "ON" if self.player.deinterlace else "OFF")
html = html.replace("%RESOLUTION%", self.resolution)
html = html.replace("%LATENCY_STATE%", "ON" if self.player.low_latency else "OFF")
html = html.replace("%LATENCY_LABEL%", "Lo" if self.player.low_latency else "Hi")
html = html.replace("%CHANNEL_GROUPS%", channel_groups_html)
return html
def _handle_play_custom(self):
"""Handle the play_custom route."""
url = request.args.get("url")
if not url or not is_valid_url(url):
return jsonify(success=False, error="Invalid or unsupported URL")
self.player.player.loadfile(url)
self.player.current_index = None
return jsonify(success=True)
def _handle_hide_osd(self):
"""Handle the hide_osd route."""
self.to_qt_queue.put({
'action': 'close_osd',
})
return "", 204
def _handle_show_osd(self):
"""Handle the show_osd route."""
if self.player.current_index is not None:
channel_info = {
"name": self.channels[self.player.current_index]["name"],
"deinterlace": self.player.deinterlace,
"low_latency": self.player.low_latency,
"logo": self.channels[self.player.current_index]["logo"]
}
self.to_qt_queue.put({
'action': 'show_osd',
'channel_info': channel_info
})
self.to_qt_queue.put({
'action': 'update_codecs',
'vcodec': self.player.vcodec,
'acodec': self.player.acodec,
'video_res': self.player.video_res,
'interlaced': self.player.interlaced
})
return "", 204
def _handle_switch_channel(self):
"""Handle the switch_channel route."""
self.player.stop()
index = int(request.args.get("index", self.player.current_index))
thread = threading.Thread(
target=self.player.play_channel,
args=(index,self.channels),
daemon=True
)
thread.start()
return "", 204
def _handle_toggle_deinterlace(self):
"""Handle the toggle_deinterlace route."""
state = self.player.toggle_deinterlace()
return jsonify(state=state)
def _handle_stop_player(self):
"""Handle the stop_player route."""
self.to_qt_queue.put({
'action': 'close_osd',
})
self.player.stop()
return "", 204
def _handle_toggle_retroarch(self):
"""Handle the toggle_retroarch route."""
retroarch_pid = subprocess.run(["pgrep", "-fx", "retroarch"], stdout=subprocess.PIPE).stdout.strip()
if retroarch_pid:
print("Retroarch already open. Trying to close it.")
subprocess.run(["kill", retroarch_pid])
if self.retroarch_p:
self.retroarch_p.terminate()
return jsonify(state=False)
else:
print("Launching RetroArch")
retroarch_env = os.environ.copy()
retroarch_env["MESA_GL_VERSION_OVERRIDE"] = "3.3"
self.retroarch_p = subprocess.Popen(re.split("\\s", self.ipmpv_retroarch_cmd
if self.ipmpv_retroarch_cmd is not None
else 'retroarch'), env=retroarch_env)
return jsonify(state=True)
def _handle_toggle_latency(self):
"""Handle the toggle_latency route."""
state = self.player.toggle_latency()
return jsonify(state=state)
def _handle_toggle_resolution(self):
"""Handle the toggle_resolution route."""
self.resolution = change_resolution(self.resolution)
return jsonify(res=self.resolution)
@self.app.route('/manifest.json')
def serve_manifest():
return send_from_directory("static", 'manifest.json',
mimetype='application/manifest+json')
@self.app.route('/icon512_rounded.png')
def serve_rounded_icon():
return send_from_directory("static", 'icon512_rounded.png',
mimetype='image/png')
@self.app.route('/icon512_maskable.png')
def serve_maskable_icon():
return send_from_directory("static", 'icon512_maskable.png',
mimetype='image/png')
@self.app.route('/screenshot1.png')
def serve_screenshot_1():
return send_from_directory("static", 'screenshot1.png',
mimetype='image/png')
def _handle_volume_up(self):
"""Handle the volume_up route."""
if self.volume_control:
step = request.args.get("step")
step = int(step) if step and step.isdigit() else None
new_volume = self.volume_control.volume_up(step)
return jsonify(volume=new_volume, muted=self.volume_control.is_muted())
return jsonify(error="Volume control not available"), 404
def _handle_volume_down(self):
"""Handle the volume_down route."""
if self.volume_control:
step = request.args.get("step")
step = int(step) if step and step.isdigit() else None
new_volume = self.volume_control.volume_down(step)
return jsonify(volume=new_volume, muted=self.volume_control.is_muted())
return jsonify(error="Volume control not available"), 404
def _handle_toggle_mute(self):
"""Handle the toggle_mute route."""
if self.volume_control:
is_muted = self.volume_control.toggle_mute()
volume = self.volume_control.get_volume()
return jsonify(muted=is_muted, volume=volume)
return jsonify(error="Volume control not available"), 404
def _handle_get_volume(self):
"""Handle the get_volume route."""
if self.volume_control:
volume = self.volume_control.get_volume()
is_muted = self.volume_control.is_muted()
return jsonify(volume=volume, muted=is_muted)
return jsonify(error="Volume control not available"), 404

View File

@ -143,20 +143,46 @@
margin: 0;
}
#osd-on-btn {
.leftbtn {
border-radius: var(--border-radius) 0 0 var(--border-radius);
min-width: 80px;
margin-right: 0;
border-right: 0;
}
#osd-off-btn {
.rightbtn {
border-radius: 0 var(--border-radius) var(--border-radius) 0;
min-width: 80px;
margin-left: 0;
border-left: 0;
}
.midbtn {
border-left: none;
border-right: none;
margin-left: 0;
margin-right: 0;
border-radius: 0;
}
#osd-on-btn {
min-width: 80px;
}
#osd-off-btn {
min-width: 80px;
}
#vol-up-btn {
min-width: 60px;
}
#vol-dn-btn {
min-width: 60px;
}
#vol-mute-btn {
min-width: 80px;
}
#latency-btn {
min-width: 60px;
width: 60px;
@ -355,11 +381,20 @@
</button>
</div>
<div class="section">
<h2>Volume</h2>
<div class="osd-toggle">
<button class="leftbtn" id="vol-up-btn" onclick="volumeUp()">+</button>
<button class="midbtn %MUTE_STATE%" id="vol-mute-btn" onclick="toggleMute()">mute</button>
<button class="rightbtn" id="vol-dn-btn" onclick="volumeDown()">-</button>
</div>
</div>
<div class="section">
<h2>Toggle OSD</h2>
<div class="osd-toggle">
<button id="osd-on-btn" onclick="showOSD()">on</button>
<button id="osd-off-btn" onclick="hideOSD()">off</button>
<button class="leftbtn" id="osd-on-btn" onclick="showOSD()">on</button>
<button class="rightbtn" id="osd-off-btn" onclick="hideOSD()">off</button>
</div>
</div>
@ -497,6 +532,31 @@
fetch(`/hide_osd`).then(response => response.json());
}
function volumeUp() {
fetch(`/volume_up`)
.then(response => response.json())
.then(data => {
showToast("Volume: "+data.volume+"%");
});
}
function volumeDown() {
fetch(`/volume_down`)
.then(response => response.json())
.then(data => {
showToast("Volume: "+data.volume+"%");
});
}
function toggleMute() {
fetch(`/toggle_mute`)
.then(response => response.json())
.then(data => {
muted = data.muted ? "yes" : "no";
showToast("Muted: " + muted);
});
}
// Mobile-friendly toast notification
function showToast(message) {
const toast = document.createElement('div');
@ -526,4 +586,4 @@
</script>
</body>
</html>
</html>

98
utils.py Normal file
View File

@ -0,0 +1,98 @@
#!/usr/bin/python
"""Utility functions for IPMPV."""
import os
import re
import subprocess
# Environment variables
is_wayland = "WAYLAND_DISPLAY" in os.environ
osd_corner_radius = os.environ.get("IPMPV_CORNER_RADIUS")
ipmpv_retroarch_cmd = os.environ.get("IPMPV_RETROARCH_CMD")
m3u_url = os.environ.get('IPMPV_M3U_URL')
def setup_environment():
"""Set up environment variables."""
os.environ["LC_ALL"] = "C"
os.environ["LANG"] = "C"
def is_valid_url(url):
"""Check if a URL is valid."""
return re.match(r"^(https?|rtmp|rtmps|udp|tcp):\/\/[\w\-]+(\.[\w\-]+)*(:\d+)?([\/?].*)?$", url) is not None
def get_current_resolution():
"""Get the current display resolution."""
try:
if is_wayland:
wlr_randr_env = os.environ.copy()
output = subprocess.check_output(["wlr-randr"], universal_newlines=True, env=wlr_randr_env)
if "Composite-1" in output.split("\n")[0]:
for line in output.split("\n"):
if "720x480" in line and "current" in line:
return "480i"
elif "720x240" in line and "current" in line:
return "240p"
elif "720x576" in line and "current" in line:
return "576i"
elif "720x288" in line and "current" in line:
return "288p"
else:
xrandr_env = os.environ.copy()
output = subprocess.check_output(["xrandr"], universal_newlines=True, env=xrandr_env)
for line in output.split("\n"):
if "Composite-1" in line:
if "720x480" in line:
return "480i"
elif "720x240" in line:
return "240p"
elif "720x576" in line:
return "576i"
elif "720x288" in line:
return "288p"
except subprocess.CalledProcessError:
if is_wayland:
print("Error: Cannot get display resolution. Is this a wl-roots compatible compositor?")
else:
print("Error: Cannot get display resolution. Is an X session running?")
except FileNotFoundError:
if is_wayland:
print("Error: Could not find wlr-randr, resolution will be unknown")
else:
print("Error: Could not find xrandr, resolution will be unknown")
return "UNK"
def change_resolution(current_resolution):
"""Change the display resolution."""
new_res = ""
if is_wayland:
if current_resolution == "480i":
new_res = "720x240"
elif current_resolution == "240p":
new_res = "720x480"
elif current_resolution == "576i":
new_res = "720x288"
elif current_resolution == "288p":
new_res = "720x576"
else:
if current_resolution == "480i":
new_res = "720x240"
elif current_resolution == "240p":
new_res = "720x480i"
elif current_resolution == "576i":
new_res = "720x288"
elif current_resolution == "288p":
new_res = "720x576i"
if new_res:
if is_wayland:
wlr_randr_env = os.environ.copy()
wlr_randr_env["DISPLAY"] = ":0"
subprocess.run(["wlr-randr", "--output", "Composite-1", "--mode", new_res], check=False, env=wlr_randr_env)
return get_current_resolution()
else:
xrandr_env = os.environ.copy()
xrandr_env["DISPLAY"] = ":0"
subprocess.run(["xrandr", "--output", "Composite-1", "--mode", new_res], check=False, env=xrandr_env)
return get_current_resolution()
return current_resolution

181
volume.py Normal file
View File

@ -0,0 +1,181 @@
#!/usr/bin/python
"""Alsa volume control functions for IPMPV."""
import subprocess
import alsaaudio
import threading
import traceback
import time
class VolumeControl:
"""
Class for controlling audio volume with ALSA.
This class provides an interface to control the volume with PyAlsa,
with methods to get current volume, increase/decrease volume,
and toggle mute.
"""
def __init__(self, mixer_name='Master', step=5, to_qt_queue=None):
"""
Initialize the volume control.
Args:
mixer_name (str): Name of the ALSA mixer to control
step (int): Default step size for volume increments
to_qt_queue: Queue for sending messages to Qt process for OSD
"""
self.mixer_name = mixer_name
self.step = step
self.to_qt_queue = to_qt_queue
self.volume_thread = None
self.volume_lock = threading.Lock()
self._init_mixer()
def _init_mixer(self):
"""Initialize the ALSA mixer."""
try:
# Try to get the requested mixer
self.mixer = alsaaudio.Mixer(self.mixer_name)
print(f"Successfully initialized ALSA mixer: {self.mixer_name}")
except alsaaudio.ALSAAudioError as e:
print(f"Error initializing mixer '{self.mixer_name}': {e}")
# Try to get a list of available mixers
available_mixers = alsaaudio.mixers()
print(f"Available mixers: {available_mixers}")
# Try some common alternatives
fallback_mixers = ['PCM', 'Speaker', 'Master', 'Front', 'Headphone']
for mixer in fallback_mixers:
if mixer in available_mixers:
try:
self.mixer = alsaaudio.Mixer(mixer)
self.mixer_name = mixer
print(f"Using fallback mixer: {mixer}")
return
except alsaaudio.ALSAAudioError:
continue
# If we still don't have a mixer, raise an exception
raise Exception("Could not find a suitable ALSA mixer")
def get_volume(self):
"""
Get the current volume level.
Returns:
int: Current volume as a percentage (0-100)
"""
try:
# Get all channels and average them
volumes = self.mixer.getvolume()
return sum(volumes) // len(volumes)
except Exception as e:
print(f"Error getting volume: {e}")
traceback.print_exc()
return 0
def is_muted(self):
"""
Check if audio is muted.
Returns:
bool: True if muted, False otherwise
"""
try:
# Get mute state for all channels (returns list of 0/1 values)
mutes = self.mixer.getmute()
# If any channel is muted (1), consider it muted
return any(mute == 1 for mute in mutes)
except Exception as e:
print(f"Error checking mute state: {e}")
traceback.print_exc()
return False
def volume_up(self, step=None):
"""
Increase volume.
Args:
step (int, optional): Amount to increase volume by. Defaults to self.step.
Returns:
int: New volume level
"""
return self._adjust_volume(step if step is not None else self.step)
def volume_down(self, step=None):
"""
Decrease volume.
Args:
step (int, optional): Amount to decrease volume by. Defaults to self.step.
Returns:
int: New volume level
"""
return self._adjust_volume(-(step if step is not None else self.step))
def toggle_mute(self):
"""
Toggle mute state.
Returns:
bool: New mute state
"""
try:
muted = self.is_muted()
# Set all channels to the opposite of current mute state
self.mixer.setmute(0 if muted else 1)
new_mute_state = not muted
# Update OSD if queue is available
if self.to_qt_queue is not None:
self.to_qt_queue.put({
'action': 'show_volume_osd',
'volume_level': 0 if new_mute_state else self.get_volume(),
'is_muted': new_mute_state
})
return new_mute_state
except Exception as e:
print(f"Error toggling mute: {e}")
traceback.print_exc()
return self.is_muted()
def _adjust_volume(self, change):
"""
Internal method to adjust volume.
Args:
change (int): Amount to change volume by (positive or negative)
Returns:
int: New volume level
"""
with self.volume_lock:
try:
current = self.get_volume()
new_volume = max(0, min(100, current + change))
# Set new volume on all channels
self.mixer.setvolume(new_volume)
# If we were muted and increasing volume, unmute
if change > 0 and self.is_muted():
self.mixer.setmute(0)
# Update OSD if queue is available
if self.to_qt_queue is not None:
self.to_qt_queue.put({
'action': 'show_volume_osd',
'volume_level': new_volume,
'is_muted': False
})
return new_volume
except Exception as e:
print(f"Error adjusting volume: {e}")
traceback.print_exc()
return self.get_volume()

211
volume_osd.py Normal file
View File

@ -0,0 +1,211 @@
#!/usr/bin/python
"""Volume on-screen display widget for IPMPV."""
import os
import traceback
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from utils import is_wayland, osd_corner_radius
class VolumeOsdWidget(QWidget):
"""Widget for the volume on-screen display."""
def __init__(self, volume_level, width=300, height=80, close_time=2, corner_radius=int(osd_corner_radius) if osd_corner_radius is not None else 15):
"""
Initialize the volume OSD widget.
Args:
volume_level (int): Current volume level (0-100)
width (int): Width of the widget
height (int): Height of the widget
close_time (int): Time in seconds before the widget closes
corner_radius (int): Corner radius for the widget
"""
QFontDatabase.addApplicationFont('FiraSans-Regular.ttf')
QFontDatabase.addApplicationFont('FiraSans-Bold.ttf')
super().__init__()
self.volume_level = volume_level
self.orig_width = width
self.orig_height = height
self.close_time = close_time
self.corner_radius = corner_radius
# Setup window
self.setWindowTitle("Volume OSD")
self.setFixedSize(width, height)
# Check if we're running on Wayland
self.is_wayland = is_wayland
# Set appropriate window flags and size
if self.is_wayland:
# For Wayland, use fullscreen transparent approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint
)
# Set fullscreen size
self.screen_geometry = QApplication.desktop().screenGeometry()
self.setFixedSize(self.screen_geometry.width(), self.screen_geometry.height())
# Calculate content positioning
self.content_x = (self.screen_geometry.width() - self.orig_width) // 2
self.content_y = self.screen_geometry.height() - self.orig_height - 20 # 20px from bottom
else:
# For X11, use the original approach
self.setWindowFlags(
Qt.FramelessWindowHint |
Qt.WindowStaysOnTopHint |
Qt.X11BypassWindowManagerHint |
Qt.Tool |
Qt.ToolTip
)
self.setFixedSize(width, height)
self.content_x = 0
self.content_y = 0
# Enable transparency
self.setAttribute(Qt.WA_TranslucentBackground)
# Position window at the bottom center of the screen
self.position_window()
if self.is_wayland:
self.setAttribute(Qt.WA_TransparentForMouseEvents)
def position_window(self):
"""Position the window on the screen."""
if self.is_wayland:
# For Wayland, we just position at 0,0 (fullscreen)
self.move(0, 0)
# Ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(100) # Check every 100ms
else:
# For X11, center at bottom
screen_geometry = QApplication.desktop().screenGeometry()
x = (screen_geometry.width() - self.orig_width) // 2
y = screen_geometry.height() - self.orig_height - 20 # 20px from bottom
self.setGeometry(x, y, self.orig_width, self.orig_height)
# X11 specific window hints
self.setAttribute(Qt.WA_X11NetWmWindowTypeNotification)
QTimer.singleShot(100, lambda: self.move(x, y))
QTimer.singleShot(500, lambda: self.move(x, y))
# Periodically ensure window stays on top
self.stay_on_top_timer = QTimer(self)
self.stay_on_top_timer.timeout.connect(lambda: self.raise_())
self.stay_on_top_timer.start(1000) # Check every second
def paintEvent(self, a0):
"""Paint event handler."""
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
if self.is_wayland:
# For Wayland, we're drawing the content in the right position on a fullscreen widget
self.draw_osd_content(painter, self.content_x, self.content_y)
else:
# For X11, we're drawing directly at (0,0) since the widget is already positioned
self.draw_osd_content(painter, 0, 0)
def draw_osd_content(self, painter, x_offset, y_offset):
"""Draw the OSD content."""
try:
# Create a path for rounded rectangle background
path = QPainterPath()
path.addRoundedRect(
x_offset, y_offset,
self.orig_width, self.orig_height,
self.corner_radius, self.corner_radius
)
# Fill the rounded rectangle with semi-transparent background
painter.setPen(Qt.NoPen)
painter.setBrush(QColor(0, 50, 100, 200)) # RGBA
painter.drawPath(path)
# Setup text drawing
painter.setPen(QColor(255, 255, 255))
# Draw volume icon or symbol
font = QFont("Fira Sans", 14)
font.setBold(True)
painter.setFont(font)
# Draw volume label
painter.drawText(x_offset + 20, y_offset + 30, "Volume")
# Draw volume value
font.setPointSize(12)
painter.setFont(font)
volume_text = f"{self.volume_level}%"
painter.drawText(x_offset + self.orig_width - 60, y_offset + 30, volume_text)
# Draw volume bar background
bar_x = x_offset + 20
bar_y = y_offset + 40
bar_width = self.orig_width - 40
bar_height = 16
bg_path = QPainterPath()
bg_path.addRoundedRect(bar_x, bar_y, bar_width, bar_height, 8, 8)
painter.setPen(Qt.NoPen)
painter.setBrush(QColor(255, 255, 255, 70))
painter.drawPath(bg_path)
# Draw volume level fill
if self.volume_level > 0:
fill_width = int((bar_width * self.volume_level) / 100)
fill_path = QPainterPath()
fill_path.addRoundedRect(bar_x, bar_y, fill_width, bar_height, 8, 8)
# Determine color based on volume level
if self.volume_level <= 30:
fill_color = QColor(0, 200, 83) # Green
elif self.volume_level <= 70:
fill_color = QColor(255, 193, 7) # Yellow/Amber
else:
fill_color = QColor(255, 87, 34) # Red/Orange
painter.setBrush(fill_color)
painter.drawPath(fill_path)
except Exception as e:
print(f"Error in painting volume OSD: {e}")
traceback.print_exc()
def update_volume(self, volume_level):
"""Update the volume level displayed in the OSD."""
self.volume_level = volume_level
self.update() # Trigger repaint
def close_widget(self):
"""Close the widget."""
# Stop any active timers
if hasattr(self, 'stay_on_top_timer') and self.stay_on_top_timer.isActive():
self.stay_on_top_timer.stop()
# Close the widget
self.hide()
def start_close_timer(self, seconds=None):
"""Start a timer to close the widget."""
# Use the provided seconds or default to self.close_time
seconds = seconds if seconds is not None else self.close_time
# Cancel any existing close timer
if hasattr(self, 'close_timer') and self.close_timer.isActive():
self.close_timer.stop()
# Create and start a new timer
self.close_timer = QTimer(self)
self.close_timer.setSingleShot(True)
self.close_timer.timeout.connect(self.close_widget)
self.close_timer.start(seconds * 1000) # Convert seconds to milliseconds