diff --git a/.gitignore b/.gitignore index 2b4e390..b7caf63 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .venv +__pycache__ bleh diff --git a/app.py b/app.py index bf2eeed..dc943cf 100644 --- a/app.py +++ b/app.py @@ -1,8 +1,8 @@ import io import tempfile -import subprocess import requests import base64 +import socket from flask import Flask, render_template, request, send_from_directory, session from PIL import Image, ImageDraw, ImageFont import os @@ -24,10 +24,189 @@ HEADER_SIZE_3 = 28 BANNER_FONT_SIZE = 300 IMAGE_WIDTH = 384 BULLET_CHAR = "• " +MIN_LINES = 86 +BLEHD_SOCKET = os.environ.get("BLEHD_SOCKET", "/run/bleh/blehd.sock") with open('static/translations.json', 'r', encoding='utf-8') as f: TRANSLATIONS = json.load(f) + +def _dither_none(gray_img, levels): + if levels == 2: + return gray_img.point(lambda p: 0 if p < 128 else 255, mode="L") + step = 255 / (levels - 1) + return gray_img.point(lambda p: int(round(p / step) * step), mode="L") + + +def _dither_ordered(gray_img, matrix, levels, strength=1.0): + src = gray_img.load() + w, h = gray_img.size + out = Image.new("L", (w, h), 255) + dst = out.load() + n = len(matrix) + step = 255 / (levels - 1) + for y in range(h): + for x in range(w): + t = (matrix[y % n][x % n] / (n * n) - 0.5) * step * strength + v = max(0, min(255, int(src[x, y] + t))) + q = int(round(v / step) * step) + dst[x, y] = q + return out + + +def _dither_error_diffusion(gray_img, levels, matrix, divisor): + w, h = gray_img.size + buf = [[float(gray_img.getpixel((x, y))) for x in range(w)] for y in range(h)] + step = 255 / (levels - 1) + for y in range(h): + for x in range(w): + old = buf[y][x] + new = round(old / step) * step + new = max(0, min(255, new)) + err = old - new + buf[y][x] = new + for dx, dy, weight in matrix: + nx = x + dx + ny = y + dy + if 0 <= nx < w and 0 <= ny < h: + buf[ny][nx] += err * (weight / divisor) + out = Image.new("L", (w, h), 255) + for y in range(h): + for x in range(w): + out.putpixel((x, y), int(max(0, min(255, round(buf[y][x]))))) + return out + + +def _build_bayer(n): + m = [[0, 2], [3, 1]] + size = 2 + while size < n: + prev = m + size *= 2 + m = [[0] * size for _ in range(size)] + for y in range(size // 2): + for x in range(size // 2): + v = prev[y][x] * 4 + m[y][x] = v + 0 + m[y][x + size // 2] = v + 2 + m[y + size // 2][x] = v + 3 + m[y + size // 2][x + size // 2] = v + 1 + return m + + +def apply_dither(gray_img, dither_type, levels): + if dither_type == "none": + return _dither_none(gray_img, levels) + if dither_type == "floyd": + # x+1,y ; x-1,y+1 ; x,y+1 ; x+1,y+1 + fs = [(1, 0, 7), (-1, 1, 3), (0, 1, 5), (1, 1, 1)] + return _dither_error_diffusion(gray_img, levels, fs, 16) + if dither_type == "atkinson": + at = [(1, 0, 1), (2, 0, 1), (-1, 1, 1), (0, 1, 1), (1, 1, 1), (0, 2, 1)] + return _dither_error_diffusion(gray_img, levels, at, 8) + if dither_type == "jjn": + # Jarvis-Judice-Ninke kernel centered at current pixel + jjn = [ + (1, 0, 7), (2, 0, 5), + (-2, 1, 3), (-1, 1, 5), (0, 1, 7), (1, 1, 5), (2, 1, 3), + (-2, 2, 1), (-1, 2, 3), (0, 2, 5), (1, 2, 3), (2, 2, 1), + ] + return _dither_error_diffusion(gray_img, levels, jjn, 48) + if dither_type == "bayer2x2": + return _dither_ordered(gray_img, _build_bayer(2), levels, strength=1.0 if levels == 2 else 0.2) + if dither_type == "bayer4x4": + return _dither_ordered(gray_img, _build_bayer(4), levels, strength=1.0 if levels == 2 else 0.2) + if dither_type == "bayer8x8": + return _dither_ordered(gray_img, _build_bayer(8), levels, strength=1.0 if levels == 2 else 0.2) + if dither_type == "bayer16x16": + return _dither_ordered(gray_img, _build_bayer(16), levels, strength=1.0 if levels == 2 else 0.2) + return _dither_none(gray_img, levels) + + +def preprocess_for_mode(img, dithering, mode): + img = remove_alpha(img) + if img.width <= 0 or img.height <= 0: + raise ValueError("Invalid image dimensions") + + ratio = img.width / img.height + height = int(IMAGE_WIDTH / ratio) + if height <= 0: + raise ValueError("Computed invalid image height") + + img = img.resize((IMAGE_WIDTH, height), Image.Resampling.LANCZOS).convert("L") + levels = 2 if mode == "1bpp" else 16 + out = apply_dither(img, dithering, levels) + return out + + +def pack_pixels(img, mode): + width, height = img.size + if width != IMAGE_WIDTH: + img = img.resize((IMAGE_WIDTH, height), Image.Resampling.LANCZOS) + width = IMAGE_WIDTH + + if mode == "1bpp": + packed = bytearray((width * height) // 8) + px = img.load() + for y in range(height): + for x in range(width): + if px[x, y] < 128: + idx = (y * width + x) // 8 + packed[idx] |= 1 << (x % 8) + return bytes(packed), height + + # 4bpp + packed = bytearray((width * height) // 2) + px = img.load() + for y in range(height): + for x in range(width): + gray = px[x, y] + level = (255 - gray) >> 4 + idx = (y * width + x) >> 1 + shift = ((x & 1) ^ 1) << 2 + packed[idx] |= (level & 0x0F) << shift + return bytes(packed), height + + +def image_from_url(url, dithering, mode): + resp = requests.get(url, stream=True, timeout=20) + resp.raise_for_status() + img = Image.open(io.BytesIO(resp.content)) + return preprocess_for_mode(img, dithering, mode) + + +def image_from_bytes(image_bytes, dithering, mode): + img = Image.open(io.BytesIO(image_bytes)) + return preprocess_for_mode(img, dithering, mode) + + +def pad_image_to_min_lines(img): + if img.height >= MIN_LINES: + return img + out = Image.new("L", (img.width, MIN_LINES), 255) + out.paste(img, (0, 0)) + return out + + +def blehd_call(method, params=None, socket_path=BLEHD_SOCKET, timeout=90): + req = {"id": "1", "method": method, "params": params or {}} + data = (json.dumps(req, ensure_ascii=False) + "\n").encode("utf-8") + + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.settimeout(timeout) + s.connect(socket_path) + s.sendall(data) + f = s.makefile("rb") + line = f.readline() + + if not line: + raise RuntimeError("No response from blehd") + resp = json.loads(line.decode("utf-8")) + if not resp.get("ok"): + err = resp.get("error") or {} + raise RuntimeError(err.get("message", "Unknown blehd error")) + return resp.get("result") + def remove_uploaded_img(): path = session.pop('uploaded_img_path', None) if path and os.path.exists(path): @@ -50,45 +229,11 @@ def remove_alpha(img): return img def bleh_image_from_url(url, dithering, mode): - resp = requests.get(url, stream=True) - resp.raise_for_status() - img = Image.open(io.BytesIO(resp.content)) - img = remove_alpha(img) - buf = io.BytesIO() - img.save(buf, format="PNG") - image_bytes_no_alpha = buf.getvalue() - bleh = subprocess.Popen( - ["./bleh", "-o", "-", "-mode", f"{mode}", "-d", f"{dithering}", "-"], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = bleh.communicate(image_bytes_no_alpha) - if bleh.returncode != 0: - raise RuntimeError(f"Driver failed: {err.decode()}") - img = Image.open(io.BytesIO(out)).convert("L") - # Optionally check width, pad/resize if needed - if img.width != IMAGE_WIDTH: - img = img.resize((IMAGE_WIDTH, img.height), Image.LANCZOS) - return img + return image_from_url(url, dithering, mode) def bleh_image_from_bytes(image_bytes, dithering, mode): - # OPEN the uploaded image and remove alpha - img = Image.open(io.BytesIO(image_bytes)) - img = remove_alpha(img) - buf = io.BytesIO() - img.save(buf, format="PNG") - image_bytes_no_alpha = buf.getvalue() - # pass that to bleh - bleh = subprocess.Popen( - ["./bleh", "-o", "-", "-mode", f"{mode}", "-d", f"{dithering}", "-"], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - out, err = bleh.communicate(image_bytes_no_alpha) - if bleh.returncode != 0: - raise RuntimeError(f"Driver failed: {err.decode()}") - img = Image.open(io.BytesIO(out)).convert("L") - if img.width != IMAGE_WIDTH: - img = img.resize((IMAGE_WIDTH, img.height), Image.LANCZOS) - return img + return image_from_bytes(image_bytes, dithering, mode) def rotate_image_bytes(image_bytes, rotation): if rotation == 0: @@ -445,25 +590,21 @@ def index(): error = "Intensity must be a number between 0 and 100" intensity = 85 session['intensity'] = intensity - # If print button pressed, send to driver + # If print button pressed, send to blehd directly via Unix socket if "print" in request.form: try: - with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as tmpfile: - image.save(tmpfile, format="PNG") - tmpfile.flush() - # Run the bleh command - result = subprocess.run([ - "./bleh", - "-mode", f"{printmode}", - "-intensity", f"{intensity}", - tmpfile.name - ], capture_output=True, text=True, timeout=90) - if result.returncode != 0: - error = f"Printer error: {result.stderr or result.stdout}" - else: - printed = True + printable = pad_image_to_min_lines(image) + pixels, height = pack_pixels(printable, printmode) + params = { + "mode": printmode, + "intensity": intensity, + "height": height, + "pixels_b64": base64.b64encode(pixels).decode("ascii"), + } + blehd_call("print.start", params) + printed = True except Exception as e: - error = f"Failed to print: {e}" + error = f"Failed to print via blehd socket: {e}" return render_template( "index.html", dithering_modes=dithering_modes,