From 4150913a2bf1bdfdb1ca06d138eb3cbb31ec1ddc Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 09:32:23 +0000 Subject: [PATCH 01/18] give `probe_liquid_heights` orchestrator full access to inner detection methods important for usage in... 1.- LLD (based on these inner detection methods rather than the default aspirate) 2.- auto-liquid level following --- .../backends/hamilton/STAR_backend.py | 146 +++++++++++++----- 1 file changed, 108 insertions(+), 38 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 42a63aa6cb6..9a61c44f09a 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1781,6 +1781,33 @@ async def probe_liquid_heights( search_speed: float = 10.0, n_replicates: int = 1, move_to_z_safety_after: bool = True, + # Shared detection parameters + channel_acceleration: float = 800.0, + post_detection_trajectory: Literal[0, 1] = 1, + post_detection_dist: float = 0.0, + # cLLD-specific parameters (used when lld_mode=GAMMA) + detection_edge: int = 10, + detection_drop: int = 2, + # pLLD-specific parameters (used when lld_mode=PRESSURE) + channel_speed_above_start_pos_search: float = 120.0, + z_drive_current_limit: int = 3, + tip_has_filter: bool = False, + dispense_drive_speed: float = 5.0, + dispense_drive_acceleration: float = 0.2, + dispense_drive_max_speed: float = 14.5, + dispense_drive_current_limit: int = 3, + plld_detection_edge: int = 30, + plld_detection_drop: int = 10, + clld_verification: bool = False, + clld_detection_edge: int = 10, + clld_detection_drop: int = 2, + max_delta_plld_clld: float = 5.0, + plld_mode: Optional[PressureLLDMode] = None, # defaults to PressureLLDMode.LIQUID + plld_foam_detection_drop: int = 30, + plld_foam_detection_edge_tolerance: int = 30, + plld_foam_ad_values: int = 30, + plld_foam_search_speed: float = 10.0, + dispense_back_plld_volume: Optional[float] = None, ) -> List[float]: """Probe liquid surface heights in containers using liquid level detection. @@ -1791,11 +1818,38 @@ async def probe_liquid_heights( Args: containers: List of Container objects to probe, one per channel. use_channels: Channel indices to use for probing (0-indexed). - resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single containers with odd channel counts to avoid center dividers. Defaults to container centers. - lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. Defaults to capacitive. + resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single + containers with odd channel counts to avoid center dividers. Defaults to container centers. + lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. + Defaults to capacitive. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 1. - move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + move_to_z_safety_after: Whether to move channels to safe Z height after probing. + Default True. + channel_acceleration: Search acceleration in mm/s^2. Default 800.0. + post_detection_trajectory: Post-detection move mode (0 or 1). Default 1. + post_detection_dist: Distance in mm to move up after detection. Default 0.0. + detection_edge: cLLD edge steepness threshold (0-1023). Default 10. + detection_drop: cLLD offset after edge detection (0-1023). Default 2. + channel_speed_above_start_pos_search: pLLD speed above search start in mm/s. Default 120.0. + z_drive_current_limit: pLLD Z-drive current limit. Default 3. + tip_has_filter: Whether tip has a filter. Default False. + dispense_drive_speed: pLLD dispense drive speed in mm/s. Default 5.0. + dispense_drive_acceleration: pLLD dispense drive acceleration in mm/s^2. Default 0.2. + dispense_drive_max_speed: pLLD dispense drive max speed in mm/s. Default 14.5. + dispense_drive_current_limit: pLLD dispense drive current limit. Default 3. + plld_detection_edge: pLLD edge detection threshold. Default 30. + plld_detection_drop: pLLD detection drop. Default 10. + clld_verification: Enable cLLD verification in pLLD mode. Default False. + clld_detection_edge: cLLD verification edge threshold. Default 10. + clld_detection_drop: cLLD verification drop. Default 2. + max_delta_plld_clld: Max allowed delta between pLLD and cLLD in mm. Default 5.0. + plld_mode: Pressure LLD mode. Defaults to PressureLLDMode.LIQUID for pLLD. + plld_foam_detection_drop: Foam detection drop. Default 30. + plld_foam_detection_edge_tolerance: Foam detection edge tolerance. Default 30. + plld_foam_ad_values: Foam AD values. Default 30. + plld_foam_search_speed: Foam search speed in mm/s. Default 10.0. + dispense_back_plld_volume: Volume to dispense back after pLLD in uL. Default None. Returns: Mean of measured liquid heights for each container (mm from cavity bottom). @@ -1890,43 +1944,59 @@ async def probe_liquid_heights( for container, tip_len in zip(containers, tip_lengths) ] + if lld_mode == self.LLDMode.GAMMA: + detect_func = self._move_z_drive_to_liquid_surface_using_clld + extra_kwargs: dict = { + "channel_acceleration": channel_acceleration, + "detection_edge": detection_edge, + "detection_drop": detection_drop, + "post_detection_trajectory": post_detection_trajectory, + "post_detection_dist": post_detection_dist, + } + else: + detect_func = self._search_for_surface_using_plld + extra_kwargs = { + "channel_acceleration": channel_acceleration, + "channel_speed_above_start_pos_search": channel_speed_above_start_pos_search, + "z_drive_current_limit": z_drive_current_limit, + "tip_has_filter": tip_has_filter, + "dispense_drive_speed": dispense_drive_speed, + "dispense_drive_acceleration": dispense_drive_acceleration, + "dispense_drive_max_speed": dispense_drive_max_speed, + "dispense_drive_current_limit": dispense_drive_current_limit, + "plld_detection_edge": plld_detection_edge, + "plld_detection_drop": plld_detection_drop, + "clld_verification": clld_verification, + "clld_detection_edge": clld_detection_edge, + "clld_detection_drop": clld_detection_drop, + "max_delta_plld_clld": max_delta_plld_clld, + "plld_mode": plld_mode if plld_mode is not None else self.PressureLLDMode.LIQUID, + "plld_foam_detection_drop": plld_foam_detection_drop, + "plld_foam_detection_edge_tolerance": plld_foam_detection_edge_tolerance, + "plld_foam_ad_values": plld_foam_ad_values, + "plld_foam_search_speed": plld_foam_search_speed, + "dispense_back_plld_volume": dispense_back_plld_volume, + "post_detection_trajectory": post_detection_trajectory, + "post_detection_dist": post_detection_dist, + } + try: for _ in range(n_replicates): - if lld_mode == self.LLDMode.GAMMA: - results = await asyncio.gather( - *[ - self._move_z_drive_to_liquid_surface_using_clld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches - ) - ], - return_exceptions=True, - ) - - else: - results = await asyncio.gather( - *[ - self._search_for_surface_using_plld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - dispense_drive_speed=5.0, - plld_mode=self.PressureLLDMode.LIQUID, - clld_verification=False, - post_detection_dist=0.0, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches - ) - ], - return_exceptions=True, - ) + results = await asyncio.gather( + *[ + detect_func( + channel_idx=channel, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=search_speed, + **extra_kwargs, + ) + for channel, lip, sps in zip( + use_channels, lowest_immers_positions, start_pos_searches + ) + ], + return_exceptions=True, + ) # Get heights for ALL channels, handling failures for channels with no liquid # (indexed 0 to self.num_channels-1) but only store for used channels From af92efffbea7cb865ab4661eb93d3fb765454270 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 12:51:25 +0000 Subject: [PATCH 02/18] smart behaviour: x-grouping, y-batching, looping --- .../backends/hamilton/STAR_backend.py | 218 ++++++++++++------ 1 file changed, 146 insertions(+), 72 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 9a61c44f09a..7c43c7396e1 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1781,6 +1781,10 @@ async def probe_liquid_heights( search_speed: float = 10.0, n_replicates: int = 1, move_to_z_safety_after: bool = True, + # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, # Shared detection parameters channel_acceleration: float = 800.0, post_detection_trajectory: Literal[0, 1] = 1, @@ -1826,6 +1830,12 @@ async def probe_liquid_heights( n_replicates: Number of measurements per channel. Default 1. move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved + channels to before the first batch. None (default) uses full Z safety. + min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to + between batches (X groups and Y sub-batches). None (default) uses full Z safety. + z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after + probing (only used when move_to_z_safety_after is True). None (default) uses full Z safety. channel_acceleration: Search acceleration in mm/s^2. Default 800.0. post_detection_trajectory: Post-detection move mode (0 or 1). Default 1. post_detection_dist: Distance in mm to move up after detection. Default 0.0. @@ -1856,11 +1866,10 @@ async def probe_liquid_heights( Raises: RuntimeError: If channels lack tips. - NotImplementedError: If channels require different X positions. Notes: - All specified channels must have tips attached - - All channels must be at the same X position (single-row operation) + - Containers at different X positions are probed in sequential groups (single X carriage) - For single containers with odd channel counts, Y-offsets are applied to avoid center dividers (Hamilton 1000 uL spacing: 9mm, offset: 5.5mm) """ @@ -1902,48 +1911,30 @@ async def probe_liquid_heights( tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - # Move channels to safe Z height before starting + # Move all channels to Z safety first (including uninvolved channels), then optionally + # lower only the involved channels to the requested traverse height. await self.move_all_channels_in_z_safety() + if min_traverse_height_at_beginning_of_command is not None: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} + ) - # Check if all channels are on the same x position, then move there + # Compute X and Y positions for all containers x_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x for resource, offset in zip(containers, resource_offsets) ] - if len(set(x_pos)) > 1: # TODO: implement - raise NotImplementedError( - "probe_liquid_heights is not yet supported for multiple x positions." - ) - await self.move_channel_x(0, x_pos[0]) - - # Move channels to their y positions y_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y for resource, offset in zip(containers, resource_offsets) ] - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_pos)} - ) - # Detect liquid heights - absolute_heights_measurements: Dict[int, List[Optional[float]]] = { - ch: [] for ch in use_channels - } - - lowest_immers_positions = [ - container.get_absolute_location("c", "c", "cavity_bottom").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - for container, tip_len in zip(containers, tip_lengths) - ] - start_pos_searches = [ - container.get_absolute_location("c", "c", "t").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - + 5 - for container, tip_len in zip(containers, tip_lengths) - ] + # Group indices by unique X position (preserving order of first appearance) + x_groups: Dict[float, List[int]] = {} + for i, x in enumerate(x_pos): + x_groups.setdefault(x, []).append(i) + # Precompute detection function and kwargs (mode doesn't change between groups) if lld_mode == self.LLDMode.GAMMA: detect_func = self._move_z_drive_to_liquid_surface_using_clld extra_kwargs: dict = { @@ -1980,49 +1971,127 @@ async def probe_liquid_heights( "post_detection_dist": post_detection_dist, } + # Detect liquid heights, iterating over X groups sequentially (single X carriage) + absolute_heights_measurements: Dict[int, List[Optional[float]]] = { + ch: [] for ch in use_channels + } + try: - for _ in range(n_replicates): - results = await asyncio.gather( - *[ - detect_func( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - **extra_kwargs, + is_first_x_group = True + for x, indices in x_groups.items(): + # Raise channels before moving X carriage (tips may be lowered from previous group) + if not is_first_x_group: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_channels = [use_channels[i] for i in prev_indices] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_channels} ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches + await self.move_channel_x(0, x) + + # Within this X group, partition into Y sub-batches of channels that can be + # positioned simultaneously. Channels must be in descending Y order by channel + # index, with at least _channel_minimum_y_spacing between consecutive channels. + # Sort by channel index ascending, then greedily assign to compatible batches. + sorted_entries = sorted(indices, key=lambda i: use_channels[i]) + y_batches: List[List[int]] = [] # each batch is a list of indices into original arrays + for idx in sorted_entries: + ch = use_channels[idx] + y = y_pos[idx] + placed = False + for batch in y_batches: + last_idx = batch[-1] + last_ch = use_channels[last_idx] + last_y = y_pos[last_idx] + # Channel index increases, so Y must decrease by at least + # (channel gap) * minimum_spacing to leave room for intermediate channels. + if last_y - y >= (ch - last_ch) * self._channel_minimum_y_spacing: + batch.append(idx) + placed = True + break + if not placed: + y_batches.append([idx]) + + for y_batch_idx, y_batch in enumerate(y_batches): + batch_channels = [use_channels[i] for i in y_batch] + batch_containers = [containers[i] for i in y_batch] + batch_tip_lengths = [tip_lengths[i] for i in y_batch] + + # Raise channels before Y repositioning (skip first batch in each X group — + # already safe from the X-group-level raise or initial raise) + if y_batch_idx > 0: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_batch_channels = [use_channels[i] for i in y_batches[y_batch_idx - 1]] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_batch_channels} + ) + + # Position the batch's channels in Y + await self.position_channels_in_y_direction( + {use_channels[i]: y_pos[i] for i in y_batch} + ) + + # Compute Z search bounds for this batch + batch_lowest_immers = [ + container.get_absolute_location("c", "c", "cavity_bottom").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] + batch_start_pos = [ + container.get_absolute_location("c", "c", "t").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + + 5 + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] + + # Run n_replicates detection loop for this batch + for _ in range(n_replicates): + results = await asyncio.gather( + *[ + detect_func( + channel_idx=channel, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=search_speed, + **extra_kwargs, + ) + for channel, lip, sps in zip( + batch_channels, batch_lowest_immers, batch_start_pos + ) + ], + return_exceptions=True, ) - ], - return_exceptions=True, - ) - # Get heights for ALL channels, handling failures for channels with no liquid - # (indexed 0 to self.num_channels-1) but only store for used channels - current_absolute_liquid_heights = await self.request_pip_height_last_lld() - for idx, (ch_idx, result) in enumerate(zip(use_channels, results)): - if isinstance(result, STARFirmwareError): - # Check if it's specifically the "no liquid found" error - error_msg = str(result).lower() - if "no liquid level found" in error_msg or "no liquid was present" in error_msg: - height = None # No liquid detected - this is expected - msg = ( - f"Channel {ch_idx}: No liquid detected. Could be because there is " - f"no liquid in container {containers[idx].name} or liquid level is too low." - ) - if lld_mode == self.LLDMode.GAMMA: - msg += " Consider using pressure-based LLD if liquid is believed to exist." - logger.warning(msg) - else: - # Some other firmware error - re-raise it - raise result - elif isinstance(result, Exception): - # Some other unexpected error - re-raise it - raise result - else: - height = current_absolute_liquid_heights[ch_idx] - absolute_heights_measurements[ch_idx].append(height) + # Get heights for ALL channels, handling failures for channels with no liquid + current_absolute_liquid_heights = await self.request_pip_height_last_lld() + for local_idx, (ch_idx, result) in enumerate(zip(batch_channels, results)): + orig_idx = y_batch[local_idx] + if isinstance(result, STARFirmwareError): + error_msg = str(result).lower() + if "no liquid level found" in error_msg or "no liquid was present" in error_msg: + height = None + msg = ( + f"Channel {ch_idx}: No liquid detected. Could be because there is " + f"no liquid in container {containers[orig_idx].name} or liquid level " + f"is too low." + ) + if lld_mode == self.LLDMode.GAMMA: + msg += " Consider using pressure-based LLD if liquid is believed to exist." + logger.warning(msg) + else: + raise result + elif isinstance(result, Exception): + raise result + else: + height = current_absolute_liquid_heights[ch_idx] + absolute_heights_measurements[ch_idx].append(height) + prev_indices = y_batches[-1] # last Y batch's indices, for Z raise on next X group + is_first_x_group = False except: await self.move_all_channels_in_z_safety() raise @@ -2053,7 +2122,12 @@ async def probe_liquid_heights( ) if move_to_z_safety_after: - await self.move_all_channels_in_z_safety() + if z_position_at_end_of_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) return relative_to_well From 4f2cc61962666f919d97bcdfda44cfa5f6c872e5 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 16:30:15 +0000 Subject: [PATCH 03/18] test y positioning of non-adjacent subsets of channels --- .../backends/hamilton/STAR_backend.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 7c43c7396e1..c0b9d9e2fcc 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -2029,10 +2029,19 @@ async def probe_liquid_heights( {ch: min_traverse_height_during_command for ch in prev_batch_channels} ) - # Position the batch's channels in Y - await self.position_channels_in_y_direction( - {use_channels[i]: y_pos[i] for i in y_batch} - ) + # Position the batch's channels in Y, including any intermediate channels + # (channels between batch members that aren't part of this batch) to ensure + # they don't violate the descending-order / minimum-spacing constraint. + y_positions: Dict[int, float] = {use_channels[i]: y_pos[i] for i in y_batch} + sorted_batch_chs = sorted(batch_channels) + for k in range(len(sorted_batch_chs) - 1): + ch_lo, ch_hi = sorted_batch_chs[k], sorted_batch_chs[k + 1] + for intermediate_ch in range(ch_lo + 1, ch_hi): + if intermediate_ch not in y_positions: + y_positions[intermediate_ch] = ( + y_positions[ch_lo] - (intermediate_ch - ch_lo) * self._channel_minimum_y_spacing + ) + await self.position_channels_in_y_direction(y_positions) # Compute Z search bounds for this batch batch_lowest_immers = [ From 5cb32e58c03f5724ac2b85c1e95bac7c01b1edc4 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 18:30:45 +0000 Subject: [PATCH 04/18] dictate multiple channels - one container behaviour --- .../backends/hamilton/STAR_backend.py | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index c0b9d9e2fcc..8732b785e90 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -59,6 +59,8 @@ SingleChannelDispense, ) from pylabrobot.liquid_handling.utils import ( + MIN_SPACING_BETWEEN_CHANNELS, + MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, ) @@ -1880,17 +1882,22 @@ async def probe_liquid_heights( # Handle tip positioning ... if SINGLE container instance if resource_offsets is None: if len(set(containers)) == 1: - resource_offsets = get_wide_single_resource_liquid_op_offsets( - resource=containers[0], num_channels=len(containers) - ) + container_size_y = containers[0].get_absolute_size_y() + min_required = MIN_SPACING_EDGE * 2 + (len(containers) - 1) * MIN_SPACING_BETWEEN_CHANNELS + if container_size_y >= min_required: + resource_offsets = get_wide_single_resource_liquid_op_offsets( + resource=containers[0], num_channels=len(containers) + ) - if len(use_channels) % 2 != 0: - # Hamilton 1000 uL channels are 9 mm apart, so offset by half the distance - # + extra for the potential central 'splash guard' - y_offset = 5.5 - resource_offsets = [ - resource_offsets[i] + Coordinate(0, y_offset, 0) for i in range(len(use_channels)) - ] + if len(use_channels) % 2 != 0: + # Hamilton 1000 uL channels are 9 mm apart, so offset by half the distance + # + extra for the potential central 'splash guard' + y_offset = 5.5 + resource_offsets = [ + resource_offsets[i] + Coordinate(0, y_offset, 0) for i in range(len(use_channels)) + ] + # else: container too small to fit all channels — fall back to center offsets. + # Y sub-batching will serialize channels that can't coexist. resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) From 379b33d08e5698560912275affb9cef82fbd944a Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 19:01:16 +0000 Subject: [PATCH 05/18] docstring update --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 8732b785e90..47993b2a431 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -2176,8 +2176,7 @@ async def probe_liquid_volumes( Volumes in each container (uL). Raises: - ValueError: If any container doesn't support height-to-volume conversion (raised by probe_liquid_heights). - NotImplementedError: If channels require different X positions. + ValueError: If any container doesn't support height-to-volume conversion. Notes: - Delegates all motion, LLD, validation, and safety logic to probe_liquid_heights From 7db57b9b97c057e5b5b0a59ca1f624acd52aeae2 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 19:09:51 +0000 Subject: [PATCH 06/18] add tolerance during x grouping & check use_channels for duplicates --- .../backends/hamilton/STAR_backend.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 47993b2a431..dfc64a5a466 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1905,6 +1905,9 @@ async def probe_liquid_heights( if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") + if len(use_channels) != len(set(use_channels)): + raise ValueError("use_channels must not contain duplicates.") + if not len(containers) == len(use_channels) == len(resource_offsets): raise ValueError( "Length of containers, use_channels, resource_offsets and tip_lengths must match." @@ -1936,10 +1939,12 @@ async def probe_liquid_heights( for resource, offset in zip(containers, resource_offsets) ] - # Group indices by unique X position (preserving order of first appearance) + # Group indices by unique X position (preserving order of first appearance). + # Round to 0.1mm to avoid floating point splitting of same-position containers. x_groups: Dict[float, List[int]] = {} for i, x in enumerate(x_pos): - x_groups.setdefault(x, []).append(i) + x_rounded = round(x, 1) + x_groups.setdefault(x_rounded, []).append(i) # Precompute detection function and kwargs (mode doesn't change between groups) if lld_mode == self.LLDMode.GAMMA: @@ -1985,7 +1990,10 @@ async def probe_liquid_heights( try: is_first_x_group = True - for x, indices in x_groups.items(): + for _, indices in x_groups.items(): + # Use the actual (non-rounded) X position of the first container in this group + group_x = x_pos[indices[0]] + # Raise channels before moving X carriage (tips may be lowered from previous group) if not is_first_x_group: if min_traverse_height_during_command is None: @@ -1995,7 +2003,7 @@ async def probe_liquid_heights( await self.position_channels_in_z_direction( {ch: min_traverse_height_during_command for ch in prev_channels} ) - await self.move_channel_x(0, x) + await self.move_channel_x(0, group_x) # Within this X group, partition into Y sub-batches of channels that can be # positioned simultaneously. Channels must be in descending Y order by channel @@ -2076,9 +2084,7 @@ async def probe_liquid_heights( channel_speed=search_speed, **extra_kwargs, ) - for channel, lip, sps in zip( - batch_channels, batch_lowest_immers, batch_start_pos - ) + for channel, lip, sps in zip(batch_channels, batch_lowest_immers, batch_start_pos) ], return_exceptions=True, ) From 0f66cf772a83300d618c608624357be6f835185d Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 19:22:22 +0000 Subject: [PATCH 07/18] parameterize `allow_duplicate_channels` useful for pooling (?) --- .../liquid_handling/backends/hamilton/STAR_backend.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index dfc64a5a466..949d4b4cf9b 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1783,6 +1783,7 @@ async def probe_liquid_heights( search_speed: float = 10.0, n_replicates: int = 1, move_to_z_safety_after: bool = True, + allow_duplicate_channels: bool = False, # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) min_traverse_height_at_beginning_of_command: Optional[float] = None, min_traverse_height_during_command: Optional[float] = None, @@ -1832,6 +1833,8 @@ async def probe_liquid_heights( n_replicates: Number of measurements per channel. Default 1. move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + allow_duplicate_channels: Whether to allow the same channel index to appear multiple times + in use_channels. Default False. min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved channels to before the first batch. None (default) uses full Z safety. min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to @@ -1905,8 +1908,10 @@ async def probe_liquid_heights( if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") - if len(use_channels) != len(set(use_channels)): - raise ValueError("use_channels must not contain duplicates.") + if not allow_duplicate_channels and len(use_channels) != len(set(use_channels)): + raise ValueError( + "use_channels must not contain duplicates. Set allow_duplicate_channels=True to override." + ) if not len(containers) == len(use_channels) == len(resource_offsets): raise ValueError( @@ -2639,6 +2644,7 @@ async def aspirate( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -3001,6 +3007,7 @@ async def dispense( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. From 4de464d753309463fe685782cda2517cede94e8b Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Wed, 4 Feb 2026 19:56:44 +0000 Subject: [PATCH 08/18] contemplate reaction to "phantom channels" i.e. channels not used in between those that are used in spread behaviour across large --- .../backends/hamilton/STAR_backend.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 949d4b4cf9b..541199c5229 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1886,19 +1886,24 @@ async def probe_liquid_heights( if resource_offsets is None: if len(set(containers)) == 1: container_size_y = containers[0].get_absolute_size_y() - min_required = MIN_SPACING_EDGE * 2 + (len(containers) - 1) * MIN_SPACING_BETWEEN_CHANNELS + # For non-consecutive channels (e.g. [0,1,2,5,6,7]), we must account for + # phantom intermediate channels (3,4) that physically exist between them. + # Compute offsets for the full channel range (min to max), then pick only + # the offsets corresponding to the actual channels being used. + num_channels_in_span = max(use_channels) - min(use_channels) + 1 + min_required = ( + MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * MIN_SPACING_BETWEEN_CHANNELS + ) if container_size_y >= min_required: - resource_offsets = get_wide_single_resource_liquid_op_offsets( - resource=containers[0], num_channels=len(containers) + all_offsets = get_wide_single_resource_liquid_op_offsets( + containers[0], num_channels_in_span ) + min_ch = min(use_channels) + resource_offsets = [all_offsets[ch - min_ch] for ch in use_channels] - if len(use_channels) % 2 != 0: - # Hamilton 1000 uL channels are 9 mm apart, so offset by half the distance - # + extra for the potential central 'splash guard' + if num_channels_in_span % 2 != 0: y_offset = 5.5 - resource_offsets = [ - resource_offsets[i] + Coordinate(0, y_offset, 0) for i in range(len(use_channels)) - ] + resource_offsets = [offset + Coordinate(0, y_offset, 0) for offset in resource_offsets] # else: container too small to fit all channels — fall back to center offsets. # Y sub-batching will serialize channels that can't coexist. From 77f3c23626500aaf25413795238ec5d5a703fc22 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 00:21:57 +0000 Subject: [PATCH 09/18] implement Copilot suggestions --- .../liquid_handling/backends/hamilton/STAR_backend.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 541199c5229..517babb6f50 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1881,6 +1881,13 @@ async def probe_liquid_heights( if use_channels is None: use_channels = list(range(len(containers))) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if not all(0 <= ch < self.num_channels for ch in use_channels): + raise ValueError( + f"All use_channels must be integers in range [0, {self.num_channels - 1}], " + f"got {use_channels}." + ) # Handle tip positioning ... if SINGLE container instance if resource_offsets is None: @@ -1920,7 +1927,7 @@ async def probe_liquid_heights( if not len(containers) == len(use_channels) == len(resource_offsets): raise ValueError( - "Length of containers, use_channels, resource_offsets and tip_lengths must match." + "Length of containers, use_channels, and resource_offsets must match." f"are {len(containers)}, {len(use_channels)}, {len(resource_offsets)}." ) @@ -1957,6 +1964,7 @@ async def probe_liquid_heights( x_groups.setdefault(x_rounded, []).append(i) # Precompute detection function and kwargs (mode doesn't change between groups) + detect_func: Callable[..., Any] if lld_mode == self.LLDMode.GAMMA: detect_func = self._move_z_drive_to_liquid_surface_using_clld extra_kwargs: dict = { @@ -2000,6 +2008,7 @@ async def probe_liquid_heights( try: is_first_x_group = True + prev_indices: Optional[List[int]] = None for _, indices in x_groups.items(): # Use the actual (non-rounded) X position of the first container in this group group_x = x_pos[indices[0]] From 85286f87dcda1188d41f0667d20f3c47e2ca97d5 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 00:27:23 +0000 Subject: [PATCH 10/18] fix type checking --- pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 517babb6f50..513d85a7095 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -2015,6 +2015,7 @@ async def probe_liquid_heights( # Raise channels before moving X carriage (tips may be lowered from previous group) if not is_first_x_group: + assert prev_indices is not None if min_traverse_height_during_command is None: await self.move_all_channels_in_z_safety() else: From c31b66916b1c9e1949681f2e3637cf3e799359b7 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 19:08:19 +0000 Subject: [PATCH 11/18] remove MIN_SPACING_BETWEEN_CHANNELS & simplify extra_kwargs --- .../backends/hamilton/STAR_backend.py | 32 +++++++++++-------- pylabrobot/liquid_handling/utils.py | 9 +++--- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 513d85a7095..a6d0515ca17 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -59,7 +59,6 @@ SingleChannelDispense, ) from pylabrobot.liquid_handling.utils import ( - MIN_SPACING_BETWEEN_CHANNELS, MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, @@ -1782,6 +1781,7 @@ async def probe_liquid_heights( lld_mode: LLDMode = LLDMode.GAMMA, search_speed: float = 10.0, n_replicates: int = 1, + move_to_z_safety_before: bool = True, move_to_z_safety_after: bool = True, allow_duplicate_channels: bool = False, # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) @@ -1899,11 +1899,13 @@ async def probe_liquid_heights( # the offsets corresponding to the actual channels being used. num_channels_in_span = max(use_channels) - min(use_channels) + 1 min_required = ( - MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * MIN_SPACING_BETWEEN_CHANNELS + MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * self._channel_minimum_y_spacing ) if container_size_y >= min_required: all_offsets = get_wide_single_resource_liquid_op_offsets( - containers[0], num_channels_in_span + resource=containers[0], + num_channels=num_channels_in_span, + min_spacing=self._channel_minimum_y_spacing, ) min_ch = min(use_channels) resource_offsets = [all_offsets[ch - min_ch] for ch in use_channels] @@ -1922,7 +1924,8 @@ async def probe_liquid_heights( if not allow_duplicate_channels and len(use_channels) != len(set(use_channels)): raise ValueError( - "use_channels must not contain duplicates. Set allow_duplicate_channels=True to override." + "use_channels must not contain duplicates. " + "Set `allow_duplicate_channels=True` to override." ) if not len(containers) == len(use_channels) == len(resource_offsets): @@ -1938,9 +1941,11 @@ async def probe_liquid_heights( tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - # Move all channels to Z safety first (including uninvolved channels), then optionally - # lower only the involved channels to the requested traverse height. - await self.move_all_channels_in_z_safety() + # Default: move all channels to Z safety first (including uninvolved channels), + # be conservative on safety but allow repeated calls with minimial "channel jumping" + if move_to_z_safety_before: + await self.move_all_channels_in_z_safety() + # Optional: lower only the involved channels to the requested traverse height if min_traverse_height_at_beginning_of_command is not None: await self.position_channels_in_z_direction( {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} @@ -1968,16 +1973,12 @@ async def probe_liquid_heights( if lld_mode == self.LLDMode.GAMMA: detect_func = self._move_z_drive_to_liquid_surface_using_clld extra_kwargs: dict = { - "channel_acceleration": channel_acceleration, "detection_edge": detection_edge, "detection_drop": detection_drop, - "post_detection_trajectory": post_detection_trajectory, - "post_detection_dist": post_detection_dist, } else: detect_func = self._search_for_surface_using_plld extra_kwargs = { - "channel_acceleration": channel_acceleration, "channel_speed_above_start_pos_search": channel_speed_above_start_pos_search, "z_drive_current_limit": z_drive_current_limit, "tip_has_filter": tip_has_filter, @@ -1997,8 +1998,6 @@ async def probe_liquid_heights( "plld_foam_ad_values": plld_foam_ad_values, "plld_foam_search_speed": plld_foam_search_speed, "dispense_back_plld_volume": dispense_back_plld_volume, - "post_detection_trajectory": post_detection_trajectory, - "post_detection_dist": post_detection_dist, } # Detect liquid heights, iterating over X groups sequentially (single X carriage) @@ -2102,6 +2101,9 @@ async def probe_liquid_heights( lowest_immers_pos=lip, start_pos_search=sps, channel_speed=search_speed, + channel_acceleration=channel_acceleration, + post_detection_trajectory=post_detection_trajectory, + post_detection_dist=post_detection_dist, **extra_kwargs, ) for channel, lip, sps in zip(batch_channels, batch_lowest_immers, batch_start_pos) @@ -11288,7 +11290,9 @@ async def pierce_foil( if spread == "wide": offsets = get_wide_single_resource_liquid_op_offsets( - well, num_channels=len(piercing_channels) + resource=well, + num_channels=len(piercing_channels), + min_spacing=self._channel_minimum_y_spacing, ) else: offsets = get_tight_single_resource_liquid_op_offsets( diff --git a/pylabrobot/liquid_handling/utils.py b/pylabrobot/liquid_handling/utils.py index 0437be31037..43997c75b9d 100644 --- a/pylabrobot/liquid_handling/utils.py +++ b/pylabrobot/liquid_handling/utils.py @@ -3,9 +3,9 @@ from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.resource import Resource -MIN_SPACING_BETWEEN_CHANNELS = 9 +MIN_SPACING_BETWEEN_CHANNELS = 9.0 # minimum spacing between the edge of the container and the center of channel -MIN_SPACING_EDGE = 1 +MIN_SPACING_EDGE = 1.0 def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing: float): @@ -19,8 +19,7 @@ def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing def get_wide_single_resource_liquid_op_offsets( - resource: Resource, - num_channels: int, + resource: Resource, num_channels: int, min_spacing: float = MIN_SPACING_BETWEEN_CHANNELS ) -> List[Coordinate]: resource_size = resource.get_absolute_size_y() centers = list( @@ -29,7 +28,7 @@ def get_wide_single_resource_liquid_op_offsets( dim_size=resource_size, n=num_channels, margin=MIN_SPACING_EDGE, - min_spacing=MIN_SPACING_BETWEEN_CHANNELS, + min_spacing=min_spacing, ) ) ) # reverse because channels are from back to front From 5bfcb35a2453110b0b5ecf3f3a04aeb6d03e378c Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 19:48:04 +0000 Subject: [PATCH 12/18] create `STARChatterboxBackend. probe_liquid_heights()` --- .../backends/hamilton/STAR_backend.py | 2 +- .../backends/hamilton/STAR_chatterbox.py | 69 ++++++++++++++++++- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index a6d0515ca17..9cb7afccd76 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1942,7 +1942,7 @@ async def probe_liquid_heights( tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] # Default: move all channels to Z safety first (including uninvolved channels), - # be conservative on safety but allow repeated calls with minimial "channel jumping" + # be conservative on safety but allow repeated calls with minimal "channel jumping" if move_to_z_safety_before: await self.move_all_channels_in_z_safety() # Optional: lower only the involved channels to the requested traverse height diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index ade7567e232..cbe71223eac 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -1,16 +1,25 @@ import datetime from contextlib import asynccontextmanager -from typing import List, Literal, Optional, Union +from typing import Dict, List, Literal, Optional, Union from pylabrobot.liquid_handling.backends import LiquidHandlerBackend -from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import Head96Information, STARBackend +from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import ( + Head96Information, + STARBackend, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.well import Well class STARChatterboxBackend(STARBackend): """Chatterbox backend for 'STAR'""" - def __init__(self, num_channels: int = 8, core96_head_installed: bool = True): + def __init__( + self, + num_channels: int = 8, + core96_head_installed: bool = True, + ): """Initialize a chatter box backend. Args: @@ -208,6 +217,15 @@ async def channel_dispensing_drive_request_position( async def move_channel_y(self, channel: int, y: float): print(f"moving channel {channel} to y: {y}") + async def move_channel_x(self, channel: int, x: float): + print(f"moving channel {channel} to x: {x}") + + async def move_all_channels_in_z_safety(self): + print("moving all channels to z safety") + + async def position_channels_in_z_direction(self, zs: Dict[int, float]): + print(f"positioning channels in z: {zs}") + # # # # # # # # 1_000 uL Channel: Complex Commands # # # # # # # # async def step_off_foil( @@ -276,3 +294,48 @@ async def slow_iswap(self, wrist_velocity: int = 20_000, gripper_velocity: int = finally: messages.append("end slow iswap") print(" | ".join(messages)) + + # # # # # # # # Liquid Level Detection (LLD) # # # # # # # # + + async def probe_liquid_heights( + self, + containers: List[Container], + use_channels: Optional[List[int]] = None, + resource_offsets: Optional[List[Coordinate]] = None, + **kwargs, + ) -> List[float]: + """Probe liquid heights by computing from tracked container volumes. + + Instead of simulating hardware LLD, this mock computes liquid heights directly from + each container's volume tracker using `container.compute_height_from_volume()`. + + Args: + containers: List of Container objects to probe. + use_channels: Channel indices (unused in mock, but validated for tip presence). + resource_offsets: Optional offsets (unused in mock). + **kwargs: Additional LLD parameters (unused in mock). + + Returns: + Liquid heights in mm from cavity bottom for each container, computed from tracked volumes. + + Raises: + NotImplementedError: If a container doesn't support compute_height_from_volume. + """ + if use_channels is None: + use_channels = list(range(len(containers))) + + # Validate tip presence using tip tracker + for ch in use_channels: + self.head[ch].get_tip() # Raises NoTipError if no tip + + heights: List[float] = [] + for container in containers: + volume = container.tracker.get_used_volume() + if volume == 0: + heights.append(0.0) + else: + height = container.compute_height_from_volume(volume) + heights.append(height) + + print(f"probe_liquid_heights: {[f'{h:.2f}' for h in heights]} mm") + return heights From 6020a63abcbd2e20ce28b51903fad00312d36368 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 19:50:13 +0000 Subject: [PATCH 13/18] create `STARChatterboxBackend. request_tip_len_on_channel()` using the trackers :) --- .../backends/hamilton/STAR_chatterbox.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index cbe71223eac..0e5c89f78cc 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -297,6 +297,21 @@ async def slow_iswap(self, wrist_velocity: int = 20_000, gripper_velocity: int = # # # # # # # # Liquid Level Detection (LLD) # # # # # # # # + async def request_tip_len_on_channel(self, channel_idx: int) -> float: + """Return tip length from the tip tracker. + + Args: + channel_idx: Index of the pipetting channel (0-indexed). + + Returns: + The tip length in mm from the tip tracker. + + Raises: + NoTipError: If no tip is present on the channel (via tip tracker). + """ + tip = self.head[channel_idx].get_tip() + return tip.total_tip_length + async def probe_liquid_heights( self, containers: List[Container], From 3a06a18518c25dd736898af8615a2ca6309dc6cb Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 21:08:12 +0000 Subject: [PATCH 14/18] improve y-batching algorithm important: thorough in-line documentation to counter complexity --- .../backends/hamilton/STAR_backend.py | 73 +++++++++++++------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 9cb7afccd76..483f264b0fe 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -2024,28 +2024,59 @@ async def probe_liquid_heights( ) await self.move_channel_x(0, group_x) - # Within this X group, partition into Y sub-batches of channels that can be - # positioned simultaneously. Channels must be in descending Y order by channel - # index, with at least _channel_minimum_y_spacing between consecutive channels. - # Sort by channel index ascending, then greedily assign to compatible batches. - sorted_entries = sorted(indices, key=lambda i: use_channels[i]) - y_batches: List[List[int]] = [] # each batch is a list of indices into original arrays - for idx in sorted_entries: - ch = use_channels[idx] - y = y_pos[idx] - placed = False - for batch in y_batches: - last_idx = batch[-1] - last_ch = use_channels[last_idx] - last_y = y_pos[last_idx] - # Channel index increases, so Y must decrease by at least - # (channel gap) * minimum_spacing to leave room for intermediate channels. - if last_y - y >= (ch - last_ch) * self._channel_minimum_y_spacing: - batch.append(idx) - placed = True + # ────────────────────────────────────────────────────────────────────────── + # INTERVAL PARTITIONING: Pack channels into minimum parallel batches + # ────────────────────────────────────────────────────────────────────────── + # + # PHYSICAL CONSTRAINT: + # Channels are mounted on a single Y-carriage with fixed minimum spacing. + # Channel 0 sits at the back (high Y), channel 7 at the front (low Y). + # Two channels can only probe together if their target Y positions respect + # this physical ordering with sufficient gaps for any channels between them. + # + # Example: channels 2 and 5 probing together need at least 3× min_spacing + # between their Y positions (room for phantom channels 3, 4). + # + # MATHEMATICAL SIMPLIFICATION: + # Raw constraint: y[i] - y[j] >= (j - i) × min_spacing (for i < j) + # + # Define: normalized_y = y + channel_index × min_spacing + # This "shifts" each channel's Y by its index, collapsing the constraint to: + # normalized_y[i] >= normalized_y[j] (for i < j in same batch) + # + # Now we simply need each batch to have decreasing normalized_y values. + # + # ALGORITHM (First-Fit Decreasing, optimal for interval partitioning): + # Process channels by index. For each, find the first batch it fits into + # (where batch's last normalized_y >= channel's normalized_y), or create new. + # This greedy approach is provably optimal for minimum batch count. + # + # ────────────────────────────────────────────────────────────────────────── + + min_spacing = self._channel_minimum_y_spacing + channels_by_index = sorted(indices, key=lambda i: use_channels[i]) + + batches: List[List[int]] = [] + batch_floors: List[float] = [] # lowest normalized_y in each batch (determines what fits) + + for idx in channels_by_index: + channel = use_channels[idx] + normalized_y = y_pos[idx] + channel * min_spacing + + # Find first batch that can accept this channel + assigned = False + for batch_idx, floor in enumerate(batch_floors): + if floor >= normalized_y: + batches[batch_idx].append(idx) + batch_floors[batch_idx] = normalized_y + assigned = True break - if not placed: - y_batches.append([idx]) + + if not assigned: + batches.append([idx]) + batch_floors.append(normalized_y) + + y_batches = batches for y_batch_idx, y_batch in enumerate(y_batches): batch_channels = [use_channels[i] for i in y_batch] From c2cd192488b427c4e4c320653b77a39bc5e3ccdb Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 20:03:56 -0500 Subject: [PATCH 15/18] fix kwargs for Chatterbox --- .../backends/hamilton/STAR_chatterbox.py | 54 +++++++++++++++++-- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 0e5c89f78cc..513dfb54884 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -7,6 +7,10 @@ Head96Information, STARBackend, ) + +# Type aliases for nested enums (for cleaner signatures) +LLDMode = STARBackend.LLDMode +PressureLLDMode = STARBackend.PressureLLDMode from pylabrobot.resources.container import Container from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.well import Well @@ -317,7 +321,39 @@ async def probe_liquid_heights( containers: List[Container], use_channels: Optional[List[int]] = None, resource_offsets: Optional[List[Coordinate]] = None, - **kwargs, + lld_mode: LLDMode = LLDMode.GAMMA, + search_speed: float = 10.0, + n_replicates: int = 1, + move_to_z_safety_before: bool = True, + move_to_z_safety_after: bool = True, + allow_duplicate_channels: bool = False, + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + channel_acceleration: float = 800.0, + post_detection_trajectory: Literal[0, 1] = 1, + post_detection_dist: float = 0.0, + detection_edge: int = 10, + detection_drop: int = 2, + channel_speed_above_start_pos_search: float = 120.0, + z_drive_current_limit: int = 3, + tip_has_filter: bool = False, + dispense_drive_speed: float = 5.0, + dispense_drive_acceleration: float = 0.2, + dispense_drive_max_speed: float = 14.5, + dispense_drive_current_limit: int = 3, + plld_detection_edge: int = 30, + plld_detection_drop: int = 10, + clld_verification: bool = False, + clld_detection_edge: int = 10, + clld_detection_drop: int = 2, + max_delta_plld_clld: float = 5.0, + plld_mode: Optional[PressureLLDMode] = None, + plld_foam_detection_drop: int = 30, + plld_foam_detection_edge_tolerance: int = 30, + plld_foam_ad_values: int = 30, + plld_foam_search_speed: float = 10.0, + dispense_back_plld_volume: Optional[float] = None, ) -> List[float]: """Probe liquid heights by computing from tracked container volumes. @@ -326,9 +362,8 @@ async def probe_liquid_heights( Args: containers: List of Container objects to probe. - use_channels: Channel indices (unused in mock, but validated for tip presence). - resource_offsets: Optional offsets (unused in mock). - **kwargs: Additional LLD parameters (unused in mock). + use_channels: Channel indices (validated for tip presence). + All other parameters: Accepted for API compatibility but unused in mock. Returns: Liquid heights in mm from cavity bottom for each container, computed from tracked volumes. @@ -336,6 +371,17 @@ async def probe_liquid_heights( Raises: NotImplementedError: If a container doesn't support compute_height_from_volume. """ + # Unused parameters kept for signature compatibility: + _ = (lld_mode, search_speed, n_replicates, move_to_z_safety_before, move_to_z_safety_after, + allow_duplicate_channels, min_traverse_height_at_beginning_of_command, + min_traverse_height_during_command, z_position_at_end_of_command, channel_acceleration, + post_detection_trajectory, post_detection_dist, detection_edge, detection_drop, + channel_speed_above_start_pos_search, z_drive_current_limit, tip_has_filter, + dispense_drive_speed, dispense_drive_acceleration, dispense_drive_max_speed, + dispense_drive_current_limit, plld_detection_edge, plld_detection_drop, clld_verification, + clld_detection_edge, clld_detection_drop, max_delta_plld_clld, plld_mode, + plld_foam_detection_drop, plld_foam_detection_edge_tolerance, plld_foam_ad_values, + plld_foam_search_speed, dispense_back_plld_volume, resource_offsets) if use_channels is None: use_channels = list(range(len(containers))) From c8ea489568159216d8535298332f80c6faebd236 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 20:14:15 -0500 Subject: [PATCH 16/18] `make format` --- .../backends/hamilton/STAR_chatterbox.py | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 513dfb54884..64f3ce71225 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -372,16 +372,42 @@ async def probe_liquid_heights( NotImplementedError: If a container doesn't support compute_height_from_volume. """ # Unused parameters kept for signature compatibility: - _ = (lld_mode, search_speed, n_replicates, move_to_z_safety_before, move_to_z_safety_after, - allow_duplicate_channels, min_traverse_height_at_beginning_of_command, - min_traverse_height_during_command, z_position_at_end_of_command, channel_acceleration, - post_detection_trajectory, post_detection_dist, detection_edge, detection_drop, - channel_speed_above_start_pos_search, z_drive_current_limit, tip_has_filter, - dispense_drive_speed, dispense_drive_acceleration, dispense_drive_max_speed, - dispense_drive_current_limit, plld_detection_edge, plld_detection_drop, clld_verification, - clld_detection_edge, clld_detection_drop, max_delta_plld_clld, plld_mode, - plld_foam_detection_drop, plld_foam_detection_edge_tolerance, plld_foam_ad_values, - plld_foam_search_speed, dispense_back_plld_volume, resource_offsets) + _ = ( + lld_mode, + search_speed, + n_replicates, + move_to_z_safety_before, + move_to_z_safety_after, + allow_duplicate_channels, + min_traverse_height_at_beginning_of_command, + min_traverse_height_during_command, + z_position_at_end_of_command, + channel_acceleration, + post_detection_trajectory, + post_detection_dist, + detection_edge, + detection_drop, + channel_speed_above_start_pos_search, + z_drive_current_limit, + tip_has_filter, + dispense_drive_speed, + dispense_drive_acceleration, + dispense_drive_max_speed, + dispense_drive_current_limit, + plld_detection_edge, + plld_detection_drop, + clld_verification, + clld_detection_edge, + clld_detection_drop, + max_delta_plld_clld, + plld_mode, + plld_foam_detection_drop, + plld_foam_detection_edge_tolerance, + plld_foam_ad_values, + plld_foam_search_speed, + dispense_back_plld_volume, + resource_offsets, + ) if use_channels is None: use_channels = list(range(len(containers))) From 16c8de058929dbf08da7c01cc42b5008be70bbf3 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 20:44:51 -0500 Subject: [PATCH 17/18] correct import placements --- .../liquid_handling/backends/hamilton/STAR_chatterbox.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 64f3ce71225..3467f797eff 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -7,13 +7,13 @@ Head96Information, STARBackend, ) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.well import Well # Type aliases for nested enums (for cleaner signatures) LLDMode = STARBackend.LLDMode PressureLLDMode = STARBackend.PressureLLDMode -from pylabrobot.resources.container import Container -from pylabrobot.resources.coordinate import Coordinate -from pylabrobot.resources.well import Well class STARChatterboxBackend(STARBackend): From 47d2c19111832d7bb6c8d77cfae95ad8e4cee998 Mon Sep 17 00:00:00 2001 From: Camillo Moschner Date: Thu, 5 Feb 2026 22:14:02 -0500 Subject: [PATCH 18/18] fix set_heads called before head dicts are populated --- pylabrobot/liquid_handling/liquid_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index e3b96012ca1..7ef8fce55e6 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -154,12 +154,13 @@ async def setup(self, **backend_kwargs): raise RuntimeError("The setup has already finished. See `LiquidHandler.stop`.") self.backend.set_deck(self.deck) - self.backend.set_heads(head=self.head, head96=self.head96) await super().setup(**backend_kwargs) self.head = {c: TipTracker(thing=f"Channel {c}") for c in range(self.backend.num_channels)} self.head96 = {c: TipTracker(thing=f"Channel {c}") for c in range(96)} + self.backend.set_heads(head=self.head, head96=self.head96) + self._resource_pickup = None def serialize_state(self) -> Dict[str, Any]: