Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ indent_size = 2
insert_final_newline = true
trim_trailing_whitespace = true

[*.py]
indent_size = 4

[*.{ts,js}]
quote_type = single

Expand Down
2 changes: 2 additions & 0 deletions machine-learning/immich_ml/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Settings(BaseSettings):
max_batch_size: MaxBatchSize | None = None
openvino_precision: ModelPrecision = ModelPrecision.FP32
rocm_precision: ModelPrecision = ModelPrecision.FP32
ocr_detection_static_size: int | None = None
ocr_recognition_static_width: int | None = None

@property
def device_id(self) -> str:
Expand Down
85 changes: 71 additions & 14 deletions machine-learning/immich_ml/models/ocr/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from rapidocr.utils.typings import EngineType, LangDet, OCRVersion, TaskType
from rapidocr.utils.typings import ModelType as RapidModelType

from immich_ml.config import log
from immich_ml.config import log, settings
from immich_ml.models.base import InferenceModel
from immich_ml.schemas import ModelFormat, ModelSession, ModelTask, ModelType
from immich_ml.sessions.ort import OrtSession
Expand Down Expand Up @@ -67,8 +67,15 @@ def _predict(self, inputs: Image.Image) -> TextDetectionOutput:
w, h = inputs.size
if w < 32 or h < 32:
return self._empty
out = self.session.run(None, {"x": self._transform(inputs)})[0]
boxes, scores = self.postprocess(out, (h, w))
transformed, letterbox = self._transform(inputs)
out = self.session.run(None, {"x": transformed})[0]
if letterbox is None:
boxes, scores = self.postprocess(out, (h, w))
else:
boxes, scores = self.postprocess(out, (letterbox["padded_size"], letterbox["padded_size"]))
if len(boxes) == 0:
return self._empty
boxes = self._undo_letterbox(boxes, letterbox, h, w)
if len(boxes) == 0:
return self._empty
return {
Expand All @@ -77,25 +84,75 @@ def _predict(self, inputs: Image.Image) -> TextDetectionOutput:
}

