diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index a2747c9f39c..3668c458ae1 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -59,6 +59,7 @@ SingleChannelDispense, ) from pylabrobot.liquid_handling.utils import ( + MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, ) @@ -1788,7 +1789,40 @@ async def probe_liquid_heights( lld_mode: LLDMode = LLDMode.GAMMA, search_speed: float = 10.0, n_replicates: int = 1, + move_to_z_safety_before: bool = True, move_to_z_safety_after: bool = True, + allow_duplicate_channels: bool = False, + # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + # Shared detection parameters + channel_acceleration: float = 800.0, + post_detection_trajectory: Literal[0, 1] = 1, + post_detection_dist: float = 0.0, + # cLLD-specific parameters (used when lld_mode=GAMMA) + detection_edge: int = 10, + detection_drop: int = 2, + # pLLD-specific parameters (used when lld_mode=PRESSURE) + channel_speed_above_start_pos_search: float = 120.0, + z_drive_current_limit: int = 3, + tip_has_filter: bool = False, + dispense_drive_speed: float = 5.0, + dispense_drive_acceleration: float = 0.2, + dispense_drive_max_speed: float = 14.5, + dispense_drive_current_limit: int = 3, + plld_detection_edge: int = 30, + plld_detection_drop: int = 10, + clld_verification: bool = False, + clld_detection_edge: int = 10, + clld_detection_drop: int = 2, + max_delta_plld_clld: float = 5.0, + plld_mode: Optional[PressureLLDMode] = None, # defaults to PressureLLDMode.LIQUID + plld_foam_detection_drop: int = 30, + plld_foam_detection_edge_tolerance: int = 30, + plld_foam_ad_values: int = 30, + plld_foam_search_speed: float = 10.0, + dispense_back_plld_volume: Optional[float] = None, ) -> List[float]: """Probe liquid surface heights in containers using liquid level detection. @@ -1799,43 +1833,96 @@ async def probe_liquid_heights( Args: containers: List of Container objects to probe, one per channel. use_channels: Channel indices to use for probing (0-indexed). - resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single containers with odd channel counts to avoid center dividers. Defaults to container centers. - lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. Defaults to capacitive. + resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single + containers with odd channel counts to avoid center dividers. Defaults to container centers. + lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. + Defaults to capacitive. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 1. - move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + move_to_z_safety_after: Whether to move channels to safe Z height after probing. + Default True. + allow_duplicate_channels: Whether to allow the same channel index to appear multiple times + in use_channels. Default False. + min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved + channels to before the first batch. None (default) uses full Z safety. + min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to + between batches (X groups and Y sub-batches). None (default) uses full Z safety. + z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after + probing (only used when move_to_z_safety_after is True). None (default) uses full Z safety. + channel_acceleration: Search acceleration in mm/s^2. Default 800.0. + post_detection_trajectory: Post-detection move mode (0 or 1). Default 1. + post_detection_dist: Distance in mm to move up after detection. Default 0.0. + detection_edge: cLLD edge steepness threshold (0-1023). Default 10. + detection_drop: cLLD offset after edge detection (0-1023). Default 2. + channel_speed_above_start_pos_search: pLLD speed above search start in mm/s. Default 120.0. + z_drive_current_limit: pLLD Z-drive current limit. Default 3. + tip_has_filter: Whether tip has a filter. Default False. + dispense_drive_speed: pLLD dispense drive speed in mm/s. Default 5.0. + dispense_drive_acceleration: pLLD dispense drive acceleration in mm/s^2. Default 0.2. + dispense_drive_max_speed: pLLD dispense drive max speed in mm/s. Default 14.5. + dispense_drive_current_limit: pLLD dispense drive current limit. Default 3. + plld_detection_edge: pLLD edge detection threshold. Default 30. + plld_detection_drop: pLLD detection drop. Default 10. + clld_verification: Enable cLLD verification in pLLD mode. Default False. + clld_detection_edge: cLLD verification edge threshold. Default 10. + clld_detection_drop: cLLD verification drop. Default 2. + max_delta_plld_clld: Max allowed delta between pLLD and cLLD in mm. Default 5.0. + plld_mode: Pressure LLD mode. Defaults to PressureLLDMode.LIQUID for pLLD. + plld_foam_detection_drop: Foam detection drop. Default 30. + plld_foam_detection_edge_tolerance: Foam detection edge tolerance. Default 30. + plld_foam_ad_values: Foam AD values. Default 30. + plld_foam_search_speed: Foam search speed in mm/s. Default 10.0. + dispense_back_plld_volume: Volume to dispense back after pLLD in uL. Default None. Returns: Mean of measured liquid heights for each container (mm from cavity bottom). Raises: RuntimeError: If channels lack tips. - NotImplementedError: If channels require different X positions. Notes: - All specified channels must have tips attached - - All channels must be at the same X position (single-row operation) + - Containers at different X positions are probed in sequential groups (single X carriage) - For single containers with odd channel counts, Y-offsets are applied to avoid center dividers (Hamilton 1000 uL spacing: 9mm, offset: 5.5mm) """ if use_channels is None: use_channels = list(range(len(containers))) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if not all(0 <= ch < self.num_channels for ch in use_channels): + raise ValueError( + f"All use_channels must be integers in range [0, {self.num_channels - 1}], " + f"got {use_channels}." + ) # Handle tip positioning ... if SINGLE container instance if resource_offsets is None: if len(set(containers)) == 1: - resource_offsets = get_wide_single_resource_liquid_op_offsets( - resource=containers[0], num_channels=len(containers) + container_size_y = containers[0].get_absolute_size_y() + # For non-consecutive channels (e.g. [0,1,2,5,6,7]), we must account for + # phantom intermediate channels (3,4) that physically exist between them. + # Compute offsets for the full channel range (min to max), then pick only + # the offsets corresponding to the actual channels being used. + num_channels_in_span = max(use_channels) - min(use_channels) + 1 + min_required = ( + MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * self._channel_minimum_y_spacing ) + if container_size_y >= min_required: + all_offsets = get_wide_single_resource_liquid_op_offsets( + resource=containers[0], + num_channels=num_channels_in_span, + min_spacing=self._channel_minimum_y_spacing, + ) + min_ch = min(use_channels) + resource_offsets = [all_offsets[ch - min_ch] for ch in use_channels] - if len(use_channels) % 2 != 0: - # Hamilton 1000 uL channels are 9 mm apart, so offset by half the distance - # + extra for the potential central 'splash guard' - y_offset = 5.5 - resource_offsets = [ - resource_offsets[i] + Coordinate(0, y_offset, 0) for i in range(len(use_channels)) - ] + if num_channels_in_span % 2 != 0: + y_offset = 5.5 + resource_offsets = [offset + Coordinate(0, y_offset, 0) for offset in resource_offsets] + # else: container too small to fit all channels — fall back to center offsets. + # Y sub-batching will serialize channels that can't coexist. resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) @@ -1843,9 +1930,15 @@ async def probe_liquid_heights( if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") + if not allow_duplicate_channels and len(use_channels) != len(set(use_channels)): + raise ValueError( + "use_channels must not contain duplicates. " + "Set `allow_duplicate_channels=True` to override." + ) + if not len(containers) == len(use_channels) == len(resource_offsets): raise ValueError( - "Length of containers, use_channels, resource_offsets and tip_lengths must match." + "Length of containers, use_channels, and resource_offsets must match." f"are {len(containers)}, {len(use_channels)}, {len(resource_offsets)}." ) @@ -1856,111 +1949,232 @@ async def probe_liquid_heights( tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - # Move channels to safe Z height before starting - await self.move_all_channels_in_z_safety() + # Default: move all channels to Z safety first (including uninvolved channels), + # be conservative on safety but allow repeated calls with minimal "channel jumping" + if move_to_z_safety_before: + await self.move_all_channels_in_z_safety() + # Optional: lower only the involved channels to the requested traverse height + if min_traverse_height_at_beginning_of_command is not None: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} + ) - # Check if all channels are on the same x position, then move there + # Compute X and Y positions for all containers x_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x for resource, offset in zip(containers, resource_offsets) ] - if len(set(x_pos)) > 1: # TODO: implement - raise NotImplementedError( - "probe_liquid_heights is not yet supported for multiple x positions." - ) - await self.move_channel_x(0, x_pos[0]) - - # Move channels to their y positions y_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y for resource, offset in zip(containers, resource_offsets) ] - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_pos)} - ) - # Detect liquid heights + # Group indices by unique X position (preserving order of first appearance). + # Round to 0.1mm to avoid floating point splitting of same-position containers. + x_groups: Dict[float, List[int]] = {} + for i, x in enumerate(x_pos): + x_rounded = round(x, 1) + x_groups.setdefault(x_rounded, []).append(i) + + # Precompute detection function and kwargs (mode doesn't change between groups) + detect_func: Callable[..., Any] + if lld_mode == self.LLDMode.GAMMA: + detect_func = self._move_z_drive_to_liquid_surface_using_clld + extra_kwargs: dict = { + "detection_edge": detection_edge, + "detection_drop": detection_drop, + } + else: + detect_func = self._search_for_surface_using_plld + extra_kwargs = { + "channel_speed_above_start_pos_search": channel_speed_above_start_pos_search, + "z_drive_current_limit": z_drive_current_limit, + "tip_has_filter": tip_has_filter, + "dispense_drive_speed": dispense_drive_speed, + "dispense_drive_acceleration": dispense_drive_acceleration, + "dispense_drive_max_speed": dispense_drive_max_speed, + "dispense_drive_current_limit": dispense_drive_current_limit, + "plld_detection_edge": plld_detection_edge, + "plld_detection_drop": plld_detection_drop, + "clld_verification": clld_verification, + "clld_detection_edge": clld_detection_edge, + "clld_detection_drop": clld_detection_drop, + "max_delta_plld_clld": max_delta_plld_clld, + "plld_mode": plld_mode if plld_mode is not None else self.PressureLLDMode.LIQUID, + "plld_foam_detection_drop": plld_foam_detection_drop, + "plld_foam_detection_edge_tolerance": plld_foam_detection_edge_tolerance, + "plld_foam_ad_values": plld_foam_ad_values, + "plld_foam_search_speed": plld_foam_search_speed, + "dispense_back_plld_volume": dispense_back_plld_volume, + } + + # Detect liquid heights, iterating over X groups sequentially (single X carriage) absolute_heights_measurements: Dict[int, List[Optional[float]]] = { ch: [] for ch in use_channels } - lowest_immers_positions = [ - container.get_absolute_location("c", "c", "cavity_bottom").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - for container, tip_len in zip(containers, tip_lengths) - ] - start_pos_searches = [ - container.get_absolute_location("c", "c", "t").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - + 5 - for container, tip_len in zip(containers, tip_lengths) - ] - try: - for _ in range(n_replicates): - if lld_mode == self.LLDMode.GAMMA: - results = await asyncio.gather( - *[ - self._move_z_drive_to_liquid_surface_using_clld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches + is_first_x_group = True + prev_indices: Optional[List[int]] = None + for _, indices in x_groups.items(): + # Use the actual (non-rounded) X position of the first container in this group + group_x = x_pos[indices[0]] + + # Raise channels before moving X carriage (tips may be lowered from previous group) + if not is_first_x_group: + assert prev_indices is not None + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_channels = [use_channels[i] for i in prev_indices] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_channels} + ) + await self.move_channel_x(0, group_x) + + # ────────────────────────────────────────────────────────────────────────── + # INTERVAL PARTITIONING: Pack channels into minimum parallel batches + # ────────────────────────────────────────────────────────────────────────── + # + # PHYSICAL CONSTRAINT: + # Channels are mounted on a single Y-carriage with fixed minimum spacing. + # Channel 0 sits at the back (high Y), channel 7 at the front (low Y). + # Two channels can only probe together if their target Y positions respect + # this physical ordering with sufficient gaps for any channels between them. + # + # Example: channels 2 and 5 probing together need at least 3× min_spacing + # between their Y positions (room for phantom channels 3, 4). + # + # MATHEMATICAL SIMPLIFICATION: + # Raw constraint: y[i] - y[j] >= (j - i) × min_spacing (for i < j) + # + # Define: normalized_y = y + channel_index × min_spacing + # This "shifts" each channel's Y by its index, collapsing the constraint to: + # normalized_y[i] >= normalized_y[j] (for i < j in same batch) + # + # Now we simply need each batch to have decreasing normalized_y values. + # + # ALGORITHM (First-Fit Decreasing, optimal for interval partitioning): + # Process channels by index. For each, find the first batch it fits into + # (where batch's last normalized_y >= channel's normalized_y), or create new. + # This greedy approach is provably optimal for minimum batch count. + # + # ────────────────────────────────────────────────────────────────────────── + + min_spacing = self._channel_minimum_y_spacing + channels_by_index = sorted(indices, key=lambda i: use_channels[i]) + + batches: List[List[int]] = [] + batch_floors: List[float] = [] # lowest normalized_y in each batch (determines what fits) + + for idx in channels_by_index: + channel = use_channels[idx] + normalized_y = y_pos[idx] + channel * min_spacing + + # Find first batch that can accept this channel + assigned = False + for batch_idx, floor in enumerate(batch_floors): + if floor >= normalized_y: + batches[batch_idx].append(idx) + batch_floors[batch_idx] = normalized_y + assigned = True + break + + if not assigned: + batches.append([idx]) + batch_floors.append(normalized_y) + + y_batches = batches + + for y_batch_idx, y_batch in enumerate(y_batches): + batch_channels = [use_channels[i] for i in y_batch] + batch_containers = [containers[i] for i in y_batch] + batch_tip_lengths = [tip_lengths[i] for i in y_batch] + + # Raise channels before Y repositioning (skip first batch in each X group — + # already safe from the X-group-level raise or initial raise) + if y_batch_idx > 0: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_batch_channels = [use_channels[i] for i in y_batches[y_batch_idx - 1]] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_batch_channels} ) - ], - return_exceptions=True, - ) - else: - results = await asyncio.gather( - *[ - self._search_for_surface_using_plld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - dispense_drive_speed=5.0, - plld_mode=self.PressureLLDMode.LIQUID, - clld_verification=False, - post_detection_dist=0.0, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches - ) - ], - return_exceptions=True, - ) + # Position the batch's channels in Y, including any intermediate channels + # (channels between batch members that aren't part of this batch) to ensure + # they don't violate the descending-order / minimum-spacing constraint. + y_positions: Dict[int, float] = {use_channels[i]: y_pos[i] for i in y_batch} + sorted_batch_chs = sorted(batch_channels) + for k in range(len(sorted_batch_chs) - 1): + ch_lo, ch_hi = sorted_batch_chs[k], sorted_batch_chs[k + 1] + for intermediate_ch in range(ch_lo + 1, ch_hi): + if intermediate_ch not in y_positions: + y_positions[intermediate_ch] = ( + y_positions[ch_lo] - (intermediate_ch - ch_lo) * self._channel_minimum_y_spacing + ) + await self.position_channels_in_y_direction(y_positions) + + # Compute Z search bounds for this batch + batch_lowest_immers = [ + container.get_absolute_location("c", "c", "cavity_bottom").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] + batch_start_pos = [ + container.get_absolute_location("c", "c", "t").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + + 5 + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] - # Get heights for ALL channels, handling failures for channels with no liquid - # (indexed 0 to self.num_channels-1) but only store for used channels - current_absolute_liquid_heights = await self.request_pip_height_last_lld() - for idx, (ch_idx, result) in enumerate(zip(use_channels, results)): - if isinstance(result, STARFirmwareError): - # Check if it's specifically the "no liquid found" error - error_msg = str(result).lower() - if "no liquid level found" in error_msg or "no liquid was present" in error_msg: - height = None # No liquid detected - this is expected - msg = ( - f"Channel {ch_idx}: No liquid detected. Could be because there is " - f"no liquid in container {containers[idx].name} or liquid level is too low." - ) - if lld_mode == self.LLDMode.GAMMA: - msg += " Consider using pressure-based LLD if liquid is believed to exist." - logger.warning(msg) - else: - # Some other firmware error - re-raise it - raise result - elif isinstance(result, Exception): - # Some other unexpected error - re-raise it - raise result - else: - height = current_absolute_liquid_heights[ch_idx] - absolute_heights_measurements[ch_idx].append(height) + # Run n_replicates detection loop for this batch + for _ in range(n_replicates): + results = await asyncio.gather( + *[ + detect_func( + channel_idx=channel, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=search_speed, + channel_acceleration=channel_acceleration, + post_detection_trajectory=post_detection_trajectory, + post_detection_dist=post_detection_dist, + **extra_kwargs, + ) + for channel, lip, sps in zip(batch_channels, batch_lowest_immers, batch_start_pos) + ], + return_exceptions=True, + ) + + # Get heights for ALL channels, handling failures for channels with no liquid + current_absolute_liquid_heights = await self.request_pip_height_last_lld() + for local_idx, (ch_idx, result) in enumerate(zip(batch_channels, results)): + orig_idx = y_batch[local_idx] + if isinstance(result, STARFirmwareError): + error_msg = str(result).lower() + if "no liquid level found" in error_msg or "no liquid was present" in error_msg: + height = None + msg = ( + f"Channel {ch_idx}: No liquid detected. Could be because there is " + f"no liquid in container {containers[orig_idx].name} or liquid level " + f"is too low." + ) + if lld_mode == self.LLDMode.GAMMA: + msg += " Consider using pressure-based LLD if liquid is believed to exist." + logger.warning(msg) + else: + raise result + elif isinstance(result, Exception): + raise result + else: + height = current_absolute_liquid_heights[ch_idx] + absolute_heights_measurements[ch_idx].append(height) + prev_indices = y_batches[-1] # last Y batch's indices, for Z raise on next X group + is_first_x_group = False except: await self.move_all_channels_in_z_safety() raise @@ -1991,7 +2205,12 @@ async def probe_liquid_heights( ) if move_to_z_safety_after: - await self.move_all_channels_in_z_safety() + if z_position_at_end_of_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) return relative_to_well @@ -2024,8 +2243,7 @@ async def probe_liquid_volumes( Volumes in each container (uL). Raises: - ValueError: If any container doesn't support height-to-volume conversion (raised by probe_liquid_heights). - NotImplementedError: If channels require different X positions. + ValueError: If any container doesn't support height-to-volume conversion. Notes: - Delegates all motion, LLD, validation, and safety logic to probe_liquid_heights @@ -2482,6 +2700,7 @@ async def aspirate( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -2844,6 +3063,7 @@ async def dispense( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -11109,7 +11329,9 @@ async def pierce_foil( if spread == "wide": offsets = get_wide_single_resource_liquid_op_offsets( - well, num_channels=len(piercing_channels) + resource=well, + num_channels=len(piercing_channels), + min_spacing=self._channel_minimum_y_spacing, ) else: offsets = get_tight_single_resource_liquid_op_offsets( diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py index 7cea5c467cc..6df0b852bae 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_chatterbox.py @@ -1,11 +1,20 @@ import datetime from contextlib import asynccontextmanager -from typing import List, Literal, Optional, Union +from typing import Dict, List, Literal, Optional, Union from pylabrobot.liquid_handling.backends import LiquidHandlerBackend -from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import Head96Information, STARBackend +from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import ( + Head96Information, + STARBackend, +) +from pylabrobot.resources.container import Container +from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.well import Well +# Type aliases for nested enums (for cleaner signatures) +LLDMode = STARBackend.LLDMode +PressureLLDMode = STARBackend.PressureLLDMode + class STARChatterboxBackend(STARBackend): """Chatterbox backend for 'STAR'""" @@ -216,6 +225,15 @@ async def channel_dispensing_drive_request_position( async def move_channel_y(self, channel: int, y: float): print(f"moving channel {channel} to y: {y}") + async def move_channel_x(self, channel: int, x: float): + print(f"moving channel {channel} to x: {x}") + + async def move_all_channels_in_z_safety(self): + print("moving all channels to z safety") + + async def position_channels_in_z_direction(self, zs: Dict[int, float]): + print(f"positioning channels in z: {zs}") + # # # # # # # # 1_000 uL Channel: Complex Commands # # # # # # # # async def step_off_foil( @@ -284,3 +302,131 @@ async def slow_iswap(self, wrist_velocity: int = 20_000, gripper_velocity: int = finally: messages.append("end slow iswap") print(" | ".join(messages)) + + # # # # # # # # Liquid Level Detection (LLD) # # # # # # # # + + async def request_tip_len_on_channel(self, channel_idx: int) -> float: + """Return tip length from the tip tracker. + + Args: + channel_idx: Index of the pipetting channel (0-indexed). + + Returns: + The tip length in mm from the tip tracker. + + Raises: + NoTipError: If no tip is present on the channel (via tip tracker). + """ + tip = self.head[channel_idx].get_tip() + return tip.total_tip_length + + async def probe_liquid_heights( + self, + containers: List[Container], + use_channels: Optional[List[int]] = None, + resource_offsets: Optional[List[Coordinate]] = None, + lld_mode: LLDMode = LLDMode.GAMMA, + search_speed: float = 10.0, + n_replicates: int = 1, + move_to_z_safety_before: bool = True, + move_to_z_safety_after: bool = True, + allow_duplicate_channels: bool = False, + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + channel_acceleration: float = 800.0, + post_detection_trajectory: Literal[0, 1] = 1, + post_detection_dist: float = 0.0, + detection_edge: int = 10, + detection_drop: int = 2, + channel_speed_above_start_pos_search: float = 120.0, + z_drive_current_limit: int = 3, + tip_has_filter: bool = False, + dispense_drive_speed: float = 5.0, + dispense_drive_acceleration: float = 0.2, + dispense_drive_max_speed: float = 14.5, + dispense_drive_current_limit: int = 3, + plld_detection_edge: int = 30, + plld_detection_drop: int = 10, + clld_verification: bool = False, + clld_detection_edge: int = 10, + clld_detection_drop: int = 2, + max_delta_plld_clld: float = 5.0, + plld_mode: Optional[PressureLLDMode] = None, + plld_foam_detection_drop: int = 30, + plld_foam_detection_edge_tolerance: int = 30, + plld_foam_ad_values: int = 30, + plld_foam_search_speed: float = 10.0, + dispense_back_plld_volume: Optional[float] = None, + ) -> List[float]: + """Probe liquid heights by computing from tracked container volumes. + + Instead of simulating hardware LLD, this mock computes liquid heights directly from + each container's volume tracker using `container.compute_height_from_volume()`. + + Args: + containers: List of Container objects to probe. + use_channels: Channel indices (validated for tip presence). + All other parameters: Accepted for API compatibility but unused in mock. + + Returns: + Liquid heights in mm from cavity bottom for each container, computed from tracked volumes. + + Raises: + NotImplementedError: If a container doesn't support compute_height_from_volume. + """ + # Unused parameters kept for signature compatibility: + _ = ( + lld_mode, + search_speed, + n_replicates, + move_to_z_safety_before, + move_to_z_safety_after, + allow_duplicate_channels, + min_traverse_height_at_beginning_of_command, + min_traverse_height_during_command, + z_position_at_end_of_command, + channel_acceleration, + post_detection_trajectory, + post_detection_dist, + detection_edge, + detection_drop, + channel_speed_above_start_pos_search, + z_drive_current_limit, + tip_has_filter, + dispense_drive_speed, + dispense_drive_acceleration, + dispense_drive_max_speed, + dispense_drive_current_limit, + plld_detection_edge, + plld_detection_drop, + clld_verification, + clld_detection_edge, + clld_detection_drop, + max_delta_plld_clld, + plld_mode, + plld_foam_detection_drop, + plld_foam_detection_edge_tolerance, + plld_foam_ad_values, + plld_foam_search_speed, + dispense_back_plld_volume, + resource_offsets, + ) + if use_channels is None: + use_channels = list(range(len(containers))) + + # Validate tip presence using tip tracker + for ch in use_channels: + self.head[ch].get_tip() # Raises NoTipError if no tip + + heights: List[float] = [] + for container in containers: + volume = container.tracker.get_used_volume() + if volume == 0: + heights.append(0.0) + else: + height = container.compute_height_from_volume(volume) + heights.append(height) + + print(f"probe_liquid_heights: {[f'{h:.2f}' for h in heights]} mm") + return heights diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index 3b04ff68b77..99fb85371b1 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -163,7 +163,6 @@ async def setup(self, **backend_kwargs): raise RuntimeError("The setup has already finished. See `LiquidHandler.stop`.") self.backend.set_deck(self.deck) - self.backend.set_heads(head=self.head, head96=self.head96) await super().setup(**backend_kwargs) self.head = {c: TipTracker(thing=f"Channel {c}") for c in range(self.backend.num_channels)} diff --git a/pylabrobot/liquid_handling/utils.py b/pylabrobot/liquid_handling/utils.py index 0437be31037..43997c75b9d 100644 --- a/pylabrobot/liquid_handling/utils.py +++ b/pylabrobot/liquid_handling/utils.py @@ -3,9 +3,9 @@ from pylabrobot.resources.coordinate import Coordinate from pylabrobot.resources.resource import Resource -MIN_SPACING_BETWEEN_CHANNELS = 9 +MIN_SPACING_BETWEEN_CHANNELS = 9.0 # minimum spacing between the edge of the container and the center of channel -MIN_SPACING_EDGE = 1 +MIN_SPACING_EDGE = 1.0 def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing: float): @@ -19,8 +19,7 @@ def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing def get_wide_single_resource_liquid_op_offsets( - resource: Resource, - num_channels: int, + resource: Resource, num_channels: int, min_spacing: float = MIN_SPACING_BETWEEN_CHANNELS ) -> List[Coordinate]: resource_size = resource.get_absolute_size_y() centers = list( @@ -29,7 +28,7 @@ def get_wide_single_resource_liquid_op_offsets( dim_size=resource_size, n=num_channels, margin=MIN_SPACING_EDGE, - min_spacing=MIN_SPACING_BETWEEN_CHANNELS, + min_spacing=min_spacing, ) ) ) # reverse because channels are from back to front