#!/usr/bin/python
"""Flask server for IPMPV."""

import os
import re
import shlex
import subprocess
import threading

import flask
from flask import request, jsonify, send_from_directory, redirect, url_for, make_response

from localization import localization, _
from utils import is_valid_url, change_resolution, get_current_resolution, is_wayland, get_or_create_secret_key
from channel_updater import ChannelUpdater


class IPMPVServer:
    """Flask server for IPMPV web interface.

    Wraps a Flask application whose routes drive the media player, the
    Qt OSD (through ``to_qt_queue``), an optional volume controller and an
    optional RetroArch subprocess.
    """

    def __init__(self, channels, player, to_qt_queue, from_qt_queue, resolution,
                 ipmpv_retroarch_cmd, volume_control=None, update_interval=3600):
        """Initialize the server.

        Args:
            channels: Sequence of channel mappings; the handlers read the
                ``"name"`` and ``"logo"`` keys.
            player: Player object exposing ``current_index``, ``deinterlace``,
                ``low_latency``, ``play_channel``, ``stop`` and the toggles
                used by the handlers below.
            to_qt_queue: Queue onto which OSD actions are put.
            from_qt_queue: Queue stored on the instance (not read here).
            resolution: Current resolution string shown on the index page.
            ipmpv_retroarch_cmd: Command line used to launch RetroArch, or
                ``None`` to use plain ``retroarch``.
            volume_control: Optional volume controller; when falsy the
                volume routes answer 404.
            update_interval: Passed through to ``ChannelUpdater``.
        """
        self.app = flask.Flask(__name__,
                               static_folder='static',
                               template_folder='templates')
        self.channels = channels
        self.player = player
        self.to_qt_queue = to_qt_queue
        self.from_qt_queue = from_qt_queue
        self.resolution = resolution
        self.ipmpv_retroarch_cmd = ipmpv_retroarch_cmd
        self.retroarch_p = None
        self.volume_control = volume_control
        self.channel_updater = ChannelUpdater(self, update_interval=update_interval)

        # Register routes
        self._register_routes()

    def run(self, host="0.0.0.0", port=5000):
        """Run the Flask server, keeping the channel updater alive meanwhile."""
        self.channel_updater.start()
        try:
            self.app.run(host=host, port=port)
        finally:
            # Stop the updater even if the server exits with an exception.
            self.channel_updater.stop()

    def _register_routes(self):
        """Register Flask routes."""

        # The <language> URL variable is what supplies the view's argument.
        @self.app.route("/switch_language/<language>")
        def switch_language(language):
            response = make_response(redirect(request.referrer or url_for('index')))
            localization.set_language(language, response)
            return response

        # (rule, endpoint, handler); endpoint names are kept stable because
        # url_for() refers to them (e.g. 'index' above).
        handler_routes = [
            ("/", "index", self._handle_index),
            ("/play_custom", "play_custom", self._handle_play_custom),
            ("/hide_osd", "hide_osd", self._handle_hide_osd),
            ("/show_osd", "show_osd", self._handle_show_osd),
            ("/channel", "switch_channel", self._handle_switch_channel),
            ("/toggle_deinterlace", "toggle_deinterlace", self._handle_toggle_deinterlace),
            ("/stop_player", "stop_player", self._handle_stop_player),
            ("/toggle_retroarch", "toggle_retroarch", self._handle_toggle_retroarch),
            ("/toggle_latency", "toggle_latency", self._handle_toggle_latency),
            ("/toggle_resolution", "toggle_resolution", self._handle_toggle_resolution),
            ("/volume_up", "volume_up", self._handle_volume_up),
            ("/volume_down", "volume_down", self._handle_volume_down),
            ("/toggle_mute", "toggle_mute", self._handle_toggle_mute),
            ("/get_volume", "get_volume", self._handle_get_volume),
            ("/channel_up", "channel_up", self._handle_channel_up),
            ("/channel_down", "channel_down", self._handle_channel_down),
            ("/update_channels", "update_channels", self._handle_update_channels),
        ]
        for rule, endpoint, handler in handler_routes:
            self.app.add_url_rule(rule, endpoint=endpoint, view_func=handler)

        def static_view(filename, mimetype):
            # Factory so each view binds its own filename/mimetype.
            def view():
                return send_from_directory("static", filename, mimetype=mimetype)
            return view

        static_routes = [
            ("serve_manifest", "manifest.json", "application/manifest+json"),
            ("serve_rounded_icon", "icon512_rounded.png", "image/png"),
            ("serve_maskable_icon", "icon512_maskable.png", "image/png"),
            ("serve_screenshot_1", "screenshot1.png", "image/png"),
            ("serve_favicon", "favicon.ico", "image/vnd.microsoft.icon"),
        ]
        for endpoint, filename, mimetype in static_routes:
            self.app.add_url_rule(f"/{filename}", endpoint=endpoint,
                                  view_func=static_view(filename, mimetype))

    def _retroarch_running(self):
        """Return True when the RetroArch process launched by this server is alive."""
        return self.retroarch_p is not None and self.retroarch_p.poll() is None

    def _handle_index(self):
        """Handle the index route.

        Reads ``templates/index.html`` and substitutes every ``%PLACEHOLDER%``
        with the translated label or current state, returning the final HTML.
        """
        from channels import group_channels

        grouped_channels = group_channels(self.channels)
        flat_channel_list = list(self.channels)

        # Create the channel groups HTML
        # NOTE(review): only newlines are emitted around the group title and
        # per channel, and `index` is computed but not interpolated -- the
        # surrounding markup looks like it is missing; confirm against the
        # structure index.html expects.
        group_parts = []
        for group, ch_list in grouped_channels.items():
            # Translate group name if it's a common group
            translated_group = _(group.lower()) if group.lower() in ["other"] else group
            group_parts.append(f'\n{translated_group}')
            for channel in ch_list:
                index = flat_channel_list.index(channel)  # Get correct global index
                group_parts.append('\n')
            group_parts.append('\n')
        channel_groups_html = "".join(group_parts)

        # Get current language and available languages for language selector
        current_language = localization.get_language()
        languages = {
            'en': 'English',
            'es': 'Español',
            # Add more languages here as you support them
        }
        # One <option> per language, pre-selecting the active one.
        language_selector_html = "".join(
            f'<option value="{code}"{" selected" if code == current_language else ""}>{name}</option>'
            for code, name in languages.items()
        )

        retroarch_on = self._retroarch_running()
        current_index = self.player.current_index
        current_channel = (self.channels[current_index]['name']
                           if current_index is not None else "None")

        # Replace placeholders with actual values, using translation.
        # Applied in order, one after another.
        replacements = [
            ("%WELCOME_TEXT%", _("welcome_to_ipmpv")),
            ("%CURRENT_CHANNEL_LABEL%", _("current_channel")),
            ("%CURRENT_CHANNEL%", current_channel),
            ("%RETROARCH_STATE%", "ON" if retroarch_on else "OFF"),
            ("%RETROARCH_LABEL%", _("stop_retroarch") if retroarch_on else _("start_retroarch")),
            ("%DEINTERLACE_LABEL%", _("deinterlacing")),
            ("%DEINTERLACE_STATE%", _("on") if self.player.deinterlace else _("off")),
            ("%RESOLUTION_LABEL%", _("resolution")),
            ("%RESOLUTION%", self.resolution),
            ("%REFRESH_CH_LABEL%", _("refresh_channels")),
            ("%LATENCY_STATE%", "ON" if self.player.low_latency else "OFF"),
            ("%LATENCY_LABEL%", _("latency_low") if self.player.low_latency else _("latency_high")),
            ("%CHANNEL_GROUPS%", channel_groups_html),
            ("%VOLUME_LABEL%", _("volume")),
            ("%MUTE_LABEL%", _("mute")),
            ("%TOGGLE_OSD_LABEL%", _("toggle_osd")),
            ("%ON_LABEL%", _("on")),
            ("%OFF_LABEL%", _("off")),
            ("%PLAY_CUSTOM_URL_LABEL%", _("play_custom_url")),
            ("%ENTER_URL_PLACEHOLDER%", _("enter_stream_url")),
            ("%PLAY_LABEL%", _("play")),
            ("%ALL_CHANNELS_LABEL%", _("all_channels")),
            ("%STOP_LABEL%", _("stop")),
            ("%LANGUAGE_SELECTOR%", language_selector_html),
            ("%JS_LOADING%", _("loading")),
            ("%JS_NOW_PLAYING%", _("now_playing")),
            ("%JS_ERROR%", _("error")),
            ("%JS_CONNECTION_ERROR%", _("connection_error")),
            ("%JS_LOADING_CHANNEL%", _("loading_channel")),
            ("%JS_ERROR_LOADING_CHANNEL%", _("error_loading_channel")),
            ("%JS_VOLUME_LEVEL%", _("volume_level")),
            ("%JS_MUTED_YES%", _("muted_yes")),
            ("%JS_MUTED_NO%", _("muted_no")),
            ("%JS_LATENCY_LOW%", _("latency_low")),
            ("%JS_LATENCY_HIGH%", _("latency_high")),
            ("%JS_STOP_RETROARCH%", _("stop_retroarch")),
            ("%JS_START_RETROARCH%", _("start_retroarch")),
            ("%JS_ON_LABEL%", _("on")),
            ("%JS_OFF_LABEL%", _("off")),
            ("%JS_CH_UPDATE_P1%", _("channels_updated_p1")),
            ("%JS_CH_UPDATE_P2%", _("channels_updated_p2")),
            ("%JS_CH_UPDATE_FAIL%", _("channel_update_failed")),
        ]

        # Resolve the template next to the application (like the static
        # assets) instead of relative to the current working directory, and
        # close the file deterministically.
        template_path = os.path.join(self.app.root_path, "templates", "index.html")
        with open(template_path, encoding="utf-8") as template_file:
            page = template_file.read()
        for placeholder, value in replacements:
            page = page.replace(placeholder, value)
        return page

    def _handle_play_custom(self):
        """Handle the play_custom route: load the ``url`` query argument in the player."""
        url = request.args.get("url")
        if not url or not is_valid_url(url):
            return jsonify(success=False, error=_("invalid_url"))
        self.player.player.loadfile(url)
        # A custom URL is not one of the listed channels.
        self.player.current_index = None
        return jsonify(success=True)

    def _handle_hide_osd(self):
        """Handle the hide_osd route."""
        self.to_qt_queue.put({
            'action': 'close_osd',
        })
        return "", 204

    def _handle_show_osd(self):
        """Handle the show_osd route.

        Does nothing (but still answers 204) when no channel is selected.
        """
        if self.player.current_index is not None:
            channel = self.channels[self.player.current_index]
            channel_info = {
                "name": channel["name"],
                "deinterlace": self.player.deinterlace,
                "low_latency": self.player.low_latency,
                "logo": channel["logo"],
            }
            self.to_qt_queue.put({
                'action': 'show_osd',
                'channel_info': channel_info,
            })
            self.to_qt_queue.put({
                'action': 'update_codecs',
                'vcodec': self.player.vcodec,
                'acodec': self.player.acodec,
                'video_res': self.player.video_res,
                'interlaced': self.player.interlaced,
            })
        return "", 204

    def _play_channel_async(self, index):
        """Start ``player.play_channel(index, channels)`` on a daemon thread."""
        thread = threading.Thread(
            target=self.player.play_channel,
            args=(index, self.channels),
            daemon=True,
        )
        thread.start()

    def _handle_switch_channel(self):
        """Handle the switch_channel route.

        Uses the ``index`` query argument, falling back to the current
        channel. Answers 400 without touching playback when no usable
        integer index is available.
        """
        raw_index = request.args.get("index", self.player.current_index)
        try:
            index = int(raw_index)
        except (TypeError, ValueError):
            return jsonify(error="Invalid channel index"), 400
        # Only interrupt playback once the request is known to be valid.
        self.player.stop()
        self._play_channel_async(index)
        return "", 204

    def _step_channel(self, delta):
        """Play the channel ``delta`` positions away, wrapping around the list."""
        total = len(self.channels)
        if not total:
            # Nothing to switch to.
            return "", 204
        current = self.player.current_index
        if current is None:
            # No channel yet: "up" starts at the first, "down" at the last.
            index = 0 if delta > 0 else total - 1
        else:
            index = (current + delta) % total
        self._play_channel_async(index)
        return "", 204

    def _handle_channel_up(self):
        """Handle the channel_up route."""
        return self._step_channel(1)

    def _handle_channel_down(self):
        """Handle the channel_down route."""
        return self._step_channel(-1)

    def _handle_toggle_deinterlace(self):
        """Handle the toggle_deinterlace route."""
        state = self.player.toggle_deinterlace()
        return jsonify(state=state)

    def _handle_stop_player(self):
        """Handle the stop_player route: close the OSD, then stop playback."""
        self.to_qt_queue.put({
            'action': 'close_osd',
        })
        self.player.stop()
        return "", 204

    def _handle_toggle_retroarch(self):
        """Handle the toggle_retroarch route.

        Closes RetroArch when it is running (either the process launched by
        this server or one found by ``pgrep``), otherwise launches it.
        Returns JSON ``state`` telling whether RetroArch is now running.
        """
        own_running = self._retroarch_running()
        try:
            found = subprocess.run(["pgrep", "-fx", "retroarch"],
                                   stdout=subprocess.PIPE)
            # pgrep prints one PID per line; there may be several.
            external_pids = found.stdout.decode().split()
        except FileNotFoundError:
            # pgrep unavailable: rely on our own process handle only.
            external_pids = []

        if own_running or external_pids:
            print("Retroarch already open. Trying to close it.")
            if external_pids:
                subprocess.run(["kill", *external_pids])
            if self.retroarch_p:
                self.retroarch_p.terminate()
            return jsonify(state=False)

        print("Launching RetroArch")
        retroarch_env = os.environ.copy()
        retroarch_env["MESA_GL_VERSION_OVERRIDE"] = "3.3"
        # shlex honours quoting and collapses repeated whitespace.
        command = shlex.split(self.ipmpv_retroarch_cmd or 'retroarch')
        try:
            self.retroarch_p = subprocess.Popen(command, env=retroarch_env)
        except OSError as err:
            print(f"Failed to launch RetroArch: {err}")
            return jsonify(state=False)
        return jsonify(state=True)

    def _handle_toggle_latency(self):
        """Handle the toggle_latency route."""
        state = self.player.toggle_latency()
        return jsonify(state=state)

    def _handle_toggle_resolution(self):
        """Handle the toggle_resolution route."""
        self.resolution = change_resolution(self.resolution)
        return jsonify(res=self.resolution)

    @staticmethod
    def _requested_volume_step():
        """Return the ``step`` query argument as an int, or None when absent/invalid."""
        raw_step = request.args.get("step")
        # isascii() rules out digit-like characters that int() cannot parse.
        if raw_step and raw_step.isascii() and raw_step.isdigit():
            return int(raw_step)
        return None

    def _handle_volume_up(self):
        """Handle the volume_up route."""
        if not self.volume_control:
            return jsonify(error="Volume control not available"), 404
        new_volume = self.volume_control.volume_up(self._requested_volume_step())
        return jsonify(volume=new_volume, muted=self.volume_control.is_muted())

    def _handle_volume_down(self):
        """Handle the volume_down route."""
        if not self.volume_control:
            return jsonify(error="Volume control not available"), 404
        new_volume = self.volume_control.volume_down(self._requested_volume_step())
        return jsonify(volume=new_volume, muted=self.volume_control.is_muted())

    def _handle_toggle_mute(self):
        """Handle the toggle_mute route."""
        if not self.volume_control:
            return jsonify(error="Volume control not available"), 404
        is_muted = self.volume_control.toggle_mute()
        volume = self.volume_control.get_volume()
        return jsonify(muted=is_muted, volume=volume)

    def _handle_get_volume(self):
        """Handle the get_volume route."""
        if not self.volume_control:
            return jsonify(error="Volume control not available"), 404
        volume = self.volume_control.get_volume()
        is_muted = self.volume_control.is_muted()
        return jsonify(volume=volume, muted=is_muted)

    def _handle_update_channels(self):
        """Handle the update_channels route.

        Returns JSON with ``success`` and, on success, the new channel count.
        """
        success = self.channel_updater.force_update()
        return jsonify(success=success,
                       channel_count=len(self.channels) if success else None)