# adapted from RapidOCR
def _transform(self, img: Image.Image) -> NDArray[np.float32]:
if img.height < img.width:
ratio = float(self.max_resolution) / img.height
def _transform(self, img: Image.Image) -> tuple[NDArray[np.float32], dict[str, float | int] | None]:
if (static_size := settings.ocr_detection_static_size) is not None:
if static_size <= 0:
log.warning("Ignoring ocr_detection_static_size=%s; must be > 0.", static_size)
static_size = None
elif static_size % 32 != 0:
adjusted = max(32, ((static_size + 16) // 32) * 32)
log.warning(
"Adjusting ocr_detection_static_size=%s to %s to keep it a multiple of 32.",
static_size,
adjusted,
)
static_size = adjusted
if static_size is not None:
scale = min(static_size / img.width, static_size / img.height)
resized_w = max(1, int(round(img.width * scale)))
resized_h = max(1, int(round(img.height * scale)))
resized = img.resize((resized_w, resized_h), resample=Image.Resampling.LANCZOS)
padded = Image.new("RGB", (static_size, static_size))
offset_x = (static_size - resized_w) // 2
offset_y = (static_size - resized_h) // 2
padded.paste(resized, (offset_x, offset_y))
resized_img = padded
letterbox = {
"offset_x": offset_x,
"offset_y": offset_y,
"scale_x": resized_w / img.width,
"scale_y": resized_h / img.height,
"padded_size": static_size,
}
else:
ratio = float(self.max_resolution) / img.width
ratio = min(ratio, 1.0)
if img.height < img.width:
ratio = float(self.max_resolution) / img.height
else:
ratio = float(self.max_resolution) / img.width
ratio = min(ratio, 1.0)

resize_h = int(img.height * ratio)
resize_w = int(img.width * ratio)
resize_h = int(img.height * ratio)
resize_w = int(img.width * ratio)

resize_h = int(round(resize_h / 32) * 32)
resize_w = int(round(resize_w / 32) * 32)
resized_img = img.resize((int(resize_w), int(resize_h)), resample=Image.Resampling.LANCZOS)
resize_h = int(round(resize_h / 32) * 32)
resize_w = int(round(resize_w / 32) * 32)
resized_img = img.resize((int(resize_w), int(resize_h)), resample=Image.Resampling.LANCZOS)
letterbox = None

img_np: NDArray[np.float32] = cv2.cvtColor(np.array(resized_img, dtype=np.float32), cv2.COLOR_RGB2BGR) # type: ignore
img_np -= self.mean
img_np *= self.std_inv
img_np = np.transpose(img_np, (2, 0, 1))
return np.expand_dims(img_np, axis=0)
return np.expand_dims(img_np, axis=0), letterbox

@staticmethod
def _undo_letterbox(
boxes: NDArray[np.float32] | NDArray[np.int32],
letterbox: dict[str, float | int],
img_height: int,
img_width: int,
) -> NDArray[np.float32]:
# Map boxes from the padded square back to the original image space.
offset_x = int(letterbox["offset_x"])
offset_y = int(letterbox["offset_y"])
scale_x = float(letterbox["scale_x"])
scale_y = float(letterbox["scale_y"])
boxes_f = boxes.astype(np.float32, copy=True)
boxes_f[:, :, 0] = (boxes_f[:, :, 0] - offset_x) / scale_x
boxes_f[:, :, 1] = (boxes_f[:, :, 1] - offset_y) / scale_y
boxes_f[:, :, 0] = np.clip(boxes_f[:, :, 0], 0, img_width - 1)
boxes_f[:, :, 1] = np.clip(boxes_f[:, :, 1], 0, img_height - 1)
return boxes_f

def sorted_boxes(self, dt_boxes: NDArray[np.float32]) -> NDArray[np.float32]:
if len(dt_boxes) == 0:
Expand Down
28 changes: 28 additions & 0 deletions machine-learning/immich_ml/models/ocr/recognition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any

import cv2
import numpy as np
from numpy.typing import NDArray
from PIL import Image
Expand Down Expand Up @@ -66,6 +67,33 @@ def _load(self) -> ModelSession:
lang_type=self.language,
)
)
if (static_width := settings.ocr_recognition_static_width) is not None:
if static_width <= 0:
log.warning("Ignoring ocr_recognition_static_width=%s; must be > 0.", static_width)
else:
self.model.rec_image_shape = (3, 48, static_width)

# Keep input width fixed to avoid dynamic tensor shapes on Intel GPU backends.
def resize_norm_img_static(self, img: NDArray[np.uint8], max_wh_ratio: float) -> NDArray[np.float32]:
    """Resize a text crop to the model height and right-pad it to a fixed width.

    The target shape is read from ``self.rec_image_shape`` as
    ``(channels, height, width)``. ``max_wh_ratio`` is accepted for signature
    compatibility but is not used: the output width is always the configured
    width.

    Args:
        img: Crop indexed as ``[row, column, channel]``.
        max_wh_ratio: Ignored.

    Returns:
        A float32 array of shape ``(channels, height, width)``. The resized
        crop occupies the left columns, scaled by ``/ 255``, ``- 0.5``,
        ``/ 0.5``; remaining columns are zero.
    """
    channels, target_h, target_w = self.rec_image_shape
    assert channels == img.shape[2]

    src_h, src_w = img.shape[:2]
    aspect = src_w / float(src_h)
    # Keep the aspect ratio, but never exceed the fixed target width.
    scaled_w = min(int(np.ceil(target_h * aspect)), target_w)

    scaled = cv2.resize(img, (scaled_w, target_h)).astype("float32")
    # Reorder to channel-first, then normalise in place.
    chw = scaled.transpose((2, 0, 1)) / 255
    chw -= 0.5
    chw /= 0.5

    canvas = np.zeros((channels, target_h, target_w), dtype=np.float32)
    canvas[:, :, :scaled_w] = chw
    return canvas

self.model.resize_norm_img = resize_norm_img_static.__get__(self.model, type(self.model))
return session

def _predict(self, img: Image.Image, texts: TextDetectionOutput) -> TextRecognitionOutput:
Expand Down
Loading