Now works directly with blehd socket
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,2 +1,3 @@
|
|||||||
.venv
|
.venv
|
||||||
|
__pycache__
|
||||||
bleh
|
bleh
|
||||||
|
|||||||
245
app.py
245
app.py
@@ -1,8 +1,8 @@
|
|||||||
import io
|
import io
|
||||||
import tempfile
|
import tempfile
|
||||||
import subprocess
|
|
||||||
import requests
|
import requests
|
||||||
import base64
|
import base64
|
||||||
|
import socket
|
||||||
from flask import Flask, render_template, request, send_from_directory, session
|
from flask import Flask, render_template, request, send_from_directory, session
|
||||||
from PIL import Image, ImageDraw, ImageFont
|
from PIL import Image, ImageDraw, ImageFont
|
||||||
import os
|
import os
|
||||||
@@ -24,10 +24,189 @@ HEADER_SIZE_3 = 28
|
|||||||
BANNER_FONT_SIZE = 300
|
BANNER_FONT_SIZE = 300
|
||||||
IMAGE_WIDTH = 384
|
IMAGE_WIDTH = 384
|
||||||
BULLET_CHAR = "• "
|
BULLET_CHAR = "• "
|
||||||
|
MIN_LINES = 86
|
||||||
|
BLEHD_SOCKET = os.environ.get("BLEHD_SOCKET", "/run/bleh/blehd.sock")
|
||||||
|
|
||||||
with open('static/translations.json', 'r', encoding='utf-8') as f:
|
with open('static/translations.json', 'r', encoding='utf-8') as f:
|
||||||
TRANSLATIONS = json.load(f)
|
TRANSLATIONS = json.load(f)
|
||||||
|
|
||||||
|
|
||||||
|
def _dither_none(gray_img, levels):
|
||||||
|
if levels == 2:
|
||||||
|
return gray_img.point(lambda p: 0 if p < 128 else 255, mode="L")
|
||||||
|
step = 255 / (levels - 1)
|
||||||
|
return gray_img.point(lambda p: int(round(p / step) * step), mode="L")
|
||||||
|
|
||||||
|
|
||||||
|
def _dither_ordered(gray_img, matrix, levels, strength=1.0):
|
||||||
|
src = gray_img.load()
|
||||||
|
w, h = gray_img.size
|
||||||
|
out = Image.new("L", (w, h), 255)
|
||||||
|
dst = out.load()
|
||||||
|
n = len(matrix)
|
||||||
|
step = 255 / (levels - 1)
|
||||||
|
for y in range(h):
|
||||||
|
for x in range(w):
|
||||||
|
t = (matrix[y % n][x % n] / (n * n) - 0.5) * step * strength
|
||||||
|
v = max(0, min(255, int(src[x, y] + t)))
|
||||||
|
q = int(round(v / step) * step)
|
||||||
|
dst[x, y] = q
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _dither_error_diffusion(gray_img, levels, matrix, divisor):
|
||||||
|
w, h = gray_img.size
|
||||||
|
buf = [[float(gray_img.getpixel((x, y))) for x in range(w)] for y in range(h)]
|
||||||
|
step = 255 / (levels - 1)
|
||||||
|
for y in range(h):
|
||||||
|
for x in range(w):
|
||||||
|
old = buf[y][x]
|
||||||
|
new = round(old / step) * step
|
||||||
|
new = max(0, min(255, new))
|
||||||
|
err = old - new
|
||||||
|
buf[y][x] = new
|
||||||
|
for dx, dy, weight in matrix:
|
||||||
|
nx = x + dx
|
||||||
|
ny = y + dy
|
||||||
|
if 0 <= nx < w and 0 <= ny < h:
|
||||||
|
buf[ny][nx] += err * (weight / divisor)
|
||||||
|
out = Image.new("L", (w, h), 255)
|
||||||
|
for y in range(h):
|
||||||
|
for x in range(w):
|
||||||
|
out.putpixel((x, y), int(max(0, min(255, round(buf[y][x])))))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _build_bayer(n):
|
||||||
|
m = [[0, 2], [3, 1]]
|
||||||
|
size = 2
|
||||||
|
while size < n:
|
||||||
|
prev = m
|
||||||
|
size *= 2
|
||||||
|
m = [[0] * size for _ in range(size)]
|
||||||
|
for y in range(size // 2):
|
||||||
|
for x in range(size // 2):
|
||||||
|
v = prev[y][x] * 4
|
||||||
|
m[y][x] = v + 0
|
||||||
|
m[y][x + size // 2] = v + 2
|
||||||
|
m[y + size // 2][x] = v + 3
|
||||||
|
m[y + size // 2][x + size // 2] = v + 1
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
def apply_dither(gray_img, dither_type, levels):
|
||||||
|
if dither_type == "none":
|
||||||
|
return _dither_none(gray_img, levels)
|
||||||
|
if dither_type == "floyd":
|
||||||
|
# x+1,y ; x-1,y+1 ; x,y+1 ; x+1,y+1
|
||||||
|
fs = [(1, 0, 7), (-1, 1, 3), (0, 1, 5), (1, 1, 1)]
|
||||||
|
return _dither_error_diffusion(gray_img, levels, fs, 16)
|
||||||
|
if dither_type == "atkinson":
|
||||||
|
at = [(1, 0, 1), (2, 0, 1), (-1, 1, 1), (0, 1, 1), (1, 1, 1), (0, 2, 1)]
|
||||||
|
return _dither_error_diffusion(gray_img, levels, at, 8)
|
||||||
|
if dither_type == "jjn":
|
||||||
|
# Jarvis-Judice-Ninke kernel centered at current pixel
|
||||||
|
jjn = [
|
||||||
|
(1, 0, 7), (2, 0, 5),
|
||||||
|
(-2, 1, 3), (-1, 1, 5), (0, 1, 7), (1, 1, 5), (2, 1, 3),
|
||||||
|
(-2, 2, 1), (-1, 2, 3), (0, 2, 5), (1, 2, 3), (2, 2, 1),
|
||||||
|
]
|
||||||
|
return _dither_error_diffusion(gray_img, levels, jjn, 48)
|
||||||
|
if dither_type == "bayer2x2":
|
||||||
|
return _dither_ordered(gray_img, _build_bayer(2), levels, strength=1.0 if levels == 2 else 0.2)
|
||||||
|
if dither_type == "bayer4x4":
|
||||||
|
return _dither_ordered(gray_img, _build_bayer(4), levels, strength=1.0 if levels == 2 else 0.2)
|
||||||
|
if dither_type == "bayer8x8":
|
||||||
|
return _dither_ordered(gray_img, _build_bayer(8), levels, strength=1.0 if levels == 2 else 0.2)
|
||||||
|
if dither_type == "bayer16x16":
|
||||||
|
return _dither_ordered(gray_img, _build_bayer(16), levels, strength=1.0 if levels == 2 else 0.2)
|
||||||
|
return _dither_none(gray_img, levels)
|
||||||
|
|
||||||
|
|
||||||
|
def preprocess_for_mode(img, dithering, mode):
|
||||||
|
img = remove_alpha(img)
|
||||||
|
if img.width <= 0 or img.height <= 0:
|
||||||
|
raise ValueError("Invalid image dimensions")
|
||||||
|
|
||||||
|
ratio = img.width / img.height
|
||||||
|
height = int(IMAGE_WIDTH / ratio)
|
||||||
|
if height <= 0:
|
||||||
|
raise ValueError("Computed invalid image height")
|
||||||
|
|
||||||
|
img = img.resize((IMAGE_WIDTH, height), Image.Resampling.LANCZOS).convert("L")
|
||||||
|
levels = 2 if mode == "1bpp" else 16
|
||||||
|
out = apply_dither(img, dithering, levels)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def pack_pixels(img, mode):
|
||||||
|
width, height = img.size
|
||||||
|
if width != IMAGE_WIDTH:
|
||||||
|
img = img.resize((IMAGE_WIDTH, height), Image.Resampling.LANCZOS)
|
||||||
|
width = IMAGE_WIDTH
|
||||||
|
|
||||||
|
if mode == "1bpp":
|
||||||
|
packed = bytearray((width * height) // 8)
|
||||||
|
px = img.load()
|
||||||
|
for y in range(height):
|
||||||
|
for x in range(width):
|
||||||
|
if px[x, y] < 128:
|
||||||
|
idx = (y * width + x) // 8
|
||||||
|
packed[idx] |= 1 << (x % 8)
|
||||||
|
return bytes(packed), height
|
||||||
|
|
||||||
|
# 4bpp
|
||||||
|
packed = bytearray((width * height) // 2)
|
||||||
|
px = img.load()
|
||||||
|
for y in range(height):
|
||||||
|
for x in range(width):
|
||||||
|
gray = px[x, y]
|
||||||
|
level = (255 - gray) >> 4
|
||||||
|
idx = (y * width + x) >> 1
|
||||||
|
shift = ((x & 1) ^ 1) << 2
|
||||||
|
packed[idx] |= (level & 0x0F) << shift
|
||||||
|
return bytes(packed), height
|
||||||
|
|
||||||
|
|
||||||
|
def image_from_url(url, dithering, mode):
|
||||||
|
resp = requests.get(url, stream=True, timeout=20)
|
||||||
|
resp.raise_for_status()
|
||||||
|
img = Image.open(io.BytesIO(resp.content))
|
||||||
|
return preprocess_for_mode(img, dithering, mode)
|
||||||
|
|
||||||
|
|
||||||
|
def image_from_bytes(image_bytes, dithering, mode):
|
||||||
|
img = Image.open(io.BytesIO(image_bytes))
|
||||||
|
return preprocess_for_mode(img, dithering, mode)
|
||||||
|
|
||||||
|
|
||||||
|
def pad_image_to_min_lines(img):
|
||||||
|
if img.height >= MIN_LINES:
|
||||||
|
return img
|
||||||
|
out = Image.new("L", (img.width, MIN_LINES), 255)
|
||||||
|
out.paste(img, (0, 0))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def blehd_call(method, params=None, socket_path=BLEHD_SOCKET, timeout=90):
|
||||||
|
req = {"id": "1", "method": method, "params": params or {}}
|
||||||
|
data = (json.dumps(req, ensure_ascii=False) + "\n").encode("utf-8")
|
||||||
|
|
||||||
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
||||||
|
s.settimeout(timeout)
|
||||||
|
s.connect(socket_path)
|
||||||
|
s.sendall(data)
|
||||||
|
f = s.makefile("rb")
|
||||||
|
line = f.readline()
|
||||||
|
|
||||||
|
if not line:
|
||||||
|
raise RuntimeError("No response from blehd")
|
||||||
|
resp = json.loads(line.decode("utf-8"))
|
||||||
|
if not resp.get("ok"):
|
||||||
|
err = resp.get("error") or {}
|
||||||
|
raise RuntimeError(err.get("message", "Unknown blehd error"))
|
||||||
|
return resp.get("result")
|
||||||
|
|
||||||
def remove_uploaded_img():
|
def remove_uploaded_img():
|
||||||
path = session.pop('uploaded_img_path', None)
|
path = session.pop('uploaded_img_path', None)
|
||||||
if path and os.path.exists(path):
|
if path and os.path.exists(path):
|
||||||
@@ -50,45 +229,11 @@ def remove_alpha(img):
|
|||||||
return img
|
return img
|
||||||
|
|
||||||
def bleh_image_from_url(url, dithering, mode):
|
def bleh_image_from_url(url, dithering, mode):
|
||||||
resp = requests.get(url, stream=True)
|
return image_from_url(url, dithering, mode)
|
||||||
resp.raise_for_status()
|
|
||||||
img = Image.open(io.BytesIO(resp.content))
|
|
||||||
img = remove_alpha(img)
|
|
||||||
buf = io.BytesIO()
|
|
||||||
img.save(buf, format="PNG")
|
|
||||||
image_bytes_no_alpha = buf.getvalue()
|
|
||||||
bleh = subprocess.Popen(
|
|
||||||
["./bleh", "-o", "-", "-mode", f"{mode}", "-d", f"{dithering}", "-"],
|
|
||||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
||||||
out, err = bleh.communicate(image_bytes_no_alpha)
|
|
||||||
if bleh.returncode != 0:
|
|
||||||
raise RuntimeError(f"Driver failed: {err.decode()}")
|
|
||||||
img = Image.open(io.BytesIO(out)).convert("L")
|
|
||||||
# Optionally check width, pad/resize if needed
|
|
||||||
if img.width != IMAGE_WIDTH:
|
|
||||||
img = img.resize((IMAGE_WIDTH, img.height), Image.LANCZOS)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
def bleh_image_from_bytes(image_bytes, dithering, mode):
|
def bleh_image_from_bytes(image_bytes, dithering, mode):
|
||||||
# OPEN the uploaded image and remove alpha
|
return image_from_bytes(image_bytes, dithering, mode)
|
||||||
img = Image.open(io.BytesIO(image_bytes))
|
|
||||||
img = remove_alpha(img)
|
|
||||||
buf = io.BytesIO()
|
|
||||||
img.save(buf, format="PNG")
|
|
||||||
image_bytes_no_alpha = buf.getvalue()
|
|
||||||
# pass that to bleh
|
|
||||||
bleh = subprocess.Popen(
|
|
||||||
["./bleh", "-o", "-", "-mode", f"{mode}", "-d", f"{dithering}", "-"],
|
|
||||||
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
||||||
)
|
|
||||||
out, err = bleh.communicate(image_bytes_no_alpha)
|
|
||||||
if bleh.returncode != 0:
|
|
||||||
raise RuntimeError(f"Driver failed: {err.decode()}")
|
|
||||||
img = Image.open(io.BytesIO(out)).convert("L")
|
|
||||||
if img.width != IMAGE_WIDTH:
|
|
||||||
img = img.resize((IMAGE_WIDTH, img.height), Image.LANCZOS)
|
|
||||||
return img
|
|
||||||
|
|
||||||
def rotate_image_bytes(image_bytes, rotation):
|
def rotate_image_bytes(image_bytes, rotation):
|
||||||
if rotation == 0:
|
if rotation == 0:
|
||||||
@@ -445,25 +590,21 @@ def index():
|
|||||||
error = "Intensity must be a number between 0 and 100"
|
error = "Intensity must be a number between 0 and 100"
|
||||||
intensity = 85
|
intensity = 85
|
||||||
session['intensity'] = intensity
|
session['intensity'] = intensity
|
||||||
# If print button pressed, send to driver
|
# If print button pressed, send to blehd directly via Unix socket
|
||||||
if "print" in request.form:
|
if "print" in request.form:
|
||||||
try:
|
try:
|
||||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as tmpfile:
|
printable = pad_image_to_min_lines(image)
|
||||||
image.save(tmpfile, format="PNG")
|
pixels, height = pack_pixels(printable, printmode)
|
||||||
tmpfile.flush()
|
params = {
|
||||||
# Run the bleh command
|
"mode": printmode,
|
||||||
result = subprocess.run([
|
"intensity": intensity,
|
||||||
"./bleh",
|
"height": height,
|
||||||
"-mode", f"{printmode}",
|
"pixels_b64": base64.b64encode(pixels).decode("ascii"),
|
||||||
"-intensity", f"{intensity}",
|
}
|
||||||
tmpfile.name
|
blehd_call("print.start", params)
|
||||||
], capture_output=True, text=True, timeout=90)
|
|
||||||
if result.returncode != 0:
|
|
||||||
error = f"Printer error: {result.stderr or result.stdout}"
|
|
||||||
else:
|
|
||||||
printed = True
|
printed = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error = f"Failed to print: {e}"
|
error = f"Failed to print via blehd socket: {e}"
|
||||||
return render_template(
|
return render_template(
|
||||||
"index.html",
|
"index.html",
|
||||||
dithering_modes=dithering_modes,
|
dithering_modes=dithering_modes,
|
||||||
|
|||||||
Reference in New Issue
Block a user