diff --git a/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py b/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py index 7690c629d71..fa2605587d9 100644 --- a/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py +++ b/pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py @@ -164,22 +164,63 @@ async def _mp_and_focus_height_value(self): ) return await self._wait_for_ready_and_return(mp_and_focus_height_value_response) - async def _run_luminescence(self, focal_height: float): + def _plate_bytes(self, plate: Plate) -> bytes: + """Encode the plate geometry into the binary format the CLARIOstar expects. + + Returns a 62-byte sequence: plate dimensions (12 bytes), column/row counts (2 bytes), + and a 384-bit well mask (48 bytes). + """ + + def float_to_bytes(f: float) -> bytes: + return round(f * 100).to_bytes(2, byteorder="big") + + plate_length = plate.get_absolute_size_x() + plate_width = plate.get_absolute_size_y() + + well_0 = plate.get_well(0) + assert well_0.location is not None, "Well 0 must be assigned to a plate" + plate_x1 = well_0.location.x + well_0.center().x + plate_y1 = plate_width - (well_0.location.y + well_0.center().y) + plate_xn = plate_length - plate_x1 + plate_yn = plate_width - plate_y1 + + plate_cols = plate.num_items_x + plate_rows = plate.num_items_y + + # 384-bit mask: first num_items bits set, rest zero + wells = ([1] * plate.num_items) + ([0] * (384 - plate.num_items)) + well_mask: int = sum(b << i for i, b in enumerate(wells[::-1])) + wells_bytes = well_mask.to_bytes(48, "big") + + return ( + float_to_bytes(plate_length) + + float_to_bytes(plate_width) + + float_to_bytes(plate_x1) + + float_to_bytes(plate_y1) + + float_to_bytes(plate_xn) + + float_to_bytes(plate_yn) + + plate_cols.to_bytes(1, byteorder="big") + + plate_rows.to_bytes(1, byteorder="big") + + wells_bytes + ) + + async def _run_luminescence(self, focal_height: float, plate: Plate): """Run a plate reader luminescence run.""" assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm" focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big") + plate_bytes = self._plate_bytes(plate) - run_response = await self.send( - b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56" - b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" + payload = ( + b"\x04" + plate_bytes + b"\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27" b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00" b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01" b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00" ) + message_size = (len(payload) + 7).to_bytes(2, byteorder="big") + cmd = b"\x02" + message_size + b"\x0c" + payload + run_response = await self.send(cmd) # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. last_status = None @@ -199,19 +240,19 @@ async def _run_luminescence(self, focal_height: float): ): return run_response - async def _run_absorbance(self, wavelength: float): + async def _run_absorbance(self, wavelength: float, plate: Plate): """Run a plate reader absorbance run.""" wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big") + plate_bytes = self._plate_bytes(plate) - absorbance_command = ( - b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06" - b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" - b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" + payload = ( + b"\x04" + plate_bytes + b"\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27" b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00" b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00" ) - run_response = await self.send(absorbance_command) + message_size = (len(payload) + 7).to_bytes(2, byteorder="big") + cmd = b"\x02" + message_size + b"\x0c" + payload + run_response = await self.send(cmd) # TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return. last_status = None @@ -250,7 +291,7 @@ async def read_luminescence( await self._mp_and_focus_height_value() - await self._run_luminescence(focal_height=focal_height) + await self._run_luminescence(focal_height=focal_height, plate=plate) await self._read_order_values() @@ -258,11 +299,12 @@ async def read_luminescence( vals = await self._get_measurement_values() - # All 96 values are 32 bit integers. The header is variable length, so we need to find the + # All values are 32 bit integers. The header is variable length, so we need to find the # start of the data. In the future, when we understand the protocol better, this can be # replaced with a more robust solution. + num_wells = plate.num_items start_idx = vals.index(b"\x00\x00\x00\x00\x00\x00") + len(b"\x00\x00\x00\x00\x00\x00") - data = list(vals)[start_idx : start_idx + 96 * 4] + data = list(vals)[start_idx : start_idx + num_wells * 4] # group bytes by 4 int_bytes = [data[i : i + 4] for i in range(0, len(data), 4)] @@ -307,17 +349,18 @@ async def read_absorbance( await self._mp_and_focus_height_value() - await self._run_absorbance(wavelength=wavelength) + await self._run_absorbance(wavelength=wavelength, plate=plate) await self._read_order_values() await self._status_hw() vals = await self._get_measurement_values() + num_wells = plate.num_items div = b"\x00" * 6 start_idx = vals.index(div) + len(div) - chromatic_data = vals[start_idx : start_idx + 96 * 4] - ref_data = vals[start_idx + 96 * 4 : start_idx + (96 * 2) * 4] + chromatic_data = vals[start_idx : start_idx + num_wells * 4] + ref_data = vals[start_idx + num_wells * 4 : start_idx + (num_wells * 2) * 4] chromatic_bytes = [bytes(chromatic_data[i : i + 4]) for i in range(0, len(chromatic_data), 4)] ref_bytes = [bytes(ref_data[i : i + 4]) for i in range(0, len(ref_data), 4)] chromatic_reading = [struct.unpack(">i", x)[0] for x in chromatic_bytes] @@ -327,7 +370,7 @@ async def read_absorbance( # c0 is the value of the chromatic at 0% intensity (black reading) # r100 is the value of the reference at 100% intensity # r0 is the value of the reference at 0% intensity (black reading) - after_values_idx = start_idx + (96 * 2) * 4 + after_values_idx = start_idx + (num_wells * 2) * 4 c100, c0, r100, r0 = struct.unpack(">iiii", vals[after_values_idx : after_values_idx + 4 * 4]) # a bit much, but numpy should not be a dependency