diff --git a/README.md b/README.md index 7f8b7b79..3faa83b3 100644 --- a/README.md +++ b/README.md @@ -88,13 +88,36 @@ $ fpie-gui -s test3_src.jpg -t test3_tgt.jpg -o result.jpg -b cuda -n 10000 We provide a simple GUI for real-time seamless cloning. You need to use your mouse to draw a rectangle on top of the source image, and click a point in target image. After that the result will automatically be generated. In the end, you can press ESC to terminate the program. +### Video and streams + +```bash +$ fpie-video -s src.png -m mask.png -t input.mp4 -o output.mp4 -h1 100 -w1 100 -n 5000 -g max +``` + +`fpie-video` applies the same Poisson blending to each target frame. The target can be a video file, stream URL, or camera index supported by OpenCV/FFmpeg. If the source is a video, frames are consumed one by one; add `--loop-source` to repeat it when the target is longer. + +The same interface is available from Python: + +```python +from fpie.video import BlendOptions, blend_video + +blend_video( + "src.png", + "input.mp4", + "output.mp4", + mask="mask.png", + mask_on_tgt=(100, 100), + options=BlendOptions(backend="numpy", iterations=5000), +) +``` + ### Backend and Solver We have provided 7 backends. Each backend has two solvers: EquSolver and GridSolver. You can find the difference between these two solvers in the next section. For different backend usage, please check out the related documentation [here](https://fpie.readthedocs.io/en/main/backend.html). -For other usage, please run `fpie -h` or `fpie-gui -h` to see the hint. +For other usage, please run `fpie -h`, `fpie-gui -h`, or `fpie-video -h` to see the hint. ## Benchmark Result diff --git a/fpie/__init__.py b/fpie/__init__.py index a556fdd5..8f50a8e2 100644 --- a/fpie/__init__.py +++ b/fpie/__init__.py @@ -1,3 +1,3 @@ """Fast Poisson Image Editing package.""" -__version__ = "0.3.2" +__version__ = "0.3.3" diff --git a/fpie/process.py b/fpie/process.py index bb8ca22b..01d054b1 100644 --- a/fpie/process.py +++ b/fpie/process.py @@ -1,5 +1,6 @@ """Processor abstractions and backend selection for PIE solvers.""" +import atexit import os from abc import ABC, abstractmethod from typing import Any @@ -30,6 +31,8 @@ def _default_cpu_count() -> int: DEFAULT_BACKEND = "numpy" ALL_BACKEND = ["numpy"] MPI: Any | None = None +_MPI_INITIALIZED_BY_FPIE = False +_MPI_FINALIZER_REGISTERED = False try: from fpie import numba_solver @@ -64,6 +67,9 @@ def _default_cpu_count() -> int: core_openmp = None try: + import mpi4py + + mpi4py.rc.initialize = False from mpi4py import MPI as _MPI from fpie import core_mpi # type: ignore @@ -143,6 +149,28 @@ def step(self, iteration: int) -> tuple[np.ndarray, np.ndarray] | None: pass +def _finalize_mpi() -> None: + """Finalize MPI when this module initialized it lazily.""" + assert MPI is not None + if MPI.Is_initialized() and not MPI.Is_finalized(): + MPI.Finalize() + + +def _ensure_mpi_initialized() -> Any: + """Initialize MPI lazily when the MPI backend is explicitly selected.""" + global _MPI_FINALIZER_REGISTERED # noqa: PLW0603 + global _MPI_INITIALIZED_BY_FPIE # noqa: PLW0603 + + assert MPI is not None + if not MPI.Is_initialized(): + MPI.Init_thread() + _MPI_INITIALIZED_BY_FPIE = True + if _MPI_INITIALIZED_BY_FPIE and not _MPI_FINALIZER_REGISTERED: + atexit.register(_finalize_mpi) + _MPI_FINALIZER_REGISTERED = True + return MPI + + class EquProcessor(BaseProcessor): """PIE Jacobi equation processor.""" @@ -167,9 +195,9 @@ def __init__( elif backend == "openmp" and core_openmp is not None: core = core_openmp.EquSolver(n_cpu) elif backend == "mpi" and core_mpi is not None: - assert MPI is not None + mpi = _ensure_mpi_initialized() core = core_mpi.EquSolver(min_interval) - rank = MPI.COMM_WORLD.Get_rank() + rank = mpi.COMM_WORLD.Get_rank() elif backend == "cuda" and core_cuda is not None: core = core_cuda.EquSolver(block_size) elif backend.startswith("taichi") and taichi_solver is not None: @@ -306,9 +334,9 @@ def __init__( elif backend == "openmp" and core_openmp is not None: core = core_openmp.GridSolver(grid_x, grid_y, n_cpu) elif backend == "mpi" and core_mpi is not None: - assert MPI is not None + mpi = _ensure_mpi_initialized() core = core_mpi.GridSolver(min_interval) - rank = MPI.COMM_WORLD.Get_rank() + rank = mpi.COMM_WORLD.Get_rank() elif backend == "cuda" and core_cuda is not None: core = core_cuda.GridSolver(grid_x, grid_y) elif backend.startswith("taichi") and taichi_solver is not None: diff --git a/fpie/video.py b/fpie/video.py new file mode 100644 index 00000000..bd870005 --- /dev/null +++ b/fpie/video.py @@ -0,0 +1,306 @@ +"""Video and stream processing helpers for Poisson image editing.""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + +import cv2 +import numpy as np + +from fpie.io import read_image +from fpie.process import ( + CPU_COUNT, + DEFAULT_BACKEND, + BaseProcessor, + EquProcessor, + GridProcessor, +) + +DEFAULT_FPS = 30.0 + + +@dataclass(frozen=True) +class BlendOptions: + """Configuration shared by image, video, and stream blending.""" + + method: str = "equ" + gradient: str = "max" + backend: str = DEFAULT_BACKEND + iterations: int = 5000 + n_cpu: int = CPU_COUNT + mpi_sync_interval: int = 100 + block_size: int = 1024 + grid_x: int = 8 + grid_y: int = 8 + + +@dataclass(frozen=True) +class VideoBlendResult: + """Summary returned after a video blend completes.""" + + output: str + frame_count: int + fps: float + size: tuple[int, int] + + +class _FrameSource: + def __init__(self, source: str | int, *, loop: bool = False): + self.source = source + self.loop = loop + self.image = None + self.capture = None + + if isinstance(source, str): + self.image = cv2.imread(source) + if self.image is not None: + if self.image.ndim == 2: + self.image = np.stack( + [self.image, self.image, self.image], axis=-1 + ) + elif self.image.ndim == 3 and self.image.shape[-1] == 4: + self.image = self.image[..., :-1] + return + + self.capture = cv2.VideoCapture(_coerce_capture_source(source)) + if not self.capture.isOpened(): + raise FileNotFoundError(f"Failed to open video source: {source}") + + def read(self) -> np.ndarray | None: + if self.image is not None: + return self.image + + assert self.capture is not None + ok, frame = self.capture.read() + if ok: + return frame + if not self.loop: + return None + + self.capture.set(cv2.CAP_PROP_POS_FRAMES, 0) + ok, frame = self.capture.read() + if ok: + return frame + return None + + def release(self) -> None: + if self.capture is not None: + self.capture.release() + + +def create_processor(options: BlendOptions) -> BaseProcessor: + """Create a processor for reusable frame-by-frame blending.""" + if options.backend == "mpi": + raise ValueError( + "Video and stream processing do not support the MPI backend." + ) + + if options.method == "equ": + return EquProcessor( + options.gradient, + options.backend, + options.n_cpu, + options.mpi_sync_interval, + options.block_size, + ) + if options.method == "grid": + return GridProcessor( + options.gradient, + options.backend, + options.n_cpu, + options.mpi_sync_interval, + options.block_size, + options.grid_x, + options.grid_y, + ) + raise ValueError(f"Invalid method: {options.method}") + + +def blend_frame( + src: np.ndarray, + mask: np.ndarray, + tgt: np.ndarray, + *, + mask_on_src: tuple[int, int] = (0, 0), + mask_on_tgt: tuple[int, int] = (0, 0), + options: BlendOptions | None = None, + processor: BaseProcessor | None = None, +) -> np.ndarray: + """Blend one source frame into one target frame.""" + options = options or BlendOptions() + proc = processor or create_processor(options) + proc.reset(src, mask, tgt, mask_on_src, mask_on_tgt) + if options.iterations <= 0: + return tgt.copy() + + result = proc.step(options.iterations) + if result is None: + raise RuntimeError( + "The selected processor did not return a root result." + ) + frame, _err = result + return frame + + +def blend_frames( + src_frames: Iterable[np.ndarray], + mask: np.ndarray, + tgt_frames: Iterable[np.ndarray], + *, + mask_on_src: tuple[int, int] = (0, 0), + mask_on_tgt: tuple[int, int] = (0, 0), + options: BlendOptions | None = None, +) -> Iterable[np.ndarray]: + """Yield blended frames from source and target frame iterables.""" + options = options or BlendOptions() + processor = create_processor(options) + for src, tgt in zip(src_frames, tgt_frames, strict=False): + yield blend_frame( + src, + mask, + tgt, + mask_on_src=mask_on_src, + mask_on_tgt=mask_on_tgt, + options=options, + processor=processor, + ) + + +def blend_video( + source: str | int, + target: str | int, + output: str, + *, + mask: str | np.ndarray | None = None, + mask_on_src: tuple[int, int] = (0, 0), + mask_on_tgt: tuple[int, int] = (0, 0), + options: BlendOptions | None = None, + fps: float | None = None, + fourcc: str | None = None, + max_frames: int | None = None, + loop_source: bool = False, +) -> VideoBlendResult: + """Blend a source image/video into a target video or realtime stream.""" + options = options or BlendOptions() + processor = create_processor(options) + source_frames = _FrameSource(source, loop=loop_source) + target_capture = cv2.VideoCapture(_coerce_capture_source(target)) + if not target_capture.isOpened(): + source_frames.release() + raise FileNotFoundError(f"Failed to open target video source: {target}") + + output_path = Path(output) + if output_path.parent: + output_path.parent.mkdir(parents=True, exist_ok=True) + + target_fps = _resolve_fps(fps, target_capture.get(cv2.CAP_PROP_FPS)) + width = int(target_capture.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(target_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) + first_frame: np.ndarray | None = None + if width <= 0 or height <= 0: + ok, first_frame = target_capture.read() + if not ok: + source_frames.release() + target_capture.release() + raise RuntimeError( + f"No frames available from target source: {target}" + ) + height, width = first_frame.shape[:2] + + writer = cv2.VideoWriter( + output, + cv2.VideoWriter_fourcc(*_pick_fourcc(output, fourcc)), + target_fps, + (width, height), + ) + if not writer.isOpened(): + source_frames.release() + target_capture.release() + raise RuntimeError(f"Failed to open video writer: {output}") + + mask_image = _load_mask(mask) + frame_count = 0 + try: + while max_frames is None or frame_count < max_frames: + if first_frame is not None: + target_frame = first_frame + first_frame = None + else: + ok, target_frame = target_capture.read() + if not ok: + break + source_frame = source_frames.read() + if source_frame is None: + break + + frame_mask = ( + mask_image + if mask_image is not None + else np.zeros(source_frame.shape[:2], dtype=np.uint8) + 255 + ) + blended = blend_frame( + source_frame, + frame_mask, + target_frame, + mask_on_src=mask_on_src, + mask_on_tgt=mask_on_tgt, + options=options, + processor=processor, + ) + writer.write(blended) + frame_count += 1 + finally: + writer.release() + target_capture.release() + source_frames.release() + + return VideoBlendResult( + output=output, + frame_count=frame_count, + fps=float(target_fps), + size=(width, height), + ) + + +def _load_mask(mask: str | np.ndarray | None) -> np.ndarray | None: + if mask is None: + return None + if isinstance(mask, np.ndarray): + return mask + if mask == "": + return None + return read_image(mask) + + +def _coerce_capture_source(source: str | int) -> str | int: + if isinstance(source, int): + return source + if source.isdecimal(): + return int(source) + return source + + +def _resolve_fps(override_fps: float | None, capture_fps: float) -> float: + for candidate in (override_fps, capture_fps, DEFAULT_FPS): + if candidate is None: + continue + try: + value = float(candidate) + except (TypeError, ValueError): + continue + if math.isfinite(value) and value > 0: + return value + return DEFAULT_FPS + + +def _pick_fourcc(output: str, fourcc: str | None) -> str: + if fourcc is not None: + if len(fourcc) != 4: + raise ValueError("fourcc must be exactly 4 characters.") + return fourcc + if Path(output).suffix.lower() == ".mp4": + return "mp4v" + return "MJPG" diff --git a/fpie/video_cli.py b/fpie/video_cli.py new file mode 100644 index 00000000..4e619053 --- /dev/null +++ b/fpie/video_cli.py @@ -0,0 +1,201 @@ +"""CLI entrypoint for video and stream Poisson image editing.""" + +import argparse + +import fpie +from fpie.process import ALL_BACKEND, CPU_COUNT, DEFAULT_BACKEND +from fpie.video import BlendOptions, blend_video + + +def get_args() -> argparse.Namespace: + """Parse video command-line arguments.""" + video_backends = [backend for backend in ALL_BACKEND if backend != "mpi"] + + parser = argparse.ArgumentParser() + parser.add_argument( + "-v", "--version", action="store_true", help="show the version and exit" + ) + parser.add_argument( + "--check-backend", + action="store_true", + help="print all available video backends", + ) + parser.add_argument( + "-b", + "--backend", + type=str, + choices=video_backends, + default=DEFAULT_BACKEND if DEFAULT_BACKEND != "mpi" else "numpy", + help="backend choice", + ) + parser.add_argument( + "-c", + "--cpu", + type=int, + default=CPU_COUNT, + help="number of CPU used", + ) + parser.add_argument( + "-z", + "--block-size", + type=int, + default=1024, + help="cuda block size (only for equ solver)", + ) + parser.add_argument( + "--method", + type=str, + choices=["equ", "grid"], + default="equ", + help="how to parallelize computation", + ) + parser.add_argument( + "-s", + "--source", + help="source image/video filename, stream URL, or camera index", + ) + parser.add_argument( + "-m", + "--mask", + default="", + help="mask image filename (default is to use the whole source frame)", + ) + parser.add_argument( + "-t", + "--target", + help="target video filename, stream URL, or camera index", + ) + parser.add_argument("-o", "--output", help="output video filename") + parser.add_argument( + "-h0", + type=int, + help="mask position (height) on source frame", + default=0, + ) + parser.add_argument( + "-w0", + type=int, + help="mask position (width) on source frame", + default=0, + ) + parser.add_argument( + "-h1", + type=int, + help="mask position (height) on target frame", + default=0, + ) + parser.add_argument( + "-w1", + type=int, + help="mask position (width) on target frame", + default=0, + ) + parser.add_argument( + "-g", + "--gradient", + type=str, + choices=["max", "src", "avg"], + default="max", + help="how to calculate gradient for PIE", + ) + parser.add_argument( + "-n", + type=int, + help="how many iterations to run per frame", + default=5000, + ) + parser.add_argument( + "--fps", + type=float, + default=None, + help="override output FPS (default uses target FPS or 30)", + ) + parser.add_argument( + "--fourcc", + type=str, + default=None, + help="four-character video codec, e.g. mp4v or MJPG", + ) + parser.add_argument( + "--max-frames", + type=int, + default=0, + help="stop after this many frames (0 means no limit)", + ) + parser.add_argument( + "--loop-source", + action="store_true", + help="loop source video if it ends before the target stream", + ) + parser.add_argument( + "--mpi-sync-interval", + type=int, + help="MPI sync iteration interval", + default=100, + ) + parser.add_argument( + "--grid-x", type=int, help="x axis stride for grid solver", default=8 + ) + parser.add_argument( + "--grid-y", type=int, help="y axis stride for grid solver", default=8 + ) + + args = parser.parse_args() + if args.version: + print(fpie.__version__) + raise SystemExit(0) + if args.check_backend: + print(video_backends) + raise SystemExit(0) + missing = [ + option + for option, value in ( + ("-s/--source", args.source), + ("-t/--target", args.target), + ("-o/--output", args.output), + ) + if not value + ] + if missing: + parser.error( + f"the following arguments are required: {', '.join(missing)}" + ) + return args + + +def main() -> None: + """Run the video command-line application.""" + args = get_args() + options = BlendOptions( + method=args.method, + gradient=args.gradient, + backend=args.backend, + iterations=args.n, + n_cpu=args.cpu, + mpi_sync_interval=args.mpi_sync_interval, + block_size=args.block_size, + grid_x=args.grid_x, + grid_y=args.grid_y, + ) + result = blend_video( + args.source, + args.target, + args.output, + mask=args.mask, + mask_on_src=(args.h0, args.w0), + mask_on_tgt=(args.h1, args.w1), + options=options, + fps=args.fps, + fourcc=args.fourcc, + max_frames=args.max_frames or None, + loop_source=args.loop_source, + ) + print( + f"Successfully wrote {result.frame_count} frames " + f"({result.size[0]}x{result.size[1]} @ {result.fps:.2f} FPS) " + f"to {result.output}" + ) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 304590b8..0c5f01ec 100644 --- a/setup.py +++ b/setup.py @@ -120,7 +120,11 @@ def get_description(): python_requires=">=3.10,<3.14", packages=find_packages(exclude=["tests", "tests.*"]), entry_points={ - "console_scripts": ["fpie=fpie.cli:main", "fpie-gui=fpie.gui:main"], + "console_scripts": [ + "fpie=fpie.cli:main", + "fpie-gui=fpie.gui:main", + "fpie-video=fpie.video_cli:main", + ], }, install_requires=[ "cmake>=3.5", diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 5fc9cf19..2b7eae14 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -2,8 +2,11 @@ import subprocess import sys +import tempfile import unittest +from pathlib import Path +import cv2 import numpy as np from fpie.process import ( @@ -11,6 +14,7 @@ EquProcessor, GridProcessor, ) +from fpie.video import BlendOptions, _resolve_fps, blend_frame, blend_video class SmokeTest(unittest.TestCase): @@ -85,6 +89,87 @@ def test_cli_check_backend(self) -> None: self.assertIn("numpy", result.stdout) + def test_video_cli_check_backend(self) -> None: + """Verify the video CLI can report available backends.""" + result = subprocess.run( + [ + sys.executable, + "-c", + ( + "import sys; " + "from fpie.video_cli import main; " + "sys.argv = ['fpie-video', '--check-backend']; " + "main()" + ), + ], + check=True, + capture_output=True, + text=True, + ) + + self.assertIn("numpy", result.stdout) + + def test_blend_frame_numpy_backend(self) -> None: + """Verify the public frame interface blends one target frame.""" + out = blend_frame( + self.src, + self.mask, + self.tgt, + options=BlendOptions(backend="numpy", iterations=2), + ) + + self.assertEqual(out.shape, self.tgt.shape) + self.assertEqual(out.dtype, np.uint8) + + def test_video_fps_falls_back_for_invalid_capture_values(self) -> None: + """Video FPS selection should ignore non-finite capture values.""" + self.assertEqual(_resolve_fps(None, float("nan")), 30.0) + self.assertEqual(_resolve_fps(None, 0.0), 30.0) + self.assertEqual(_resolve_fps(12.0, float("nan")), 12.0) + + def test_blend_video_numpy_backend(self) -> None: + """Verify the video interface writes a blended output stream.""" + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + src_path = tmp_path / "src.png" + mask_path = tmp_path / "mask.png" + target_path = tmp_path / "target.avi" + output_path = tmp_path / "out.avi" + + cv2.imwrite(str(src_path), self.src) + cv2.imwrite(str(mask_path), self.mask) + writer = cv2.VideoWriter( + str(target_path), + cv2.VideoWriter_fourcc(*"MJPG"), + 5.0, + (self.tgt.shape[1], self.tgt.shape[0]), + ) + self.assertTrue(writer.isOpened()) + writer.write(self.tgt) + writer.write(self.tgt + 1) + writer.release() + + result = blend_video( + str(src_path), + str(target_path), + str(output_path), + mask=str(mask_path), + options=BlendOptions(backend="numpy", iterations=2), + fps=5.0, + fourcc="MJPG", + ) + + self.assertEqual(result.frame_count, 2) + self.assertTrue(output_path.exists()) + + capture = cv2.VideoCapture(str(output_path)) + self.assertTrue(capture.isOpened()) + self.assertEqual(int(capture.get(cv2.CAP_PROP_FRAME_COUNT)), 2) + ok, frame = capture.read() + capture.release() + self.assertTrue(ok) + self.assertEqual(frame.shape[:2], self.tgt.shape[:2]) + if __name__ == "__main__": unittest.main()