Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions pylabrobot/plate_reading/bmg_labtech/clario_star_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,63 @@ async def _mp_and_focus_height_value(self):
)
return await self._wait_for_ready_and_return(mp_and_focus_height_value_response)

async def _run_luminescence(self, focal_height: float):
def _plate_bytes(self, plate: Plate) -> bytes:
"""Encode the plate geometry into the binary format the CLARIOstar expects.

Returns a 62-byte sequence: plate dimensions (12 bytes), column/row counts (2 bytes),
and a 384-bit well mask (48 bytes).
"""

def float_to_bytes(f: float) -> bytes:
return round(f * 100).to_bytes(2, byteorder="big")

plate_length = plate.get_absolute_size_x()
plate_width = plate.get_absolute_size_y()

well_0 = plate.get_well(0)
assert well_0.location is not None, "Well 0 must be assigned to a plate"
plate_x1 = well_0.location.x + well_0.center().x
plate_y1 = plate_width - (well_0.location.y + well_0.center().y)
plate_xn = plate_length - plate_x1
plate_yn = plate_width - plate_y1

plate_cols = plate.num_items_x
plate_rows = plate.num_items_y

# 384-bit mask: first num_items bits set, rest zero
wells = ([1] * plate.num_items) + ([0] * (384 - plate.num_items))
well_mask: int = sum(b << i for i, b in enumerate(wells[::-1]))
wells_bytes = well_mask.to_bytes(48, "big")

return (
float_to_bytes(plate_length)
+ float_to_bytes(plate_width)
+ float_to_bytes(plate_x1)
+ float_to_bytes(plate_y1)
+ float_to_bytes(plate_xn)
+ float_to_bytes(plate_yn)
+ plate_cols.to_bytes(1, byteorder="big")
+ plate_rows.to_bytes(1, byteorder="big")
+ wells_bytes
)

async def _run_luminescence(self, focal_height: float, plate: Plate):
"""Run a plate reader luminescence run."""

assert 0 <= focal_height <= 25, "focal height must be between 0 and 25 mm"

focal_height_data = int(focal_height * 100).to_bytes(2, byteorder="big")
plate_bytes = self._plate_bytes(plate)

run_response = await self.send(
b"\x02\x00\x86\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56"
b"\x1d\x06\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27"
payload = (
b"\x04" + plate_bytes + b"\x02\x01\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27"
b"\x0f\x27\x0f\x01" + focal_height_data + b"\x00\x00\x01\x00\x00\x0e\x10\x00\x01\x00\x01\x00"
b"\x01\x00\x01\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01"
b"\x00\x00\x00\x01\x00\x64\x00\x20\x00\x00"
)
message_size = (len(payload) + 7).to_bytes(2, byteorder="big")
cmd = b"\x02" + message_size + b"\x0c" + payload
run_response = await self.send(cmd)

# TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return.
last_status = None
Expand All @@ -199,19 +240,19 @@ async def _run_luminescence(self, focal_height: float):
):
return run_response

async def _run_absorbance(self, wavelength: float):
async def _run_absorbance(self, wavelength: float, plate: Plate):
"""Run a plate reader absorbance run."""
wavelength_data = int(wavelength * 10).to_bytes(2, byteorder="big")
plate_bytes = self._plate_bytes(plate)

absorbance_command = (
b"\x02\x00\x7c\x0c\x04\x31\xec\x21\x66\x05\x96\x04\x60\x2c\x56\x1d\x06"
b"\x0c\x08\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27"
payload = (
b"\x04" + plate_bytes + b"\x82\x02\x00\x00\x00\x00\x00\x00\x00\x20\x04\x00\x1e\x27\x0f\x27"
b"\x0f\x19\x01" + wavelength_data + b"\x00\x00\x00\x64\x00\x00\x00\x00\x00\x00\x00\x64\x00"
b"\x00\x00\x00\x00\x02\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x16\x00\x01\x00\x00"
)
run_response = await self.send(absorbance_command)
message_size = (len(payload) + 7).to_bytes(2, byteorder="big")
cmd = b"\x02" + message_size + b"\x0c" + payload
run_response = await self.send(cmd)

# TODO: find a prettier way to do this. It's essentially copied from _wait_for_ready_and_return.
last_status = None
Expand Down Expand Up @@ -250,19 +291,20 @@ async def read_luminescence(

await self._mp_and_focus_height_value()

await self._run_luminescence(focal_height=focal_height)
await self._run_luminescence(focal_height=focal_height, plate=plate)

await self._read_order_values()

await self._status_hw()

vals = await self._get_measurement_values()

# All 96 values are 32 bit integers. The header is variable length, so we need to find the
# All values are 32 bit integers. The header is variable length, so we need to find the
# start of the data. In the future, when we understand the protocol better, this can be
# replaced with a more robust solution.
num_wells = plate.num_items
start_idx = vals.index(b"\x00\x00\x00\x00\x00\x00") + len(b"\x00\x00\x00\x00\x00\x00")
data = list(vals)[start_idx : start_idx + 96 * 4]
data = list(vals)[start_idx : start_idx + num_wells * 4]

# group bytes by 4
int_bytes = [data[i : i + 4] for i in range(0, len(data), 4)]
Expand Down Expand Up @@ -307,17 +349,18 @@ async def read_absorbance(

await self._mp_and_focus_height_value()

await self._run_absorbance(wavelength=wavelength)
await self._run_absorbance(wavelength=wavelength, plate=plate)

await self._read_order_values()

await self._status_hw()

vals = await self._get_measurement_values()
num_wells = plate.num_items
div = b"\x00" * 6
start_idx = vals.index(div) + len(div)
chromatic_data = vals[start_idx : start_idx + 96 * 4]
ref_data = vals[start_idx + 96 * 4 : start_idx + (96 * 2) * 4]
chromatic_data = vals[start_idx : start_idx + num_wells * 4]
ref_data = vals[start_idx + num_wells * 4 : start_idx + (num_wells * 2) * 4]
chromatic_bytes = [bytes(chromatic_data[i : i + 4]) for i in range(0, len(chromatic_data), 4)]
ref_bytes = [bytes(ref_data[i : i + 4]) for i in range(0, len(ref_data), 4)]
chromatic_reading = [struct.unpack(">i", x)[0] for x in chromatic_bytes]
Expand All @@ -327,7 +370,7 @@ async def read_absorbance(
# c0 is the value of the chromatic at 0% intensity (black reading)
# r100 is the value of the reference at 100% intensity
# r0 is the value of the reference at 0% intensity (black reading)
after_values_idx = start_idx + (96 * 2) * 4
after_values_idx = start_idx + (num_wells * 2) * 4
c100, c0, r100, r0 = struct.unpack(">iiii", vals[after_values_idx : after_values_idx + 4 * 4])

# a bit much, but numpy should not be a dependency
Expand Down
Loading