Skip to content

High res captures and changing stream resolution #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 93 additions & 55 deletions src/labthings_picamera2/thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class RawImageModel(BaseModel):
stride: int
format: str


class PicameraControl(PropertyDescriptor):
def __init__(
self, control_name: str, model: type = float, description: Optional[str] = None
Expand Down Expand Up @@ -91,7 +92,9 @@ def __init__(self, stream: MJPEGStream, portal: BlockingPortal):
self.stream = stream
self.portal = portal

def outputframe(self, frame, _keyframe=True, _timestamp=None, _packet=None, _audio=False):
def outputframe(
self, frame, _keyframe=True, _timestamp=None, _packet=None, _audio=False
):
"""Add a frame to the stream's ringbuffer"""
self.stream.add_frame(frame, self.portal)

Expand Down Expand Up @@ -126,7 +129,9 @@ class ImageProcessingInputs(BaseModel):
colour_gains: tuple[float, float]
white_norm_lores: NDArray
raw_size: tuple[int, int]
colour_correction_matrix: tuple[float, float, float, float, float, float, float, float, float]
colour_correction_matrix: tuple[
float, float, float, float, float, float, float, float, float
]
gamma: NDArray


Expand All @@ -137,7 +142,7 @@ class ImageProcessingCache:
ccm: np.ndarray


class BlobNumpyDict(BlobBytes):
class BlobNumpyDict(BlobBytes):
def __init__(self, arrays: Mapping[str, np.ndarray]):
self._arrays = arrays
self._bytesio: Optional[io.BytesIO] = None
Expand All @@ -148,7 +153,7 @@ def arrays(self) -> Mapping[str, np.ndarray]:
return self._arrays

@property
def _bytes(self) -> bytes: #noqa mypy: override
def _bytes(self) -> bytes: # noqa mypy: override
"""Generate binary content on-the-fly from numpy data"""
if not self._bytesio:
out = io.BytesIO()
Expand All @@ -164,18 +169,14 @@ class NumpyBlob(Blob):
def from_arrays(cls, arrays: Mapping[str, np.ndarray]) -> Self:
return cls.model_construct( # type: ignore[return-value]
href="blob://local",
_data=BlobNumpyDict(
arrays,
media_type=cls.default_media_type()
),
_data=BlobNumpyDict(arrays, media_type=cls.default_media_type()),
)



def raw2rggb(raw: np.ndarray, size: tuple[int, int]) -> np.ndarray:
"""Convert packed 10 bit raw to RGGB 8 bit"""
raw = np.asarray(raw) # ensure it's an array
output_shape = (size[1]//2, size[0]//2, 4)
output_shape = (size[1] // 2, size[0] // 2, 4)
rggb = np.empty(output_shape, dtype=np.uint8)
raw_w = rggb.shape[1] // 2 * 5
for plane, offset in enumerate([(1, 1), (0, 1), (1, 0), (0, 0)]):
Expand Down Expand Up @@ -447,12 +448,27 @@ def __exit__(self, exc_type, exc_value, traceback):
cam.close()
del self._picamera

def start_streaming(self) -> None:
@thing_action
def start_streaming(
self, main_resolution: tuple[int, int] = (820, 616), buffer_count: int = 6
) -> None:
"""
Start the MJPEG stream

Sets the camera resolution to the video/stream resolution, and starts recording
if the stream should be active.
Sets the camera resolutions based on input parameters, and sets the low-res
resolution to (320, 240). Note: (320, 240) is a standard from the Pi Camera
manual.

Create two streams:
- `lores_mjpeg_stream` for autofocus at low-res resolution
- `mjpeg_stream` for preview. This is the `main_resolution` if this is less
than (1280, 960), or the low-res resolution if above. This allows for
high resolution capture without streaming high resolution video.

main_resolution: the resolution for the main configuration. Defaults to
(820, 616), 1/4 sensor size.
buffer_count: the number of frames to hold in the buffer. Higher uses more memory,
lower may cause dropped frames. Defaults to 6.
"""
with self.picamera() as picam:
# TODO: Filip: can we use the lores output to keep preview stream going
Expand All @@ -462,19 +478,23 @@ def start_streaming(self) -> None:
picam.stop()
picam.stop_encoder() # make sure there are no other encoders going
stream_config = picam.create_video_configuration(
main={"size": self.stream_resolution},
main={"size": main_resolution},
lores={"size": (320, 240), "format": "YUV420"},
sensor=self.thing_settings.get("sensor_mode", None),
controls=self.persistent_controls,
)
# Set buffer count - can't be negative
stream_config["buffer_count"] = buffer_count
picam.configure(stream_config)
logging.info("Starting picamera MJPEG stream...")
stream_name = "lores" if main_resolution[0] > 1280 else "main"
picam.start_recording(
MJPEGEncoder(self.mjpeg_bitrate),
PicameraStreamOutput(
self.mjpeg_stream,
get_blocking_portal(self),
),
name=stream_name,
)
picam.start_encoder(
MJPEGEncoder(100000000),
Expand All @@ -493,6 +513,7 @@ def start_streaming(self) -> None:
"Started MJPEG stream at %s on port %s", self.stream_resolution, 1
)

@thing_action
def stop_streaming(self, stop_web_stream=True) -> None:
"""
Stop the MJPEG stream
Expand Down Expand Up @@ -524,10 +545,28 @@ def snap_image(self) -> ArrayModel:
"""
return self.capture_array()

@thing_action
def capture_image(
self,
stream_name: Literal["main", "lores", "raw", "full"] = "main",
wait: Optional[float] = 0.9,
):
"""Acquire one image from the camera.

Return it as a PIL Image

stream_name: (Optional) The PiCamera2 stream to use, should be one of ["main", "lores", "raw", "full"]. Default = "main"
wait: (Optional, float) Set a timeout in seconds.
A TimeoutError is raised if this time is exceeded during capture.
Default = 0.9s, lower than the 1s timeout default in picamera yaml settings
"""
with self.picamera() as cam:
return cam.capture_image(stream_name, wait=wait)

@thing_action
def capture_array(
self,
stream_name: Literal["main", "lores", "raw"] = "main",
stream_name: Literal["main", "lores", "raw", "full"] = "main",
wait: Optional[float] = 0.9,
) -> ArrayModel:
"""Acquire one image from the camera and return as an array
Expand All @@ -536,27 +575,34 @@ def capture_array(
It's likely to be highly inefficient - raw and/or uncompressed captures using
binary image formats will be added in due course.

stream_name: (Optional) The PiCamera2 stream to use, should be one of ["main", "lores", "raw"]. Default = "main"
stream_name: (Optional) The PiCamera2 stream to use, should be one of ["main", "lores", "raw", "full"]. Default = "main"
wait: (Optional, float) Set a timeout in seconds.
A TimeoutError is raised if this time is exceeded during capture.
Default = 0.9s, lower than the 1s timeout default in picamera yaml settings
"""

# This was slower than capture_image for our use case, but directly returning
# an image as an array is still a useful feature
if stream_name == "full":
with self.picamera(pause_stream=True) as picam2:
capture_config = picam2.create_still_configuration()
return picam2.switch_mode_and_capture_array(capture_config, wait=wait)
with self.picamera() as cam:
return cam.capture_array(stream_name, wait = wait)
return cam.capture_array(stream_name, wait=wait)

@thing_action
def capture_raw(
self,
states_getter: GetThingStates,
get_states: bool=True,
get_processing_inputs: bool=True,
get_states: bool = True,
get_processing_inputs: bool = True,
wait: Optional[float] = 0.9,
) -> RawImageModel:
"""Capture a raw image

This function is intended to be as fast as possible, and will return
as soon as an image has been captured. The output format is not intended
to be useful, except as input to `raw_to_png`.
to be useful, except as input to `raw_to_png`.

wait: (Optional, float) Set a timeout in seconds.
A TimeoutError is raised if this time is exceeded during capture.
Expand All @@ -567,18 +613,22 @@ def capture_raw(
transferring it over the network.
"""
with self.picamera() as cam:
(buffer, ), parameters = cam.capture_buffers(["raw"], wait=wait)
(buffer,), parameters = cam.capture_buffers(["raw"], wait=wait)
configuration = cam.camera_configuration()
return RawImageModel(
image_data = RawBlob.from_bytes(buffer.tobytes()),
thing_states = states_getter() if get_states else None,
metadata = { "parameters": parameters, "sensor": configuration["sensor"], "tuning": self.tuning },
processing_inputs = (
image_data=RawBlob.from_bytes(buffer.tobytes()),
thing_states=states_getter() if get_states else None,
metadata={
"parameters": parameters,
"sensor": configuration["sensor"],
"tuning": self.tuning,
},
processing_inputs=(
self.image_processing_inputs if get_processing_inputs else None
),
size = configuration["raw"]["size"],
format = configuration["raw"]["format"],
stride = configuration["raw"]["stride"],
size=configuration["raw"]["size"],
format=configuration["raw"]["format"],
stride=configuration["raw"]["stride"],
)

@thing_property
Expand Down Expand Up @@ -616,32 +666,32 @@ def generate_image_processing_cache(
p: ImageProcessingInputs,
) -> ImageProcessingCache:
"""Prepare to process raw images

This is a static method to ensure its outputs depend only on its
inputs."""
zoom_factors = [
i / 2 / n for i, n in zip(p.raw_size[::-1], p.white_norm_lores.shape[:2])
] + [1]
white_norm = zoom(p.white_norm_lores, zoom_factors, order=1)[
: (p.raw_size[1]//2), : (p.raw_size[0]//2), :
: (p.raw_size[1] // 2), : (p.raw_size[0] // 2), :
]
ccm = np.array(p.colour_correction_matrix).reshape((3,3))
ccm = np.array(p.colour_correction_matrix).reshape((3, 3))
gamma = interp1d(p.gamma[:, 0] / 255, p.gamma[:, 1] / 255)
return ImageProcessingCache(
white_norm=white_norm,
ccm = ccm,
gamma = gamma,
ccm=ccm,
gamma=gamma,
)

_image_processing_cache: ImageProcessingCache | None = None

@thing_action
def prepare_image_normalisation(
self,
inputs: ImageProcessingInputs | None = None
self, inputs: ImageProcessingInputs | None = None
) -> ImageProcessingInputs:
"""The parameters used to convert raw image data into processed images
NB this method uses only information from `inputs` or

NB this method uses only information from `inputs` or
`self.image_processing_inputs`, to ensure repeatability
"""
p = inputs or self.image_processing_inputs
Expand All @@ -653,7 +703,7 @@ def process_raw_array(
self,
raw: RawImageModel,
use_cache: bool = False,
)->NDArray:
) -> NDArray:
"""Convert a raw image to a processed array"""
if not use_cache:
if raw.processing_inputs is None:
Expand All @@ -662,26 +712,22 @@ def process_raw_array(
"and we are not using the cache. This may be solved by "
"capturing with `get_processing_inputs=True`."
)
self.prepare_image_normalisation(
raw.processing_inputs
)
self.prepare_image_normalisation(raw.processing_inputs)
p = self._image_processing_cache
assert p is not None
assert raw.format == "SBGGR10_CSI2P"
buffer = np.frombuffer(raw.image_data.content, dtype=np.uint8)
packed = buffer.reshape((-1, raw.stride))
rgb = rggb2rgb(raw2rggb(packed, raw.size))
normed = rgb / p.white_norm
corrected = np.dot(
p.ccm, normed.reshape((-1, 3)).T
).T.reshape(normed.shape)
corrected = np.dot(p.ccm, normed.reshape((-1, 3)).T).T.reshape(normed.shape)
corrected[corrected < 0] = 0
corrected[corrected > 255] = 255
processed_image = p.gamma(corrected)
return processed_image.astype(np.uint8)

@thing_action
def raw_to_png(self, raw: RawImageModel, use_cache: bool = False)->PNGBlob:
def raw_to_png(self, raw: RawImageModel, use_cache: bool = False) -> PNGBlob:
"""Process a raw image to a PNG"""
arr = self.process_raw_array(raw=raw, use_cache=use_cache)
image = Image.fromarray(arr.astype(np.uint8), mode="RGB")
Expand Down Expand Up @@ -748,6 +794,7 @@ def capture_jpeg(
logging.info("Reconfiguring camera for full resolution capture")
cam.configure(cam.create_still_configuration())
cam.start()
cam.options["quality"] = 95
logging.info("capturing")
cam.capture_file(path, name="main", format="jpeg", wait=wait)
logging.info("done")
Expand Down Expand Up @@ -796,15 +843,6 @@ def grab_jpeg_size(
)
return portal.call(stream.next_frame_size)

# @thing_action
# def capture_to_scan(
# self,
# scan_manager: ScanManager,
# format: Literal["jpeg"] = "jpeg",
# ) -> None:
# with scan_manager.new_jpeg() as output, self.picamera() as cam:
# cam.capture_file(output, format="jpeg")

@thing_property
def exposure(self) -> float:
"""An alias for `exposure_time` to fit the micromanager API"""
Expand Down