From db6797962eb1cbd3e4946f81c3667400d387e883 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 25 Mar 2026 21:05:49 -0700 Subject: [PATCH 01/11] segmenter: parallelize image processing with ProcessPoolExecutor - Replace pandas with stdlib csv in ecotaxa.py (removes 60MB dependency) - Add streamer.py for incremental JSONL metadata I/O per image - Add worker.py with picklable functions for parallel workers - Split _pipe() into _pipe_parallel() (asyncio + ProcessPoolExecutor) and _pipe_sequential() (preserves flat recalc + remove_previous_mask) - Share flat field via multiprocessing.SharedMemory (zero-copy) - Default 3 workers for RPi 5, configurable via MQTT worker_count - Graceful fallback to sequential on parallel failure --- segmenter/PARALLEL_SEGMENTER.md | 234 +++++++++++ segmenter/planktoscope/segmenter/__init__.py | 267 +++++++++--- segmenter/planktoscope/segmenter/ecotaxa.py | 80 ++-- .../planktoscope/segmenter/operations.py | 6 + segmenter/planktoscope/segmenter/streamer.py | 74 ++++ segmenter/planktoscope/segmenter/worker.py | 396 ++++++++++++++++++ segmenter/pyproject.toml | 2 +- 7 files changed, 955 insertions(+), 104 deletions(-) create mode 100644 segmenter/PARALLEL_SEGMENTER.md create mode 100644 segmenter/planktoscope/segmenter/streamer.py create mode 100644 segmenter/planktoscope/segmenter/worker.py diff --git a/segmenter/PARALLEL_SEGMENTER.md b/segmenter/PARALLEL_SEGMENTER.md new file mode 100644 index 000000000..8d42b05e1 --- /dev/null +++ b/segmenter/PARALLEL_SEGMENTER.md @@ -0,0 +1,234 @@ +# Parallel Segmenter — Design & Implementation + +## Overview + +This document details the parallelization of the PlanktoScope segmenter pipeline, the incremental metadata streaming system, and the removal of the pandas dependency. These changes target the Raspberry Pi 5 (4 cores) and aim for near-linear scaling of image segmentation throughput. + +## Problem Statement + +The original segmenter processed images sequentially in a single loop within `_pipe()`. On a 4-core RPi 5, three cores sat idle during the CPU-bound image processing (flat correction, thresholding, morphological operations, regionprops extraction, color analysis). Additionally: + +- All object metadata accumulated in a Python list in memory until the final EcoTaxa export, increasing peak RAM usage unnecessarily. +- The `pandas` library (60MB on disk) was used solely for a single `DataFrame.to_csv()` call to generate a tab-separated file. + +## Architecture + +### Before + +``` +_pipe() + ├── Calculate flat field (median of first 10 images) + └── FOR each image (sequential): + ├── Open image, apply flat correction + ├── Create binary mask (threshold → erode → dilate → close → erode) + ├── Slice: label connected components, extract regionprops + │ ├── For each object: extract morphology, color, blur + │ ├── Save cropped object JPG + │ ├── MQTT publish per-object metrics + │ └── Append object dict to self.__global_metadata["objects"] + ├── MQTT publish per-image progress + └── Flat recalculation heuristic (if object count spikes) + └── ecotaxa_export(): pandas DataFrame → TSV in ZIP archive +``` + +### After + +``` +_pipe() + ├── Calculate flat field (unchanged) + ├── Create SharedMemory for flat array (zero-copy sharing) + ├── Create temp .metadata_tmp/ directory + ├── IF worker_count > 1 AND remove_previous_mask == False: + │ └── _pipe_parallel() — asyncio + ProcessPoolExecutor + │ ├── Dispatch process_single_image() per image to worker pool + │ ├── Each worker: flat correction → mask → slice → write .jsonl + │ ├── Parent awaits results, publishes MQTT progress + │ └── On failure: falls back to _pipe_sequential() + │ ELSE: + │ └── _pipe_sequential() — original loop with streaming + │ ├── Preserves flat recalculation heuristic + │ ├── Preserves remove_previous_mask support + │ └── Writes .jsonl per image instead of in-memory accumulation + ├── Assemble all .jsonl files in image order → global_metadata["objects"] + ├── Cleanup SharedMemory + temp directory + └── ecotaxa_export(): stdlib csv.writer → TSV in ZIP archive +``` + +## File Changes + +### New Files + +#### `segmenter/planktoscope/segmenter/worker.py` + +Module-level (picklable) functions for `ProcessPoolExecutor` workers: + +- **`worker_init(shm_name, flat_shape, flat_dtype)`** — Called once per worker process at pool startup. Attaches to the parent's `SharedMemory` block and creates a numpy array view of the flat field. This is zero-copy — the ~49MB flat array is not duplicated per worker. + +- **`process_single_image(...)`** — The main worker entry point. Receives all parameters as serializable arguments (file paths, scalars, a metadata dict). Performs the complete per-image pipeline: + 1. `cv2.imread()` → divide by shared flat → `rescale_intensity()` + 2. `_create_mask()` — threshold → no_op → erode → dilate → close → erode2 + 3. `_slice_image()` — `skimage.measure.label/regionprops`, extract morphology + color + blur per object, save cropped JPGs + 4. Write object metadata to `.jsonl` via `streamer.write_image_objects()` + 5. Return result dict `{image_name, image_index, object_count, duration}` + + Error handling: entire function wrapped in try/except. On failure, logs the error and returns `{image_name, image_index, error: str}` instead of crashing the pool. + +- **Pure helper functions** (exact copies from `__init__.py`, made module-level): + - `_get_color_info(bgr_img, mask)` — HSV mean/std statistics + - `_extract_metadata_from_regionprop(prop, pixel_size_um=None)` — 24+ morphological features with optional µm calibration via `process_pixel` + - `_augment_slice(dim_slice, max_dims, size)` — Expand bounding box by padding pixels + - `_create_mask(img, debug_path, save_debug)` — Mask pipeline with `no_op` hardcoded for the `remove_previous_mask` slot (parallel mode always disables this) + - `_slice_image(img, name, mask, ...)` — Full object extraction loop. Includes `process_pixel` calibration, threshold value capture, debug image output. No MQTT, no shared state. + +#### `segmenter/planktoscope/segmenter/streamer.py` + +Incremental metadata I/O using JSON Lines format: + +- **`write_image_objects(metadata_dir, image_name, objects)`** — Writes one `.jsonl` file per image. Each line is a JSON object `{"name": "...", "metadata": {...}}` serialized with `NpEncoder` (handles numpy types). File is named `{image_name}.jsonl`, so workers writing different images never conflict. + +- **`read_image_objects(filepath)`** — Reads a `.jsonl` file back into a list of dicts. + +- **`assemble_all_objects(metadata_dir, images_list)`** — Reads all `.jsonl` files in the order of `images_list` (the sorted image filename list). This guarantees deterministic output order regardless of which worker finished first. Returns the combined object list ready for `ecotaxa_export()`. + +### Modified Files + +#### `segmenter/planktoscope/segmenter/__init__.py` + +**New imports:** +- `planktoscope.segmenter.streamer` +- `planktoscope.segmenter.worker` + +**New instance variable:** +- `self.__worker_count = 3` — Default for RPi 5 (4 cores, 1 reserved for Node-RED/system). Configurable via MQTT `settings.worker_count`. + +**`_slice_image()` changes:** +- Now returns a 3-tuple: `(object_count, unfiltered_count, objects_list)` instead of `(object_count, unfiltered_count)`. +- Objects are collected in a local `objects_list` and returned, instead of being appended to `self.__global_metadata["objects"]`. +- MQTT per-object publishes remain in place (they execute in the main process during sequential mode). + +**`_pipe()` rewrite:** +- Calculates flat field (unchanged). +- Creates `SharedMemory` for the flat array and a temp `.metadata_tmp/` directory. +- Branches to `_pipe_parallel()` or `_pipe_sequential()` based on `worker_count` and `remove_previous_mask`. +- After processing: calls `streamer.assemble_all_objects()` to rebuild the ordered object list. +- Cleanup in `finally` block: `shm.close()`, `shm.unlink()`, `shutil.rmtree(metadata_dir)`. +- Graceful degradation: if parallel dispatch raises an exception, catches it, logs a warning, and falls back to `_pipe_sequential()`. + +**`_pipe_parallel()` (new method):** +- Uses `asyncio.run()` scoped to this method only (the rest of the segmenter remains synchronous). +- Creates `ProcessPoolExecutor(max_workers=N, initializer=worker_init)`. +- Dispatches `process_single_image()` per image via `loop.run_in_executor()`. +- Awaits all futures, publishes MQTT progress as each completes. +- Collects errors from failed workers and logs them after the batch. + +**`_pipe_sequential()` (new method):** +- Preserves the original sequential loop exactly, including: + - Flat recalculation heuristic (object count > average + 20) + - `remove_previous_mask` support + - Per-image and per-object MQTT publishes +- Only difference: writes objects via `streamer.write_image_objects()` after each image instead of appending to `self.__global_metadata["objects"]`. + +**`treat_message()` addition:** +- Parses `settings.worker_count` from MQTT segmentation requests (default: 3). + +#### `segmenter/planktoscope/segmenter/ecotaxa.py` + +Complete rewrite of the TSV generation, removing the `pandas` and `numpy` dependencies: + +- **Removed:** `import pandas`, `import numpy`, `dtype_to_ecotaxa()` function. +- **Added:** `import csv`, `_infer_ecotaxa_type(value)` function — uses `isinstance(value, (int, float))` to determine `[f]` vs `[t]` annotation. +- **`ecotaxa_export()`** now builds rows as a list of dicts, determines column order from the first row, writes using `csv.writer(buf, delimiter="\t")`: + - Row 1: column names + - Row 2: EcoTaxa type annotations (`[f]` or `[t]`) + - Rows 3+: data values +- Output format is identical to the previous pandas-based output. + +#### `segmenter/planktoscope/segmenter/operations.py` + +Comments added to the module-level globals `__mask_to_remove` and `__last_threshold_value` explaining: +- `__mask_to_remove` is NOT safe for parallel use (requires sequential processing). +- `__last_threshold_value` is safe in parallel mode because each worker process gets its own copy via fork/spawn. + +#### `segmenter/pyproject.toml` + +- Removed `pandas>=2.3.3,<3` from `dependencies`. No new dependencies added — `asyncio`, `concurrent.futures`, `csv`, `json`, `multiprocessing.shared_memory` are all Python stdlib. + +## Key Design Decisions + +### Why ProcessPoolExecutor (not threading)? + +The segmentation workload is CPU-bound (numpy, opencv, scikit-image). Python's GIL prevents threads from achieving true parallelism for CPU work. `ProcessPoolExecutor` spawns real OS processes that run on separate cores. + +### Why SharedMemory for the flat field? + +The flat field is a ~49MB float64 array (4056×3040×3). Without shared memory, pickling it into each worker would cost 49MB × N workers in additional memory. `multiprocessing.shared_memory.SharedMemory` provides zero-copy access — all workers read the same physical memory. + +### Why JSON Lines (not CSV or SQLite)? + +- Each worker writes to a uniquely-named file (`{image_name}.jsonl`), so there's no locking or contention. +- Crash-resilient: if a worker fails, completed images' results survive. +- Assembly in deterministic order is trivial: read files in the sorted `images_list` order. +- JSON naturally handles the nested metadata structure and numpy type conversion (via `NpEncoder`). + +### Why disable flat recalculation in parallel mode? + +The flat recalculation heuristic checks if the current image's object count exceeds the running average by >20. This creates a sequential feedback loop — you can't know whether to recalculate until the previous image is done. The heuristic was already flagged with `TODO: this heuristic should be improved or removed if deemed unnecessary`. In parallel mode, the flat is computed once from the first 10 images and used for all. + +### Why force sequential when remove_previous_mask is enabled? + +`remove_previous_mask` subtracts the previous image's mask from the current image's mask. This is a strict image-to-image dependency that cannot be parallelized. When enabled (`remove_previous_mask=True` in MQTT settings), the segmenter automatically falls back to sequential processing. + +### Why asyncio.run() scoped to _pipe_parallel() only? + +The segmenter runs as a `multiprocessing.Process` with a synchronous main loop polling MQTT every 0.5s. Converting the entire process to async would require replacing `paho-mqtt` with an async MQTT library and restructuring the process lifecycle — a large refactor with marginal benefit since the bottleneck is CPU-bound image processing, not I/O. Scoping `asyncio.run()` to just the parallel dispatch method is the minimal, low-risk approach. + +### Why default to 3 workers? + +The RPi 5 has 4 cores. Leaving 1 core for Node-RED, MQTT, and system processes prevents the segmenter from starving the dashboard UI. Each worker uses ~150MB for image processing buffers. With the shared flat (~49MB) + parent overhead (~100MB) + 3 workers (~450MB) = ~600MB total, well within the Pi 5's available RAM. + +## Memory Profile + +| Component | Memory | Notes | +|-----------|--------|-------| +| Flat field (shared) | ~49MB | Single copy via SharedMemory, read by all workers | +| Per-worker image buffers | ~150MB | BGR image + mask + working arrays | +| Parent process overhead | ~100MB | MQTT client, metadata, orchestration | +| **Total (3 workers)** | **~650MB** | Fits comfortably in RPi 5 RAM | +| **Total (1 worker, sequential)** | **~300MB** | Same as original behavior | + +## Configuration + +### MQTT Settings + +The `worker_count` parameter can be sent in the segmentation MQTT message: + +```json +{ + "action": "segment", + "path": "/path/to/images", + "settings": { + "worker_count": 3, + "force": false, + "recursive": true, + "ecotaxa": true, + "keep": true, + "process_min_ESD": 20, + "remove_previous_mask": false + } +} +``` + +- `worker_count`: Number of parallel workers (default: 3). Set to 1 to force sequential processing. +- `remove_previous_mask`: When `true`, forces sequential processing regardless of `worker_count`. + +## Testing Checklist + +- [ ] Sequential mode (`worker_count=1`): output identical to pre-refactor +- [ ] Sequential mode with `remove_previous_mask=true`: behavior preserved +- [ ] Parallel mode (`worker_count=3`): same objects detected, same EcoTaxa TSV content (order is deterministic because assembly reads in image-list order) +- [ ] Parallel mode with image processing failure: error logged, remaining images processed, pipeline completes +- [ ] EcoTaxa ZIP archive: TSV format matches expectations (2-row header with `[f]`/`[t]` annotations) +- [ ] MQTT dashboard: progress updates still appear during segmentation +- [ ] Memory: RSS stays within expected bounds during parallel run +- [ ] `process_pixel` calibration: µm/µm² measurements correct in both modes +- [ ] Threshold value captured in object metadata in both modes diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 0124a44bf..858230691 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -50,6 +50,8 @@ import planktoscope.segmenter.ecotaxa import planktoscope.segmenter.encoder import planktoscope.segmenter.operations +import planktoscope.segmenter.streamer +import planktoscope.segmenter.worker logger.info("planktoscope.segmenter is loaded") @@ -101,6 +103,7 @@ def __init__(self, event, data_path): self.__process_min_ESD = 20 # microns # https://planktoscope.slack.com/archives/C01V5ENKG0M/p1714146253356569 self.__remove_previous_mask = False + self.__worker_count = 3 # default for RPi 5 (4 cores, leave 1 for system) # create all base path for path in [ @@ -434,6 +437,8 @@ def __augment_slice(dim_slice, max_dims, size=10): dim_slice = tuple(dim_slice) return dim_slice + objects_list = [] + labels, nlabels = skimage.measure.label(mask, return_num=True) regionprops = skimage.measure.regionprops(labels) @@ -528,10 +533,7 @@ def __augment_slice(dim_slice, max_dims, size=10): json.dumps(object_metadata, cls=planktoscope.segmenter.encoder.NpEncoder), ) - if "objects" in self.__global_metadata: - self.__global_metadata["objects"].append(object_metadata) - else: - self.__global_metadata.update({"objects": [object_metadata]}) + objects_list.append(object_metadata) if self.__save_debug_img: if object_number: @@ -571,7 +573,7 @@ def __augment_slice(dim_slice, max_dims, size=10): img, os.path.join(self.__working_debug_path, "tagged.jpg"), ) - return (object_number, len(regionprops)) + return (object_number, len(regionprops), objects_list) def _pipe(self, ecotaxa_export): logger.info("Finding images") @@ -587,33 +589,204 @@ def _pipe(self, ecotaxa_export): else: logger.debug(f"We found {images_count} images, good luck!") - first_start = time.monotonic() - self.__mask_to_remove = None - # average = 0 - total_objects = 0 - average_objects = 0 - recalculate_flat = True - # TODO check image list here to find if a flat exists - # we recalculate the flat every 10 pictures - if recalculate_flat: - recalculate_flat = False - self.segmenter_client.client.publish( - "status/segmenter", '{"status":"Calculating flat"}' + # Calculate initial flat field + self.segmenter_client.client.publish( + "status/segmenter", '{"status":"Calculating flat"}' + ) + if images_count < 10: + self._calculate_flat(images_list[0:images_count], images_count, self.__working_path) + else: + self._calculate_flat(images_list[0:10], 10, self.__working_path) + + if self.__save_debug_img: + self._save_image( + self.__flat, + os.path.join(self.__working_debug_path, "flat_color.jpg"), + ) + + # Create temp directory for intermediate metadata + metadata_dir = os.path.join(self.__working_obj_path, ".metadata_tmp") + os.makedirs(metadata_dir, exist_ok=True) + + # Decide parallel vs sequential + use_parallel = self.__worker_count > 1 and not self.__remove_previous_mask + + shm = None + try: + if use_parallel: + # Create shared memory for the flat field array + import multiprocessing.shared_memory + flat_bytes = self.__flat.nbytes + shm = multiprocessing.shared_memory.SharedMemory( + create=True, size=flat_bytes + ) + flat_shared = np.ndarray( + self.__flat.shape, dtype=self.__flat.dtype, buffer=shm.buf + ) + flat_shared[:] = self.__flat[:] + + try: + self._pipe_parallel( + images_list, images_count, shm.name, metadata_dir + ) + except Exception as e: + logger.error( + f"Parallel segmentation failed, falling back to sequential: {e}" + ) + self._pipe_sequential(images_list, images_count, metadata_dir) + else: + if self.__remove_previous_mask and self.__worker_count > 1: + logger.info( + "remove_previous_mask is enabled — using sequential processing" + ) + self._pipe_sequential(images_list, images_count, metadata_dir) + + # Assemble all objects from .jsonl files in image order + all_objects = planktoscope.segmenter.streamer.assemble_all_objects( + metadata_dir, images_list ) - if images_count < 10: - self._calculate_flat(images_list[0:images_count], images_count, self.__working_path) + self.__global_metadata["objects"] = all_objects + total_objects = len(all_objects) + logger.success(f"Total objects assembled: {total_objects}") + + finally: + # Cleanup shared memory + if shm is not None: + shm.close() + shm.unlink() + # Cleanup temp metadata dir + import shutil + shutil.rmtree(metadata_dir, ignore_errors=True) + + if ecotaxa_export: + if "objects" in self.__global_metadata and self.__global_metadata["objects"]: + if planktoscope.segmenter.ecotaxa.ecotaxa_export( + self.__archive_fn, + self.__global_metadata, + self.__working_obj_path, + keep_files=True, + ): + logger.success("Ecotaxa archive export completed for this folder") + else: + logger.error("The ecotaxa export could not be completed") else: - self._calculate_flat(images_list[0:10], 10, self.__working_path) + logger.info("There are no objects to export") + else: + logger.info("We are not creating the ecotaxa output archive for this folder") - if self.__save_debug_img: - self._save_image( - self.__flat, - os.path.join(self.__working_debug_path, "flat_color.jpg"), + # cleanup + # we're done free some mem + self.__flat = None + + def _pipe_parallel(self, images_list, images_count, shm_name, metadata_dir): + """Process images in parallel using asyncio + ProcessPoolExecutor.""" + import asyncio + import concurrent.futures + + first_start = time.monotonic() + + # Build the base debug path (without per-image suffix) + sample_rel = self.__working_path.split(self.__img_path)[1].strip() + + async def _run_parallel(): + loop = asyncio.get_event_loop() + executor = concurrent.futures.ProcessPoolExecutor( + max_workers=self.__worker_count, + initializer=planktoscope.segmenter.worker.worker_init, + initargs=( + shm_name, + self.__flat.shape, + str(self.__flat.dtype), + ), + ) + + # Build a serializable copy of metadata for workers + # (excludes non-serializable items, keeps process_pixel etc.) + worker_metadata = { + k: v + for k, v in self.__global_metadata.items() + if k != "objects" + } + + futures = [] + for i, filename in enumerate(images_list): + name = os.path.splitext(filename)[0] + debug_path = os.path.join(self.__debug_objects_root, sample_rel, name) + + future = loop.run_in_executor( + executor, + planktoscope.segmenter.worker.process_single_image, + os.path.join(self.__working_path, filename), + name, + i, + images_count, + self.__working_obj_path, + debug_path, + metadata_dir, + self.__save_debug_img, + self.__process_min_ESD, + worker_metadata, ) + futures.append((future, i, filename)) + errors = [] + completed = 0 + for future, i, filename in futures: + try: + result = await future + completed += 1 + if "error" in result: + errors.append(result) + logger.error( + f"Worker error for {result['image_name']}: {result['error']}" + ) + else: + self.segmenter_client.client.publish( + "status/segmenter", + json.dumps( + { + "status": f"Segmented image {filename}, " + f"{completed}/{images_count} complete, " + f"{result['object_count']} objects in " + f"{result['duration']:.1f}s" + } + ), + ) + logger.success( + f"Image {result['image_name']}: " + f"{result['object_count']} objects in " + f"{result['duration']:.1f}s" + ) + except Exception as e: + completed += 1 + errors.append({"image_name": filename, "error": str(e)}) + logger.error(f"Future failed for {filename}: {e}") + + executor.shutdown(wait=True) + + if errors: + logger.warning( + f"{len(errors)} image(s) failed during parallel segmentation: " + f"{[e['image_name'] for e in errors]}" + ) + + total_duration = (time.monotonic() - first_start) / 60 + logger.success( + f"{images_count} images done in {total_duration:.1f} minutes " + f"({self.__worker_count} workers, parallel mode)" + ) + + asyncio.run(_run_parallel()) + + def _pipe_sequential(self, images_list, images_count, metadata_dir): + """Process images sequentially, preserving flat recalc heuristic and remove_previous_mask.""" + first_start = time.monotonic() + self.__mask_to_remove = None + total_objects = 0 + average_objects = 0 + recalculate_flat = False average_time = 0 - # TODO here would be a good place to parallelize the computation for i, filename in enumerate(images_list): name = os.path.splitext(filename)[0] @@ -627,10 +800,8 @@ def _pipe(self, ecotaxa_export): if recalculate_flat: # not i % 10 and i < (images_count - 10) recalculate_flat = False if len(images_list) == 10: - # We are too close to the end of the list, take the previous 10 images instead of the next 10 flat = self._calculate_flat(images_list, 10, self.__working_path) elif i > (len(images_list) - 11): - # We are too close to the end of the list, take the previous 10 images instead of the next 10 flat = self._calculate_flat(images_list[i - 10 : i], 10, self.__working_path) else: flat = self._calculate_flat(images_list[i : i + 10], 10, self.__working_path) # noqa: F841 @@ -652,7 +823,6 @@ def _pipe(self, ecotaxa_export): logger.debug(f"The debug objects path is {self.__working_debug_path}") # Create the debug objects path if needed if self.__save_debug_img: - # create the path! os.makedirs(self.__working_debug_path, exist_ok=True) start = time.monotonic() @@ -662,34 +832,25 @@ def _pipe(self, ecotaxa_export): os.path.join(self.__working_path, images_list[i]), self.__flat ) - # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) - # logger.debug(time.monotonic() - start) - - # start = time.monotonic() - # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) - mask = self._create_mask(img, self.__working_debug_path) - # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) - # logger.debug(time.monotonic() - start) + objects_count, _, objects_list = self._slice_image(img, name, mask, total_objects) - # start = time.monotonic() - # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) + # Stream objects to disk incrementally + planktoscope.segmenter.streamer.write_image_objects( + metadata_dir, name, objects_list + ) - objects_count, _ = self._slice_image(img, name, mask, total_objects) total_objects += objects_count # Simple heuristic to detect a movement of the flow cell and a change in the resulting flat # TODO: this heuristic should be improved or removed if deemed unnecessary if average_objects != 0 and objects_count > average_objects + 20: - # FIXME: this should force a new slice of the current image logger.debug( f"We need to recalculate a flat since we have {objects_count} new objects instead of the average of {average_objects}" ) recalculate_flat = True average_objects = (average_objects * i + objects_count) / (i + 1) - # logger.debug(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) - # logger.debug(time.monotonic() - start) delay = time.monotonic() - start average_time = (average_time * i + delay) / (i + 1) logger.success( @@ -708,26 +869,6 @@ def _pipe(self, ecotaxa_export): f"We also found {total_objects} objects, or an average of {total_objects / (total_duration * 60)}objects per second" ) - if ecotaxa_export: - if "objects" in self.__global_metadata: - if planktoscope.segmenter.ecotaxa.ecotaxa_export( - self.__archive_fn, - self.__global_metadata, - self.__working_obj_path, - keep_files=True, - ): - logger.success("Ecotaxa archive export completed for this folder") - else: - logger.error("The ecotaxa export could not be completed") - else: - logger.info("There are no objects to export") - else: - logger.info("We are not creating the ecotaxa output archive for this folder") - - # cleanup - # we're done free some mem - self.__flat = None - def segment_all(self, paths: list, force=False, ecotaxa_export=True): """Starts the segmentation in all the folders given recursively @@ -936,6 +1077,8 @@ def treat_message(self): self.__remove_previous_mask = settings.get("remove_previous_mask", False) + self.__worker_count = settings.get("worker_count", 3) + path = last_message["path"] if "path" in last_message else None # Publish the status "Started" to via MQTT to Node-RED diff --git a/segmenter/planktoscope/segmenter/ecotaxa.py b/segmenter/planktoscope/segmenter/ecotaxa.py index ab3ece883..bcec0f3a1 100644 --- a/segmenter/planktoscope/segmenter/ecotaxa.py +++ b/segmenter/planktoscope/segmenter/ecotaxa.py @@ -19,8 +19,7 @@ from loguru import logger -import numpy -import pandas # FIXME: just use python's csv library, to shave off pandas's 60 MB of unnecessary disk space usage +import csv import zipfile import os import io @@ -202,18 +201,10 @@ """ -def dtype_to_ecotaxa(dtype): - """Determines the EcoTaxa header field type annotation for the dtype""" - # Note: this code was copied from the MIT-licensed MorphoCut library at - # https://github.com/morphocut/morphocut/blob/0.1.2/src/morphocut/contrib/ecotaxa.py . - # The MorphoCut library is copyright 2019 Simon-Martin Schroeder and others. - try: - if numpy.issubdtype(dtype, numpy.number): - return "[f]" - except TypeError: # pragma: no cover - print(type(dtype)) - raise - +def _infer_ecotaxa_type(value) -> str: + """Infer EcoTaxa type annotation [f] or [t] from a Python value.""" + if isinstance(value, (int, float)): + return "[f]" return "[t]" @@ -228,9 +219,6 @@ def ecotaxa_export(archive_filepath, metadata, image_base_path, keep_files=False """ logger.info("Starting the ecotaxa archive export") with zipfile.ZipFile(archive_filepath, "w") as archive: - # empty table, one line per object - tsv_content = [] - if "objects" in metadata: object_list = metadata.pop("objects") else: @@ -238,49 +226,59 @@ def ecotaxa_export(archive_filepath, metadata, image_base_path, keep_files=False return 0 # sometimes the camera resolution is not exported as string - if not isinstance(metadata["acq_camera_resolution"], str): - metadata["acq_camera_resolution"] = ( - f"{metadata['acq_camera_resolution'][0]}x{metadata['acq_camera_resolution'][1]}" - ) + if not isinstance(metadata.get("acq_camera_resolution", ""), str): + res = metadata["acq_camera_resolution"] + metadata["acq_camera_resolution"] = f"{res[0]}x{res[1]}" - # let's go! + # Build rows, determine columns from first object + rows = [] + columns = None for rank, roi in enumerate(object_list, start=1): - tsv_line = {} - tsv_line.update(metadata) - tsv_line.update(("object_" + k, v) for k, v in roi["metadata"].items()) - tsv_line["object_id"] = roi["name"] + row = {} + row.update(metadata) + row.update(("object_" + k, v) for k, v in roi["metadata"].items()) + row["object_id"] = roi["name"] filename = roi["name"] + ".jpg" + row["img_file_name"] = filename + row["img_rank"] = 1 - tsv_line.update({"img_file_name": filename, "img_rank": 1}) - tsv_content.append(tsv_line) + if columns is None: + columns = list(row.keys()) + rows.append(row) image_path = os.path.join(image_base_path, filename) - archive.write(image_path, arcname=filename) if not keep_files: # we remove the image file if we don't want to keep it! os.remove(image_path) - tsv_content = pandas.DataFrame(tsv_content) + if not rows: + logger.error("No objects to export") + return 0 + + # Determine type annotations from first row values + type_row = [_infer_ecotaxa_type(rows[0].get(col)) for col in columns] + + # Write TSV to string buffer + buf = io.StringIO() + writer = csv.writer(buf, delimiter="\t", lineterminator="\n") + writer.writerow(columns) + writer.writerow(type_row) + for row in rows: + writer.writerow([row.get(col, "") for col in columns]) - tsv_type_header = [dtype_to_ecotaxa(dt) for dt in tsv_content.dtypes] - tsv_content.columns = pandas.MultiIndex.from_tuples( - list(zip(tsv_content.columns, tsv_type_header)) - ) + tsv_content = buf.getvalue() # create the filename with the acquisition ID - acquisition_id = metadata.get("acq_id") - acquisition_id = acquisition_id.replace(" ", "_") + acquisition_id = metadata.get("acq_id", "unknown").replace(" ", "_") tsv_filename = f"ecotaxa_{acquisition_id}.tsv" # add the tsv to the archive - archive.writestr( - tsv_filename, - io.BytesIO(tsv_content.to_csv(sep="\t", encoding="utf-8", index=False).encode()).read(), - ) + archive.writestr(tsv_filename, tsv_content.encode("utf-8")) if keep_files: tsv_file = os.path.join(image_base_path, tsv_filename) - tsv_content.to_csv(path_or_buf=tsv_file, sep="\t", encoding="utf-8", index=False) + with open(tsv_file, "w", encoding="utf-8") as f: + f.write(tsv_content) logger.success("Ecotaxa archive is ready!") return 1 diff --git a/segmenter/planktoscope/segmenter/operations.py b/segmenter/planktoscope/segmenter/operations.py index 1d002eac4..7202ccd64 100644 --- a/segmenter/planktoscope/segmenter/operations.py +++ b/segmenter/planktoscope/segmenter/operations.py @@ -19,6 +19,12 @@ import cv2 from loguru import logger +# WARNING: These module-level globals use process-local state. +# remove_previous_mask / __mask_to_remove: NOT safe for parallel use — requires +# sequential image-to-image processing. In parallel mode, the pipeline substitutes +# no_op() for the remove_previous_mask step. +# __last_threshold_value: safe in parallel mode because each worker process gets +# its own copy of this global via fork/spawn. __mask_to_remove = None __last_threshold_value = None diff --git a/segmenter/planktoscope/segmenter/streamer.py b/segmenter/planktoscope/segmenter/streamer.py new file mode 100644 index 000000000..1684f5507 --- /dev/null +++ b/segmenter/planktoscope/segmenter/streamer.py @@ -0,0 +1,74 @@ +"""Incremental metadata streaming to disk and assembly for EcoTaxa export. + +Each image's segmented objects are written to a separate .jsonl file during processing. +After all images are done, the files are assembled in image order for deterministic output. +""" + +import json +import os + +from loguru import logger + +from planktoscope.segmenter.encoder import NpEncoder + + +def write_image_objects(metadata_dir: str, image_name: str, objects: list[dict]) -> str: + """Write per-image object metadata to a .jsonl file. + + Args: + metadata_dir: Directory for intermediate metadata files + image_name: Base name of the source image (no extension) + objects: List of object dicts, each with "name" and "metadata" keys + + Returns: + Path to the written .jsonl file + """ + os.makedirs(metadata_dir, exist_ok=True) + filepath = os.path.join(metadata_dir, f"{image_name}.jsonl") + with open(filepath, "w") as f: + for obj in objects: + f.write(json.dumps(obj, cls=NpEncoder) + "\n") + logger.debug(f"Wrote {len(objects)} objects to {filepath}") + return filepath + + +def read_image_objects(filepath: str) -> list[dict]: + """Read object metadata from a .jsonl file. + + Args: + filepath: Path to a .jsonl file written by write_image_objects + + Returns: + List of object dicts + """ + objects = [] + with open(filepath, "r") as f: + for line in f: + line = line.strip() + if line: + objects.append(json.loads(line)) + return objects + + +def assemble_all_objects(metadata_dir: str, images_list: list[str]) -> list[dict]: + """Read all per-image .jsonl files in image order and return combined object list. + + Args: + metadata_dir: Directory containing .jsonl files + images_list: Ordered list of image filenames (with extension) + + Returns: + Combined list of all object dicts, in image order + """ + all_objects = [] + for filename in images_list: + image_name = os.path.splitext(filename)[0] + filepath = os.path.join(metadata_dir, f"{image_name}.jsonl") + if os.path.exists(filepath): + image_objects = read_image_objects(filepath) + all_objects.extend(image_objects) + logger.debug(f"Assembled {len(image_objects)} objects from {image_name}") + else: + logger.warning(f"No metadata file found for {image_name}") + logger.info(f"Assembled {len(all_objects)} total objects from {len(images_list)} images") + return all_objects diff --git a/segmenter/planktoscope/segmenter/worker.py b/segmenter/planktoscope/segmenter/worker.py new file mode 100644 index 000000000..a87739545 --- /dev/null +++ b/segmenter/planktoscope/segmenter/worker.py @@ -0,0 +1,396 @@ +"""Top-level worker function for parallel image segmentation. + +All functions in this module are module-level (picklable) so they can be used +with ProcessPoolExecutor. Workers do not hold MQTT connections — the parent +process publishes progress updates based on returned results. +""" + +import multiprocessing.shared_memory +import os +import time + +import cv2 +import numpy as np +import PIL.Image +import skimage.exposure +import skimage.measure +from loguru import logger + +import planktoscope.segmenter.operations +import planktoscope.segmenter.streamer + +# Per-worker cached flat reference (set by initializer) +_flat_shm = None +_flat_array = None + + +def worker_init(shm_name: str, flat_shape: tuple, flat_dtype: str) -> None: + """Initializer for each worker process. Attaches to shared flat field.""" + global _flat_shm, _flat_array + _flat_shm = multiprocessing.shared_memory.SharedMemory(name=shm_name, create=False) + _flat_array = np.ndarray(flat_shape, dtype=flat_dtype, buffer=_flat_shm.buf) + logger.debug(f"Worker {os.getpid()} attached to shared flat field '{shm_name}'") + + +def process_single_image( + image_filepath: str, + image_name: str, + image_index: int, + images_count: int, + working_obj_path: str, + working_debug_path: str, + metadata_dir: str, + save_debug_img: bool, + process_min_ESD: float, + global_metadata: dict, +) -> dict: + """Process a single image: flat correction, masking, slicing, metadata write. + + Args: + image_filepath: Full path to the source image + image_name: Base name of the image (no extension) + image_index: Index of this image in the acquisition + images_count: Total number of images in the acquisition + working_obj_path: Directory to save cropped object images + working_debug_path: Directory for debug output (per-image subdirectory) + metadata_dir: Directory for .jsonl intermediate metadata files + save_debug_img: Whether to save debug images + process_min_ESD: Minimum equivalent spherical diameter threshold (µm) + global_metadata: Acquisition metadata dict (for process_pixel calibration etc.) + + Returns: + dict with keys: image_name, image_index, object_count, duration + On error: dict with keys: image_name, image_index, error + """ + try: + start = time.monotonic() + + # Create debug path if needed + if save_debug_img: + os.makedirs(working_debug_path, exist_ok=True) + + # 1. Open and apply flat + image = cv2.imread(image_filepath) + if image is None: + raise FileNotFoundError(f"Could not read image: {image_filepath}") + image = image / _flat_array + image[0][0] = [0, 0, 0] + image = skimage.exposure.rescale_intensity( + image, in_range=(0, 1.04), out_range="uint8" + ) + + if save_debug_img: + _save_image(image, os.path.join(working_debug_path, "cleaned_image.jpg")) + + # 2. Create mask (no remove_previous_mask in parallel mode) + mask = _create_mask(image, working_debug_path, save_debug_img) + + # 3. Slice image and extract objects + objects, object_count, unfiltered_count = _slice_image( + image, + image_name, + mask, + working_obj_path, + working_debug_path, + save_debug_img, + process_min_ESD, + global_metadata, + ) + + # 4. Write objects metadata to disk incrementally + planktoscope.segmenter.streamer.write_image_objects( + metadata_dir, image_name, objects + ) + + duration = time.monotonic() - start + logger.info( + f"Worker {os.getpid()}: {image_name} done — " + f"{object_count} objects in {duration:.1f}s" + ) + return { + "image_name": image_name, + "image_index": image_index, + "object_count": object_count, + "unfiltered_count": unfiltered_count, + "duration": duration, + } + + except Exception as e: + logger.error(f"Worker {os.getpid()}: failed on {image_name}: {e}") + return { + "image_name": image_name, + "image_index": image_index, + "error": str(e), + } + + +def _save_image(image, path: str) -> None: + """Save a BGR image as JPEG.""" + PIL.Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)).save(path) + + +def _save_mask(mask, path: str) -> None: + """Save a binary mask as JPEG.""" + PIL.Image.fromarray(mask).save(path) + + +def _create_mask(img, debug_path: str, save_debug: bool): + """Create segmentation mask without global state (no remove_previous_mask). + + In parallel mode, remove_previous_mask is always disabled (replaced by no_op) + because it requires sequential image-to-image state. + """ + logger.info("Starting the mask creation") + + pipeline = [ + ("simple_threshold", planktoscope.segmenter.operations.simple_threshold), + ("no_op", planktoscope.segmenter.operations.no_op), + ("erode", planktoscope.segmenter.operations.erode), + ("dilate", planktoscope.segmenter.operations.dilate), + ("close", planktoscope.segmenter.operations.close), + ("erode2", planktoscope.segmenter.operations.erode2), + ] + + mask = img + for i, (name, func) in enumerate(pipeline): + mask = func(mask) + if save_debug and debug_path: + PIL.Image.fromarray(mask).save( + os.path.join(debug_path, f"mask_{i}_{name}.jpg") + ) + + logger.success("Mask created") + return mask + + +def _get_color_info(bgr_img, mask) -> dict: + """Compute HSV color statistics for an object region. + + Args: + bgr_img: BGR image of the object region + mask: Boolean mask of the object within the region + + Returns: + dict with MeanHue, StdHue, MeanSaturation, StdSaturation, MeanValue, StdValue + """ + hsv_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2HSV) + (h_channel, s_channel, v_channel) = cv2.split(hsv_img) + return { + "MeanHue": np.mean(h_channel, where=mask), + "MeanSaturation": np.mean(s_channel, where=mask), + "MeanValue": np.mean(v_channel, where=mask), + "StdHue": np.std(h_channel, where=mask), + "StdSaturation": np.std(s_channel, where=mask), + "StdValue": np.std(v_channel, where=mask), + } + + +def _extract_metadata_from_regionprop(prop, pixel_size_um=None) -> dict: + """Extract morphological metadata from a scikit-image regionprop. + + Args: + prop: scikit-image regionprop object + pixel_size_um (float or None): pixel size in µm/pixel (process_pixel). + If provided, linear measurements are in µm and area measurements in µm². + If None, all measurements remain in pixel units. + """ + # Scale factors: linear (µm/px) and area (µm²/px²) + px = pixel_size_um if pixel_size_um and pixel_size_um > 0 else 1.0 + px2 = px * px + + return { + "label": prop.label, + # width of the smallest rectangle enclosing the object (µm if calibrated) + "width": (prop.bbox[3] - prop.bbox[1]) * px, + # height of the smallest rectangle enclosing the object (µm if calibrated) + "height": (prop.bbox[2] - prop.bbox[0]) * px, + # X coordinates of the top left point (pixels) + "bx": prop.bbox[1], + # Y coordinates of the top left point (pixels) + "by": prop.bbox[0], + # circularity — dimensionless ratio + "circ.": (4 * np.pi * prop.filled_area) / prop.perimeter**2, + # Surface area excluding holes (µm² if calibrated) + "area_exc": prop.area * px2, + # Surface area (µm² if calibrated) + "area": prop.filled_area * px2, + # Percentage of holes — dimensionless + "%area": 1 - (prop.area / prop.filled_area), + # Primary axis (µm if calibrated) + "major": prop.major_axis_length * px, + # Secondary axis (µm if calibrated) + "minor": prop.minor_axis_length * px, + # Y center of gravity (pixels) + "y": prop.centroid[0], + # X center of gravity (pixels) + "x": prop.centroid[1], + # Convex area (µm² if calibrated) + "convex_area": prop.convex_area * px2, + # Perimeter (µm if calibrated) + "perim.": prop.perimeter * px, + # major/minor — dimensionless + "elongation": np.divide(prop.major_axis_length, prop.minor_axis_length), + # perim/area_exc — units: 1/µm if calibrated + "perimareaexc": prop.perimeter / prop.area * (1.0 / px), + # perim/major — dimensionless + "perimmajor": prop.perimeter / prop.major_axis_length, + # (4∗π∗Area_exc)/perim² — dimensionless + "circex": np.divide(4 * np.pi * prop.area, prop.perimeter**2), + # Angle in degrees + "angle": prop.orientation / np.pi * 180 + 90, + # Bounding box area (µm² if calibrated) + "bounding_box_area": prop.bbox_area * px2, + "eccentricity": prop.eccentricity, + # Equivalent diameter (µm if calibrated) + "equivalent_diameter": prop.equivalent_diameter * px, + "euler_number": prop.euler_number, + # extent — dimensionless + "extent": prop.extent, + "local_centroid_col": prop.local_centroid[1], + "local_centroid_row": prop.local_centroid[0], + # solidity — dimensionless + "solidity": prop.solidity, + } + + +def _augment_slice(dim_slice, max_dims, size=10): + """Expand a region slice by `size` pixels in each direction.""" + dim_slice = list(dim_slice) + for i in range(2): + if dim_slice[i].start < size: + dim_slice[i] = slice(0, dim_slice[i].stop) + else: + dim_slice[i] = slice(dim_slice[i].start - size, dim_slice[i].stop) + + for i in range(2): + if dim_slice[i].stop + size == max_dims[i]: + dim_slice[i] = slice(dim_slice[i].start, max_dims[i]) + else: + dim_slice[i] = slice(dim_slice[i].start, dim_slice[i].stop + size) + + return tuple(dim_slice) + + +def _slice_image( + img, + name: str, + mask, + obj_path: str, + debug_path: str, + save_debug: bool, + min_ESD: float, + global_metadata: dict, +) -> tuple[list[dict], int, int]: + """Slice image into objects and extract metadata. No MQTT, no shared state. + + Args: + img: BGR image array + name: Base name of the source image + mask: Binary segmentation mask + obj_path: Directory to save cropped object images + debug_path: Directory for debug output + save_debug: Whether to save debug images + min_ESD: Minimum equivalent spherical diameter threshold (µm) + global_metadata: Acquisition metadata dict (for process_pixel) + + Returns: + tuple: (objects_list, filtered_count, unfiltered_count) + """ + labels, nlabels = skimage.measure.label(mask, return_num=True) + regionprops = skimage.measure.regionprops(labels) + + # Convert min ESD threshold from µm to pixels for filtering + pixel_size = global_metadata.get("process_pixel", None) + try: + pixel_size = float(pixel_size) if pixel_size is not None else None + except (ValueError, TypeError): + pixel_size = None + if pixel_size and pixel_size > 0: + min_esd_pixels = min_ESD / pixel_size + else: + min_esd_pixels = min_ESD + logger.warning( + f"No valid process_pixel calibration — using min ESD of {min_esd_pixels} as pixels" + ) + logger.debug( + f"Min ESD filter: {min_ESD} µm = {min_esd_pixels:.1f} px " + f"(process_pixel={pixel_size})" + ) + + filtered = [r for r in regionprops if r.equivalent_diameter_area >= min_esd_pixels] + object_number = len(filtered) + logger.debug(f"Found {nlabels} labels, or {object_number} after size filtering") + + # Determine pixel_size_um for calibrated measurements + pixel_size_um = None + if pixel_size and pixel_size > 0: + pixel_size_um = pixel_size + + objects = [] + for i, region in enumerate(filtered): + region.label = i + + # Extract metadata + obj_image = img[region.slice] + colors = _get_color_info(obj_image, region.filled_image) + metadata = _extract_metadata_from_regionprop(region, pixel_size_um=pixel_size_um) + + # Blur metric + metadata["blur_laplacian"] = planktoscope.segmenter.operations.calculate_blur( + obj_image + ) + + # Threshold value (each worker has its own process-local global) + threshold_value = planktoscope.segmenter.operations.get_last_threshold_value() + if threshold_value is not None: + metadata["threshold"] = threshold_value + + # Save cropped object image with augmented slice + obj_image_aug = img[_augment_slice(region.slice, labels.shape, 10)] + object_id = f"{name}_{i}" + _save_image(obj_image_aug, os.path.join(obj_path, f"{object_id}.jpg")) + + if save_debug and debug_path: + _save_mask( + region.filled_image, + os.path.join(debug_path, f"obj_{i}_mask.jpg"), + ) + + objects.append({"name": object_id, "metadata": {**metadata, **colors}}) + + # Debug tagged image + if save_debug and debug_path: + if object_number: + tagged_image = img.copy() + for region in filtered: + tagged_image = cv2.drawMarker( + tagged_image, + (int(region.centroid[1]), int(region.centroid[0])), + (0, 0, 255), + cv2.MARKER_CROSS, + ) + tagged_image = cv2.rectangle( + tagged_image, + pt1=region.bbox[-3:-5:-1], + pt2=region.bbox[-1:-3:-1], + color=(150, 0, 200), + thickness=1, + ) + contours, _ = cv2.findContours( + np.uint8(region.image), + mode=cv2.RETR_TREE, + method=cv2.CHAIN_APPROX_NONE, + ) + tagged_image = cv2.drawContours( + tagged_image, + contours, + -1, + (238, 130, 238), + thickness=1, + offset=(region.bbox[1], region.bbox[0]), + ) + _save_image(tagged_image, os.path.join(debug_path, "tagged.jpg")) + else: + _save_image(img, os.path.join(debug_path, "tagged.jpg")) + + return objects, object_number, len(regionprops) diff --git a/segmenter/pyproject.toml b/segmenter/pyproject.toml index debe1959e..02a34c132 100644 --- a/segmenter/pyproject.toml +++ b/segmenter/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ dependencies = [ "paho-mqtt>=2.1.0,<3", "numpy>=2.3.3,<3", - "pandas>=2.3.3,<3", + "loguru>=0.7.3,<0.8", "opencv-python-headless>=4.6.0.66,<5", "scikit-image>=0.25.2,<0.26", From 951c56ef66b4d27ae01bb1a66f10a9d62d141e34 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 25 Mar 2026 21:09:39 -0700 Subject: [PATCH 02/11] add benchmark script --- segmenter/benchmark.py | 406 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 406 insertions(+) create mode 100644 segmenter/benchmark.py diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py new file mode 100644 index 000000000..9569893c3 --- /dev/null +++ b/segmenter/benchmark.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +"""Benchmark script for PlanktoScope segmenter performance. + +Runs segmentation on a test acquisition folder and reports timing, throughput, +memory usage, and object counts. Works on both the original (main) and +parallel (feature/parallel-segmenter) branches. + +Usage: + # On the Pi, from the segmenter/ directory: + python3 benchmark.py /path/to/acquisition/folder [--workers N] [--runs N] [--validate REF] + + # Examples: + # Baseline (main branch, or new branch with sequential): + python3 benchmark.py /data/img/20210122/sample_1/acq_1 + + # Parallel mode (new branch only): + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 + + # Multiple runs for stable averages: + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 --runs 3 + + # Validate output against a reference (baseline) result: + python3 benchmark.py /data/img/20210122/sample_1/acq_1 --workers 3 --validate baseline_result.json + +Workflow: + 1. git checkout main + python3 benchmark.py /path/to/acq --runs 3 + # Save the output JSON as baseline_result.json + + 2. git checkout feature/parallel-segmenter + python3 benchmark.py /path/to/acq --workers 1 --runs 3 + # Compare sequential-on-new-branch vs baseline + + 3. python3 benchmark.py /path/to/acq --workers 3 --runs 3 --validate baseline_result.json + # Parallel mode, validate objects match baseline +""" + +import argparse +import datetime +import json +import os +import platform +import resource +import shutil +import sys +import time + + +def get_peak_rss_mb(): + """Get peak resident set size in MB (Linux/macOS).""" + # ru_maxrss is in KB on Linux, bytes on macOS + usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system() == "Darwin": + return usage / (1024 * 1024) + return usage / 1024 + + +def prepare_test_folder(source_path, work_dir): + """Copy acquisition folder to a clean working directory. + + We copy so that: + - done.txt from previous runs doesn't skip processing + - Object images from previous runs don't interfere + - The original data is never modified + """ + if os.path.exists(work_dir): + shutil.rmtree(work_dir) + + # Copy only the source images and metadata.json + os.makedirs(work_dir, exist_ok=True) + for f in os.listdir(source_path): + src = os.path.join(source_path, f) + if os.path.isfile(src) and ( + f.endswith((".jpg", ".JPG", ".jpeg", ".JPEG")) + or f == "metadata.json" + ): + shutil.copy2(src, os.path.join(work_dir, f)) + + # Remove done.txt if it was copied + done_file = os.path.join(work_dir, "done.txt") + if os.path.exists(done_file): + os.remove(done_file) + + return work_dir + + +def count_images(path): + """Count JPG images in a directory.""" + return len([ + f for f in os.listdir(path) + if f.lower().endswith((".jpg", ".jpeg")) + ]) + + +def run_segmentation(data_path, acq_path, worker_count=None): + """Run the segmenter pipeline and return metrics. + + Args: + data_path: Root data directory (parent of img/, objects/, export/) + acq_path: Path to the acquisition folder to segment + worker_count: Number of workers (None = use branch default) + + Returns: + dict with timing, object count, and memory metrics + """ + import multiprocessing + + # Import segmenter (works on both branches) + from planktoscope.segmenter import SegmenterProcess + + # Create a mock event (never set — we don't use the run loop) + event = multiprocessing.Event() + seg = SegmenterProcess(event, data_path) + + # On the new branch, set worker_count if provided + if worker_count is not None and hasattr(seg, "_SegmenterProcess__worker_count"): + seg._SegmenterProcess__worker_count = worker_count + + # We need to set up a minimal MQTT mock since _pipe publishes status + class MockMQTTClient: + def publish(self, topic, payload): + pass # Discard all MQTT publishes during benchmark + + class MockSegmenterClient: + def __init__(self): + self.client = MockMQTTClient() + + seg.segmenter_client = MockSegmenterClient() + + # Measure + rss_before = get_peak_rss_mb() + start_time = time.monotonic() + + seg.segment_path(acq_path, ecotaxa_export=True) + + elapsed = time.monotonic() - start_time + rss_after = get_peak_rss_mb() + + # Count results + objects = seg._SegmenterProcess__global_metadata.get("objects", []) + object_count = len(objects) + + # Count output files + obj_path = seg._SegmenterProcess__working_obj_path + obj_images = len([f for f in os.listdir(obj_path) if f.endswith(".jpg")]) if os.path.exists(obj_path) else 0 + + # Check for ecotaxa archive + archive_path = seg._SegmenterProcess__archive_fn + archive_exists = os.path.exists(archive_path) + archive_size_mb = os.path.getsize(archive_path) / (1024 * 1024) if archive_exists else 0 + + # Extract object names for validation + object_names = sorted([obj["name"] for obj in objects]) + + return { + "elapsed_seconds": round(elapsed, 2), + "object_count": object_count, + "object_images_saved": obj_images, + "archive_exists": archive_exists, + "archive_size_mb": round(archive_size_mb, 2), + "peak_rss_mb": round(rss_after, 1), + "rss_delta_mb": round(rss_after - rss_before, 1), + "object_names": object_names, + } + + +def validate_against_reference(result, ref_path): + """Compare result against a saved reference.""" + with open(ref_path, "r") as f: + ref = json.load(f) + + issues = [] + + # Compare object counts + if result["object_count"] != ref["object_count"]: + issues.append( + f"Object count mismatch: got {result['object_count']}, " + f"expected {ref['object_count']}" + ) + + # Compare object names (deterministic order check) + ref_names = ref.get("object_names", []) + result_names = result.get("object_names", []) + if ref_names and result_names: + missing = set(ref_names) - set(result_names) + extra = set(result_names) - set(ref_names) + if missing: + issues.append(f"Missing objects: {sorted(missing)[:10]}{'...' if len(missing) > 10 else ''}") + if extra: + issues.append(f"Extra objects: {sorted(extra)[:10]}{'...' if len(extra) > 10 else ''}") + + return issues + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark PlanktoScope segmenter performance" + ) + parser.add_argument( + "acquisition_path", + help="Path to acquisition folder (must contain metadata.json + images)", + ) + parser.add_argument( + "--workers", + type=int, + default=None, + help="Number of parallel workers (default: branch default). " + "Only works on feature/parallel-segmenter branch.", + ) + parser.add_argument( + "--runs", + type=int, + default=1, + help="Number of runs to average (default: 1)", + ) + parser.add_argument( + "--validate", + type=str, + default=None, + help="Path to a reference result JSON to validate against", + ) + parser.add_argument( + "--output", + type=str, + default=None, + help="Save result JSON to this path (for use as --validate reference)", + ) + parser.add_argument( + "--data-path", + type=str, + default=None, + help="Root data directory (default: inferred as 3 levels up from acquisition_path)", + ) + args = parser.parse_args() + + acq_path = os.path.abspath(args.acquisition_path) + if not os.path.exists(os.path.join(acq_path, "metadata.json")): + print(f"ERROR: No metadata.json found in {acq_path}") + sys.exit(1) + + image_count = count_images(acq_path) + if image_count == 0: + print(f"ERROR: No images found in {acq_path}") + sys.exit(1) + + # Infer data_path: acquisition is at data/img/DATE/SAMPLE/ACQ + # so data_path is 4 levels up (img/ is one level) + if args.data_path: + data_path = os.path.abspath(args.data_path) + else: + # Try to find the img/ ancestor + parts = acq_path.split(os.sep) + try: + img_idx = len(parts) - 1 - parts[::-1].index("img") + data_path = os.sep.join(parts[:img_idx]) + except ValueError: + print( + "WARNING: Could not infer data_path from acquisition_path. " + "Using /tmp/benchmark_data" + ) + data_path = "/tmp/benchmark_data" + + # Detect branch + try: + import subprocess + branch = subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + ).decode().strip() + except Exception: + branch = "unknown" + + worker_label = args.workers if args.workers else "default" + + print("=" * 60) + print("PlanktoScope Segmenter Benchmark") + print("=" * 60) + print(f" Branch: {branch}") + print(f" Acquisition: {acq_path}") + print(f" Images: {image_count}") + print(f" Data path: {data_path}") + print(f" Workers: {worker_label}") + print(f" Runs: {args.runs}") + print(f" Date: {datetime.datetime.now().isoformat()}") + print("=" * 60) + + all_results = [] + for run_num in range(1, args.runs + 1): + print(f"\n--- Run {run_num}/{args.runs} ---") + + # Prepare clean working copy + work_dir = os.path.join("/tmp", "benchmark_acq") + prepare_test_folder(acq_path, work_dir) + + # Also clean output directories that the segmenter will write to + for subdir in ["objects", "export", "clean"]: + out = os.path.join(data_path, subdir) + if os.path.exists(out): + shutil.rmtree(out) + os.makedirs(out, exist_ok=True) + + # Ensure img path structure exists for the segmenter + img_root = os.path.join(data_path, "img") + os.makedirs(img_root, exist_ok=True) + + # Create symlink so segmenter can find the working copy under img/ + rel_path = os.path.relpath(acq_path, os.path.join(data_path, "img")) + link_dir = os.path.join(img_root, os.path.dirname(rel_path)) + os.makedirs(link_dir, exist_ok=True) + link_path = os.path.join(img_root, rel_path) + if os.path.exists(link_path): + if os.path.islink(link_path): + os.unlink(link_path) + else: + shutil.rmtree(link_path) + os.symlink(work_dir, link_path) + + try: + result = run_segmentation(data_path, work_dir, args.workers) + all_results.append(result) + + print(f" Time: {result['elapsed_seconds']}s") + print(f" Objects found: {result['object_count']}") + print(f" Object images: {result['object_images_saved']}") + print(f" Archive created: {result['archive_exists']}") + if result["archive_exists"]: + print(f" Archive size: {result['archive_size_mb']} MB") + print(f" Peak RSS: {result['peak_rss_mb']} MB") + imgs_per_sec = image_count / result["elapsed_seconds"] if result["elapsed_seconds"] > 0 else 0 + objs_per_sec = result["object_count"] / result["elapsed_seconds"] if result["elapsed_seconds"] > 0 else 0 + print(f" Throughput: {imgs_per_sec:.2f} images/s, {objs_per_sec:.1f} objects/s") + except Exception as e: + print(f" ERROR: {e}") + import traceback + traceback.print_exc() + all_results.append({"error": str(e)}) + finally: + # Cleanup symlink + if os.path.islink(link_path): + os.unlink(link_path) + + # Summary + successful = [r for r in all_results if "error" not in r] + if not successful: + print("\nAll runs failed!") + sys.exit(1) + + print("\n" + "=" * 60) + print("SUMMARY") + print("=" * 60) + + avg_time = sum(r["elapsed_seconds"] for r in successful) / len(successful) + avg_objects = sum(r["object_count"] for r in successful) / len(successful) + max_rss = max(r["peak_rss_mb"] for r in successful) + avg_imgs_per_sec = image_count / avg_time if avg_time > 0 else 0 + avg_objs_per_sec = avg_objects / avg_time if avg_time > 0 else 0 + + summary = { + "branch": branch, + "workers": args.workers, + "image_count": image_count, + "runs": len(successful), + "avg_elapsed_seconds": round(avg_time, 2), + "avg_object_count": round(avg_objects), + "avg_images_per_second": round(avg_imgs_per_sec, 3), + "avg_objects_per_second": round(avg_objs_per_sec, 1), + "max_peak_rss_mb": round(max_rss, 1), + "object_count": successful[-1]["object_count"], + "object_names": successful[-1].get("object_names", []), + "date": datetime.datetime.now().isoformat(), + } + + print(f" Branch: {branch}") + print(f" Workers: {worker_label}") + print(f" Avg time: {avg_time:.2f}s ({len(successful)} runs)") + print(f" Avg objects: {int(avg_objects)}") + print(f" Throughput: {avg_imgs_per_sec:.3f} images/s") + print(f" Throughput: {avg_objs_per_sec:.1f} objects/s") + print(f" Max peak RSS: {max_rss:.1f} MB") + + # Validation + if args.validate: + print(f"\n--- Validation against {args.validate} ---") + issues = validate_against_reference(summary, args.validate) + if issues: + print(" VALIDATION FAILED:") + for issue in issues: + print(f" - {issue}") + else: + print(" VALIDATION PASSED: object counts and names match") + + # Save output + output_path = args.output + if output_path is None: + safe_branch = branch.replace("/", "_") + worker_str = f"_w{args.workers}" if args.workers else "" + output_path = f"benchmark_{safe_branch}{worker_str}.json" + + with open(output_path, "w") as f: + json.dump(summary, f, indent=2) + print(f"\n Results saved to: {output_path}") + print("=" * 60) + + +if __name__ == "__main__": + main() From dd10f98d79dd275d55e5392eb308ced845a08ae5 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 25 Mar 2026 21:29:45 -0700 Subject: [PATCH 03/11] fix: benchmark uses isolated temp directory, never touches real data --- segmenter/benchmark.py | 65 +++++++++++------------------------------- 1 file changed, 17 insertions(+), 48 deletions(-) diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py index 9569893c3..375241067 100644 --- a/segmenter/benchmark.py +++ b/segmenter/benchmark.py @@ -225,12 +225,6 @@ def main(): default=None, help="Save result JSON to this path (for use as --validate reference)", ) - parser.add_argument( - "--data-path", - type=str, - default=None, - help="Root data directory (default: inferred as 3 levels up from acquisition_path)", - ) args = parser.parse_args() acq_path = os.path.abspath(args.acquisition_path) @@ -243,22 +237,11 @@ def main(): print(f"ERROR: No images found in {acq_path}") sys.exit(1) - # Infer data_path: acquisition is at data/img/DATE/SAMPLE/ACQ - # so data_path is 4 levels up (img/ is one level) - if args.data_path: - data_path = os.path.abspath(args.data_path) - else: - # Try to find the img/ ancestor - parts = acq_path.split(os.sep) - try: - img_idx = len(parts) - 1 - parts[::-1].index("img") - data_path = os.sep.join(parts[:img_idx]) - except ValueError: - print( - "WARNING: Could not infer data_path from acquisition_path. " - "Using /tmp/benchmark_data" - ) - data_path = "/tmp/benchmark_data" + # SAFETY: Always use an isolated temp directory as the data root. + # The segmenter writes to data_path/objects/, data_path/export/, data_path/clean/. + # We must NEVER point data_path at the real /home/pi/data/ to avoid destroying + # previously segmented results. + data_path = os.path.join("/tmp", "benchmark_data") # Detect branch try: @@ -288,32 +271,20 @@ def main(): for run_num in range(1, args.runs + 1): print(f"\n--- Run {run_num}/{args.runs} ---") - # Prepare clean working copy - work_dir = os.path.join("/tmp", "benchmark_acq") + # Clean the entire temp data root between runs for isolation + if os.path.exists(data_path): + shutil.rmtree(data_path) + + # Prepare clean working copy of acquisition images + # Build the img/ subdirectory structure the segmenter expects + # e.g., /tmp/benchmark_data/img/DATE/SAMPLE/ACQ/ + img_root = os.path.join(data_path, "img") + work_dir = os.path.join(img_root, "benchmark_date", "benchmark_sample", "benchmark_acq") prepare_test_folder(acq_path, work_dir) - # Also clean output directories that the segmenter will write to + # Create the output directories the segmenter needs for subdir in ["objects", "export", "clean"]: - out = os.path.join(data_path, subdir) - if os.path.exists(out): - shutil.rmtree(out) - os.makedirs(out, exist_ok=True) - - # Ensure img path structure exists for the segmenter - img_root = os.path.join(data_path, "img") - os.makedirs(img_root, exist_ok=True) - - # Create symlink so segmenter can find the working copy under img/ - rel_path = os.path.relpath(acq_path, os.path.join(data_path, "img")) - link_dir = os.path.join(img_root, os.path.dirname(rel_path)) - os.makedirs(link_dir, exist_ok=True) - link_path = os.path.join(img_root, rel_path) - if os.path.exists(link_path): - if os.path.islink(link_path): - os.unlink(link_path) - else: - shutil.rmtree(link_path) - os.symlink(work_dir, link_path) + os.makedirs(os.path.join(data_path, subdir), exist_ok=True) try: result = run_segmentation(data_path, work_dir, args.workers) @@ -335,9 +306,7 @@ def main(): traceback.print_exc() all_results.append({"error": str(e)}) finally: - # Cleanup symlink - if os.path.islink(link_path): - os.unlink(link_path) + pass # Summary successful = [r for r in all_results if "error" not in r] From 3594ff22145868b04a7892fa92934b73cca2a656 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 25 Mar 2026 21:36:26 -0700 Subject: [PATCH 04/11] fix: benchmark initializes process_uuid before calling segment_path --- segmenter/benchmark.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py index 375241067..e98378edb 100644 --- a/segmenter/benchmark.py +++ b/segmenter/benchmark.py @@ -127,6 +127,12 @@ def __init__(self): seg.segmenter_client = MockSegmenterClient() + # Set up state that segment_list() normally initializes before calling segment_path() + from uuid import uuid4 + seg._SegmenterProcess__process_uuid = str(uuid4()) + if seg._SegmenterProcess__process_id == "": + seg._SegmenterProcess__process_id = seg._SegmenterProcess__process_uuid + # Measure rss_before = get_peak_rss_mb() start_time = time.monotonic() From 711d41624bec0b357117f789712e4df59467d6b8 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 25 Mar 2026 21:45:58 -0700 Subject: [PATCH 05/11] fix: count objects before ecotaxa_export pops them --- segmenter/benchmark.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py index e98378edb..6066fb842 100644 --- a/segmenter/benchmark.py +++ b/segmenter/benchmark.py @@ -137,27 +137,37 @@ def __init__(self): rss_before = get_peak_rss_mb() start_time = time.monotonic() - seg.segment_path(acq_path, ecotaxa_export=True) + # Run without ecotaxa export first so we can count objects before pop() + seg.segment_path(acq_path, ecotaxa_export=False) elapsed = time.monotonic() - start_time rss_after = get_peak_rss_mb() - # Count results + # Count results (must read before ecotaxa_export pops "objects") objects = seg._SegmenterProcess__global_metadata.get("objects", []) object_count = len(objects) + object_names = sorted([obj["name"] for obj in objects]) - # Count output files + # Now run ecotaxa export separately obj_path = seg._SegmenterProcess__working_obj_path + archive_path = seg._SegmenterProcess__archive_fn + import planktoscope.segmenter.ecotaxa + archive_dir = os.path.dirname(archive_path) + os.makedirs(archive_dir, exist_ok=True) + planktoscope.segmenter.ecotaxa.ecotaxa_export( + archive_path, + seg._SegmenterProcess__global_metadata, + obj_path, + keep_files=True, + ) + + # Count output files obj_images = len([f for f in os.listdir(obj_path) if f.endswith(".jpg")]) if os.path.exists(obj_path) else 0 # Check for ecotaxa archive - archive_path = seg._SegmenterProcess__archive_fn archive_exists = os.path.exists(archive_path) archive_size_mb = os.path.getsize(archive_path) / (1024 * 1024) if archive_exists else 0 - # Extract object names for validation - object_names = sorted([obj["name"] for obj in objects]) - return { "elapsed_seconds": round(elapsed, 2), "object_count": object_count, From 440aff70ce8966453d367189f1d361b5bb33e131 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 22 Apr 2026 11:48:11 -0700 Subject: [PATCH 06/11] Add hard-stop in init --- segmenter/planktoscope/segmenter/__init__.py | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 858230691..c08b06fb1 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -59,6 +59,11 @@ ################################################################################ # Main Segmenter class ################################################################################ +class SegmentationInterrupted(Exception): + """Raised when a user requests stop during an active segmentation pipeline.""" + pass + + class SegmenterProcess(multiprocessing.Process): """This class contains the main definitions for the segmenter of the PlanktoScope""" @@ -575,6 +580,20 @@ def __augment_slice(dim_slice, max_dims, size=10): ) return (object_number, len(regionprops), objects_list) + def _check_for_stop(self): + """Check if a stop request arrived via MQTT during the pipeline. + + Returns True if stop was requested, False otherwise. + Consumes the MQTT message if it was a stop request. + """ + if self.segmenter_client.new_message_received(): + peek = self.segmenter_client.msg + if peek and peek.get("payload", {}).get("action") == "stop": + logger.info("Stop requested during active segmentation") + self.segmenter_client.read_message() + return True + return False + def _pipe(self, ecotaxa_export): logger.info("Finding images") images_list = self._find_files(self.__working_path, ("JPG", "jpg", "JPEG", "jpeg")) @@ -629,6 +648,8 @@ def _pipe(self, ecotaxa_export): self._pipe_parallel( images_list, images_count, shm.name, metadata_dir ) + except SegmentationInterrupted: + raise # Do not fall back to sequential on user stop except Exception as e: logger.error( f"Parallel segmentation failed, falling back to sequential: {e}" @@ -735,6 +756,10 @@ async def _run_parallel(): try: result = await future completed += 1 + # Check for stop request between image completions + if self._check_for_stop(): + executor.shutdown(wait=True, cancel_futures=True) + raise SegmentationInterrupted("User requested stop") if "error" in result: errors.append(result) logger.error( @@ -788,6 +813,9 @@ def _pipe_sequential(self, images_list, images_count, metadata_dir): average_time = 0 for i, filename in enumerate(images_list): + # Check for stop request between images + if self._check_for_stop(): + raise SegmentationInterrupted("User requested stop") name = os.path.splitext(filename)[0] # Publish the object_id to via MQTT to Node-RED @@ -916,6 +944,10 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True): # forcing, let's gooooo try: self.segment_path(path, ecotaxa_export) + except SegmentationInterrupted: + logger.info(f"User stopped segmentation at {path}") + exception = SegmentationInterrupted("User stopped") + break except Exception as e: logger.error(f"There was an error while segmenting {path}") exception = e @@ -924,6 +956,9 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True): if exception is None: # Publish the status "Done" to via MQTT to Node-RED self.segmenter_client.client.publish("status/segmenter", '{"status":"Done"}') + elif isinstance(exception, SegmentationInterrupted): + logger.info("Publishing Interrupted status after user stop") + self.segmenter_client.client.publish("status/segmenter", '{"status":"Interrupted"}') else: self.segmenter_client.client.publish( "status/segmenter", @@ -1031,6 +1066,9 @@ def segment_path(self, path, ecotaxa_export): try: self._pipe(ecotaxa_export) + except SegmentationInterrupted: + logger.info(f"Pipeline interrupted by user for {path}, not marking as done") + raise except Exception as e: logger.exception(f"There was an error in the pipeline {e}") raise e From 28594f46456db38f8b29669101607986e104309e Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 22 Apr 2026 13:32:14 -0700 Subject: [PATCH 07/11] Fixed __init__.py bug for multiple button presses --- segmenter/planktoscope/segmenter/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index c08b06fb1..62ad70b46 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -782,6 +782,8 @@ async def _run_parallel(): f"{result['object_count']} objects in " f"{result['duration']:.1f}s" ) + except SegmentationInterrupted: + raise except Exception as e: completed += 1 errors.append({"image_name": filename, "error": str(e)}) From 6e048acc4c0ad31d96fe5b79da5bd9a933e98650 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 22 Apr 2026 14:48:17 -0700 Subject: [PATCH 08/11] fixed bug in db.js --- lib/db.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/db.js b/lib/db.js index 2a08f26a2..11dde14b0 100644 --- a/lib/db.js +++ b/lib/db.js @@ -88,6 +88,7 @@ async function getAcquisitionFromPath(path) { gallery: getGalleryPath(path), interupted, date: metadata.acq_local_datetime, + process_pixel: metadata.process_pixel ?? null, } return acquisition From 52550050a5aad799da26349a03dad9538d8a08d2 Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 22 Apr 2026 14:55:41 -0700 Subject: [PATCH 09/11] Drain stale messages + reset _interrupt --- segmenter/planktoscope/segmenter/__init__.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 62ad70b46..45f383cae 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -109,6 +109,7 @@ def __init__(self, event, data_path): # https://planktoscope.slack.com/archives/C01V5ENKG0M/p1714146253356569 self.__remove_previous_mask = False self.__worker_count = 3 # default for RPi 5 (4 cores, leave 1 for system) + self._interrupt_requested = False # create all base path for path in [ @@ -583,14 +584,16 @@ def __augment_slice(dim_slice, max_dims, size=10): def _check_for_stop(self): """Check if a stop request arrived via MQTT during the pipeline. - Returns True if stop was requested, False otherwise. - Consumes the MQTT message if it was a stop request. + Idempotent — once True, stays True until segment_list() resets it. """ + if self._interrupt_requested: + return True if self.segmenter_client.new_message_received(): peek = self.segmenter_client.msg if peek and peek.get("payload", {}).get("action") == "stop": logger.info("Stop requested during active segmentation") self.segmenter_client.read_message() + self._interrupt_requested = True return True return False @@ -925,6 +928,12 @@ def segment_list(self, path_list: list, force=False, ecotaxa_export=True): logger.info(f"The pipeline will be run in {len(path_list)} directories") logger.debug(f"Those are {path_list}") + # Drain any stop/garbage messages buffered between runs, then reset the + # interrupt flag so a stale click from a previous run can't kill this one. + while self.segmenter_client.new_message_received(): + self.segmenter_client.read_message() + self._interrupt_requested = False + self.__process_uuid = str(uuid4()) if self.__process_id == "": From 29c28c2db8c27bcc88ba9b55c11e0ecdb45dbeff Mon Sep 17 00:00:00 2001 From: Adam Date: Wed, 22 Apr 2026 15:03:24 -0700 Subject: [PATCH 10/11] Fixed regex ro enable per image indexing for progress bars --- segmenter/planktoscope/segmenter/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 45f383cae..6a8b9d688 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -773,8 +773,8 @@ async def _run_parallel(): "status/segmenter", json.dumps( { - "status": f"Segmented image {filename}, " - f"{completed}/{images_count} complete, " + "status": f"Segmenting image {completed}/{images_count}: " + f"{filename}, " f"{result['object_count']} objects in " f"{result['duration']:.1f}s" } From ab0a721d22ba692231b10f0789e0a9d26b8680cf Mon Sep 17 00:00:00 2001 From: Adam Date: Thu, 23 Apr 2026 10:05:58 -0700 Subject: [PATCH 11/11] apply ruff format to satisfy CI fmt-check --- segmenter/benchmark.py | 51 ++++++++++++-------- segmenter/planktoscope/segmenter/__init__.py | 41 +++++----------- segmenter/planktoscope/segmenter/worker.py | 22 +++------ 3 files changed, 50 insertions(+), 64 deletions(-) diff --git a/segmenter/benchmark.py b/segmenter/benchmark.py index 6066fb842..c22c8ee27 100644 --- a/segmenter/benchmark.py +++ b/segmenter/benchmark.py @@ -71,8 +71,7 @@ def prepare_test_folder(source_path, work_dir): for f in os.listdir(source_path): src = os.path.join(source_path, f) if os.path.isfile(src) and ( - f.endswith((".jpg", ".JPG", ".jpeg", ".JPEG")) - or f == "metadata.json" + f.endswith((".jpg", ".JPG", ".jpeg", ".JPEG")) or f == "metadata.json" ): shutil.copy2(src, os.path.join(work_dir, f)) @@ -86,10 +85,7 @@ def prepare_test_folder(source_path, work_dir): def count_images(path): """Count JPG images in a directory.""" - return len([ - f for f in os.listdir(path) - if f.lower().endswith((".jpg", ".jpeg")) - ]) + return len([f for f in os.listdir(path) if f.lower().endswith((".jpg", ".jpeg"))]) def run_segmentation(data_path, acq_path, worker_count=None): @@ -129,6 +125,7 @@ def __init__(self): # Set up state that segment_list() normally initializes before calling segment_path() from uuid import uuid4 + seg._SegmenterProcess__process_uuid = str(uuid4()) if seg._SegmenterProcess__process_id == "": seg._SegmenterProcess__process_id = seg._SegmenterProcess__process_uuid @@ -152,6 +149,7 @@ def __init__(self): obj_path = seg._SegmenterProcess__working_obj_path archive_path = seg._SegmenterProcess__archive_fn import planktoscope.segmenter.ecotaxa + archive_dir = os.path.dirname(archive_path) os.makedirs(archive_dir, exist_ok=True) planktoscope.segmenter.ecotaxa.ecotaxa_export( @@ -162,7 +160,11 @@ def __init__(self): ) # Count output files - obj_images = len([f for f in os.listdir(obj_path) if f.endswith(".jpg")]) if os.path.exists(obj_path) else 0 + obj_images = ( + len([f for f in os.listdir(obj_path) if f.endswith(".jpg")]) + if os.path.exists(obj_path) + else 0 + ) # Check for ecotaxa archive archive_exists = os.path.exists(archive_path) @@ -190,8 +192,7 @@ def validate_against_reference(result, ref_path): # Compare object counts if result["object_count"] != ref["object_count"]: issues.append( - f"Object count mismatch: got {result['object_count']}, " - f"expected {ref['object_count']}" + f"Object count mismatch: got {result['object_count']}, expected {ref['object_count']}" ) # Compare object names (deterministic order check) @@ -201,7 +202,9 @@ def validate_against_reference(result, ref_path): missing = set(ref_names) - set(result_names) extra = set(result_names) - set(ref_names) if missing: - issues.append(f"Missing objects: {sorted(missing)[:10]}{'...' if len(missing) > 10 else ''}") + issues.append( + f"Missing objects: {sorted(missing)[:10]}{'...' if len(missing) > 10 else ''}" + ) if extra: issues.append(f"Extra objects: {sorted(extra)[:10]}{'...' if len(extra) > 10 else ''}") @@ -209,9 +212,7 @@ def validate_against_reference(result, ref_path): def main(): - parser = argparse.ArgumentParser( - description="Benchmark PlanktoScope segmenter performance" - ) + parser = argparse.ArgumentParser(description="Benchmark PlanktoScope segmenter performance") parser.add_argument( "acquisition_path", help="Path to acquisition folder (must contain metadata.json + images)", @@ -262,10 +263,15 @@ def main(): # Detect branch try: import subprocess - branch = subprocess.check_output( - ["git", "rev-parse", "--abbrev-ref", "HEAD"], - stderr=subprocess.DEVNULL, - ).decode().strip() + + branch = ( + subprocess.check_output( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + stderr=subprocess.DEVNULL, + ) + .decode() + .strip() + ) except Exception: branch = "unknown" @@ -313,12 +319,19 @@ def main(): if result["archive_exists"]: print(f" Archive size: {result['archive_size_mb']} MB") print(f" Peak RSS: {result['peak_rss_mb']} MB") - imgs_per_sec = image_count / result["elapsed_seconds"] if result["elapsed_seconds"] > 0 else 0 - objs_per_sec = result["object_count"] / result["elapsed_seconds"] if result["elapsed_seconds"] > 0 else 0 + imgs_per_sec = ( + image_count / result["elapsed_seconds"] if result["elapsed_seconds"] > 0 else 0 + ) + objs_per_sec = ( + result["object_count"] / result["elapsed_seconds"] + if result["elapsed_seconds"] > 0 + else 0 + ) print(f" Throughput: {imgs_per_sec:.2f} images/s, {objs_per_sec:.1f} objects/s") except Exception as e: print(f" ERROR: {e}") import traceback + traceback.print_exc() all_results.append({"error": str(e)}) finally: diff --git a/segmenter/planktoscope/segmenter/__init__.py b/segmenter/planktoscope/segmenter/__init__.py index 78ba97cfe..af2321961 100644 --- a/segmenter/planktoscope/segmenter/__init__.py +++ b/segmenter/planktoscope/segmenter/__init__.py @@ -61,6 +61,7 @@ ################################################################################ class SegmentationInterrupted(Exception): """Raised when a user requests stop during an active segmentation pipeline.""" + pass @@ -612,9 +613,7 @@ def _pipe(self, ecotaxa_export): logger.debug(f"We found {images_count} images, good luck!") # Calculate initial flat field - self.segmenter_client.client.publish( - "status/segmenter", '{"status":"Calculating flat"}' - ) + self.segmenter_client.client.publish("status/segmenter", '{"status":"Calculating flat"}') if images_count < 10: self._calculate_flat(images_list[0:images_count], images_count, self.__working_path) else: @@ -638,31 +637,22 @@ def _pipe(self, ecotaxa_export): if use_parallel: # Create shared memory for the flat field array import multiprocessing.shared_memory + flat_bytes = self.__flat.nbytes - shm = multiprocessing.shared_memory.SharedMemory( - create=True, size=flat_bytes - ) - flat_shared = np.ndarray( - self.__flat.shape, dtype=self.__flat.dtype, buffer=shm.buf - ) + shm = multiprocessing.shared_memory.SharedMemory(create=True, size=flat_bytes) + flat_shared = np.ndarray(self.__flat.shape, dtype=self.__flat.dtype, buffer=shm.buf) flat_shared[:] = self.__flat[:] try: - self._pipe_parallel( - images_list, images_count, shm.name, metadata_dir - ) + self._pipe_parallel(images_list, images_count, shm.name, metadata_dir) except SegmentationInterrupted: raise # Do not fall back to sequential on user stop except Exception as e: - logger.error( - f"Parallel segmentation failed, falling back to sequential: {e}" - ) + logger.error(f"Parallel segmentation failed, falling back to sequential: {e}") self._pipe_sequential(images_list, images_count, metadata_dir) else: if self.__remove_previous_mask and self.__worker_count > 1: - logger.info( - "remove_previous_mask is enabled — using sequential processing" - ) + logger.info("remove_previous_mask is enabled — using sequential processing") self._pipe_sequential(images_list, images_count, metadata_dir) # Assemble all objects from .jsonl files in image order @@ -680,6 +670,7 @@ def _pipe(self, ecotaxa_export): shm.unlink() # Cleanup temp metadata dir import shutil + shutil.rmtree(metadata_dir, ignore_errors=True) if ecotaxa_export: @@ -726,11 +717,7 @@ async def _run_parallel(): # Build a serializable copy of metadata for workers # (excludes non-serializable items, keeps process_pixel etc.) - worker_metadata = { - k: v - for k, v in self.__global_metadata.items() - if k != "objects" - } + worker_metadata = {k: v for k, v in self.__global_metadata.items() if k != "objects"} futures = [] for i, filename in enumerate(images_list): @@ -765,9 +752,7 @@ async def _run_parallel(): raise SegmentationInterrupted("User requested stop") if "error" in result: errors.append(result) - logger.error( - f"Worker error for {result['image_name']}: {result['error']}" - ) + logger.error(f"Worker error for {result['image_name']}: {result['error']}") else: self.segmenter_client.client.publish( "status/segmenter", @@ -870,9 +855,7 @@ def _pipe_sequential(self, images_list, images_count, metadata_dir): objects_count, _, objects_list = self._slice_image(img, name, mask, total_objects) # Stream objects to disk incrementally - planktoscope.segmenter.streamer.write_image_objects( - metadata_dir, name, objects_list - ) + planktoscope.segmenter.streamer.write_image_objects(metadata_dir, name, objects_list) total_objects += objects_count # Simple heuristic to detect a movement of the flow cell and a change in the resulting flat diff --git a/segmenter/planktoscope/segmenter/worker.py b/segmenter/planktoscope/segmenter/worker.py index a87739545..a345cc164 100644 --- a/segmenter/planktoscope/segmenter/worker.py +++ b/segmenter/planktoscope/segmenter/worker.py @@ -75,9 +75,7 @@ def process_single_image( raise FileNotFoundError(f"Could not read image: {image_filepath}") image = image / _flat_array image[0][0] = [0, 0, 0] - image = skimage.exposure.rescale_intensity( - image, in_range=(0, 1.04), out_range="uint8" - ) + image = skimage.exposure.rescale_intensity(image, in_range=(0, 1.04), out_range="uint8") if save_debug_img: _save_image(image, os.path.join(working_debug_path, "cleaned_image.jpg")) @@ -98,14 +96,11 @@ def process_single_image( ) # 4. Write objects metadata to disk incrementally - planktoscope.segmenter.streamer.write_image_objects( - metadata_dir, image_name, objects - ) + planktoscope.segmenter.streamer.write_image_objects(metadata_dir, image_name, objects) duration = time.monotonic() - start logger.info( - f"Worker {os.getpid()}: {image_name} done — " - f"{object_count} objects in {duration:.1f}s" + f"Worker {os.getpid()}: {image_name} done — {object_count} objects in {duration:.1f}s" ) return { "image_name": image_name, @@ -155,9 +150,7 @@ def _create_mask(img, debug_path: str, save_debug: bool): for i, (name, func) in enumerate(pipeline): mask = func(mask) if save_debug and debug_path: - PIL.Image.fromarray(mask).save( - os.path.join(debug_path, f"mask_{i}_{name}.jpg") - ) + PIL.Image.fromarray(mask).save(os.path.join(debug_path, f"mask_{i}_{name}.jpg")) logger.success("Mask created") return mask @@ -313,8 +306,7 @@ def _slice_image( f"No valid process_pixel calibration — using min ESD of {min_esd_pixels} as pixels" ) logger.debug( - f"Min ESD filter: {min_ESD} µm = {min_esd_pixels:.1f} px " - f"(process_pixel={pixel_size})" + f"Min ESD filter: {min_ESD} µm = {min_esd_pixels:.1f} px (process_pixel={pixel_size})" ) filtered = [r for r in regionprops if r.equivalent_diameter_area >= min_esd_pixels] @@ -336,9 +328,7 @@ def _slice_image( metadata = _extract_metadata_from_regionprop(region, pixel_size_um=pixel_size_um) # Blur metric - metadata["blur_laplacian"] = planktoscope.segmenter.operations.calculate_blur( - obj_image - ) + metadata["blur_laplacian"] = planktoscope.segmenter.operations.calculate_blur(obj_image) # Threshold value (each worker has its own process-local global) threshold_value = planktoscope.segmenter.operations.get_last_threshold_value()