diff --git a/README.md b/README.md index 3bae9d1..571d104 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,13 @@ Low-level Python or simplified wrapper modules to send commands to Standa contro ### smc8.py Example ```python - from util.smc8 import SMC + from util.smc8 import SmcController # Open connection examples - dev = SMC(device_connection = "192.168.31.123/9219", connection_type = "xinet", log = False) - dev = SMC(device_connection="/dev/ximc/00007DF6", connection_type = "serial",log = True) - dev.open_connection() + dev = SmcController(log = False) + dev.connect(device_str = "192.168.31.123/9219", connection_type = "xinet") + # OR + dev.connect(device_str="/dev/ximc/00007DF6", connection_type = "serial") time.sleep(.25) #Populates dev with device info dev.get_info() @@ -32,21 +33,21 @@ Low-level Python or simplified wrapper modules to send commands to Standa contro time.sleep(5) #Give time for stage to move # Query Position - pos = dev.get_position() # Query Position + pos = dev.get_pos() # Query Position # Move Relative to its current position - dev.move_rel(position = 5) #positive ot negative + dev.set_pos(position = 5) #positive ot negative time.sleep(5) # Move to absolute position - dev.move_abs(position = 10) + dev.set_pos(position = 10, abs_move = False) time.sleep(5) pos = dev.get_position() dev.home() time.sleep(5) #Close connection - dev.close_connection() + dev.disconnect() ``` ## 🧪 Testing diff --git a/pyproject.toml b/pyproject.toml index 0abe938..26f440c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,8 @@ readme = "README.md" requires-python = ">=3.8" dependencies = [ - "libximc" + "libximc", + "hardware_device_base@git+https://github.com/COO-Utilities/hardware_device_base" ] [project.urls] diff --git a/smc8.py b/smc8.py index 4eb9cd9..f8ad3b4 100644 --- a/smc8.py +++ b/smc8.py @@ -1,63 +1,48 @@ -#NOTE:: Pip install of libximc is needed to use the library imported -# These are not standard python librarys but are on PyPI -# -Elijah Anakalea-Buckley - +''' + standa.smc8.py: Class for controlling Standa SMC motion controllers + using the libximc library. + NOTE:: Pip install of libximc is needed to use the library imported + These are not standard python librarys but are on PyPI + -Elijah Anakalea-Buckley +''' +from typing import Tuple, Union, Dict import libximc.highlevel as ximc -import logging -import pathlib -import os -import time - +from hardware_device_base import HardwareMotionBase -class SMC(object): +class SmcController(HardwareMotionBase): ''' Class is for utilizing the libximc Library. - Functions from lib.ximc is incorporated into this class + Functions from lib.ximc are incorporated into this class to make it easier to use for common tasks. - using the more recently developed libximc.highlevel API - step_size:float = 0.0025 Conversion Coefficient, Example for converting steps to mm used in API, adjust as needed - All functions log their actions and errors to a log file - Required Parameters: - device_connection: str = Connection string for device + device_str: str = Connection string for device - Ex: serial connection: '/COM3', '/dev/ximc/000746D30' or '192.123.123.92' - NOTE:: For Network you must provide IP/Name and device ID. Device ID is the serial number tranlslated to hex - EX: SMC(device_connection = "192.168.29.123/9219", connection_type="xinet") + EX: SMC(device_str = "192.168.29.123/9219", connection_type="xinet") connection_type: str = Type of connection - Options: 'serial'=USB, 'tcp'=Raw TCP, 'xinet'=Network log: bool = Enable or disable logging to file ''' - def __init__(self, device_connection: str, connection_type: str,log: bool, step_size:float = 0.0025): + _move_cmd_flags = ximc.MvcmdStatus # Default move command flags + _state_flags = ximc.StateFlags + + def __init__(self, log:bool=True, logfile: str =__name__.rsplit(".", 1)[-1]): ''' Inicializes the device parameters: ip string, port integer, logging bool - full device capabilities will be under "self.device." - ''' - # Logger setup - logname = __name__.rsplit(".", 1)[-1] - self.logger = logging.getLogger(logname) - self.logger.setLevel(logging.DEBUG) - if log: - log_handler = logging.FileHandler(logname + ".log") - formatter = logging.Formatter( - "%(asctime)s--%(name)s--%(levelname)s--%(module)s--" - "%(funcName)s--%(message)s") - log_handler.setFormatter(formatter) - self.logger.addHandler(log_handler) - # Console handler for real-time output - console_handler = logging.StreamHandler() - console_formatter = logging.Formatter("%(asctime)s--%(message)s") - console_handler.setFormatter(console_formatter) - self.logger.addHandler(console_handler) - - - self.logger.info("Logger initialized for SMC8 Stage") + + connection_type: str="serial", step_size:float = 0.0025 + ''' + super().__init__(log, logfile) #Inicialize variables and objects - self._move_cmd_flags = ximc.MvcmdStatus # Default move command flags - self._state_flags = ximc.StateFlags self.serial_number = None self.power_setting = None self.device_information = None @@ -65,72 +50,75 @@ def __init__(self, device_connection: str, connection_type: str,log: bool, step_ self.min_limit = None self.max_limit = None self._homed_and_happy_bool = False - self._uPOSITION = 0 #Constant is 0 for DC motors and avaries for stepper motors - #look into ximc library for details on uPOSITION + self._uPOSITION = 0 #Constant is 0 for DC motors and avaries for stepper motors (pylint: disable=C0103) + #look into ximc library for details on uPOSITION self.device_uri = None + self.dev_open = False + self.step_size_coeff = None + self._axis = None - # Reference for connecting to device - # device_uri = r"xi-emu:///ABS_PATH/virtual_controller.bin" # Virtual device - # device_uri = r"xi-com:\\.\COM111" # Serial port - # device_uri = "xi-tcp://172.16.130.155:1820" # Raw TCP connection - # device_uri = "xi-net://192.168.1.120/abcd" # XiNet connection - connection_type = connection_type.lower().strip() - if connection_type == "serial": - self.device_uri = f"xi-com://{device_connection}" - elif connection_type == "tcp": - self.device_uri = f"xi-tcp://{device_connection}" - elif connection_type == "xinet": - self.device_uri = f"xi-net://{device_connection}" - else: - self.logger.error(f"Unknown connection type: {connection_type}") - raise ValueError(f"Unknown connection type: {connection_type}") - - - self.step_size_coeff = step_size # Example conversion coefficient, adjust as needed(mm) - self.dev_open = False - self._axis = ximc.Axis(self.device_uri) - - def open_connection(self): + def connect(self, connection_type: str, device_str: str, step_size:float = 0.0025): # pylint: disable=W0221 ''' Opens communication to the Device, gathers general information to store in local variables. + - Reference for connecting to device + - device_uri = r"xi-emu:///ABS_PATH/virtual_controller.bin" # Virtual device + - device_uri = r"xi-com:\COM111" # Serial port + - device_uri = "xi-tcp://172.16.130.155:1820" # Raw TCP connection + - device_uri = "xi-net://192.168.1.120/abcd" # XiNet connection return: Bool for successful or unsuccessful connection libximc:: open_device() ''' #Check if already open if self.dev_open: #log that device is already open - self.logger.info("Device already open, skipping open command.") + self.report_info("Device already open, skipping open command.") #return true if already open return True - #try to open try: + #Build device URI based on connection type + connection_type = connection_type.lower().strip() + if connection_type == "serial": + self.device_uri = f"xi-com://{device_str}" + elif connection_type == "tcp": + self.device_uri = f"xi-tcp://{device_str}" + elif connection_type == "xinet": + self.device_uri = f"xi-net://{device_str}" + else: + self.report_error(f"Unknown connection type: {connection_type}") + raise ValueError(f"Unknown connection type: {connection_type}") + + + self.step_size_coeff = step_size # Example conversion coefficient, adjust as needed(mm) #open device + self._axis =ximc.Axis(self.device_uri) self._axis.open_device() #get and save engine settings self._engine_settings = self._axis.get_engine_settings() - #Set calb for user units TODO:: Check if this is correct(SPECIFICALLY THE MICROSTEP MODE) + #Set calb for user units(SPECIFICALLY THE MICROSTEP MODE) self._axis.set_calb(self.step_size_coeff, self._engine_settings.MicrostepMode) - #Set limits - self.limits = self._axis.get_edges_settings() - self.min_limit = self.limits.LeftBorder - self.max_limit = self.limits.RightBorder + self.get_limits() - self.logger.info("Device opened successfully.") + self.report_info("Device opened successfully.") #return true if successful self.dev_open = True - return True - except Exception as e: + except ValueError as e: #log error - self.logger.error(f"Error opening device: {e}") - - #return false if unsuccessful + self.report_error(f"Error opening device: {e}") self.dev_open = False - return False + except ConnectionError as e: + #log error + self.report_error(f"Connection error opening device: {e}") + self.dev_open = False + except Exception as e: #pylint: disable=W0718 + #log error + self.report_error(f"Unknown error opening device: {e}") + self.dev_open = False + return self.dev_open - def close_connection(self): + def disconnect(self): ''' Closes communication to the Device return: Bool for successful or unsuccessful termination @@ -138,28 +126,22 @@ def close_connection(self): ''' #Check if already open if not self.dev_open: - #log that de is closed - self.logger.info("Device already closed, skipping close command.") - - #return true if already closed + #log that device is closed + self.report_warning("Already disconnected from device.") return True - #Try to close + #Try to close, return result try: self._axis.close_device() self.dev_open = False - self.logger.info("Device closed successfully.") - #return true if succesful + self.report_info("Device closed successfully.") return True - except Exception as e: - #catch error - #log error and return false - self.logger.error(f"Error closing device: {e}") - - #return false if unsuccessful + except Exception as e: #pylint: disable=W0718 + #log error and return device still open + self.report_error(f"Error closing device: {e}") self.dev_open = True return False - + def get_info(self): ''' Gets information about the device, such as serial number, power setting, @@ -173,30 +155,24 @@ def get_info(self): #Check if connection not open if not self.dev_open: #log closed connection - self.logger.error("Device not open, cannot get info.") + self.report_error("Device not open, cannot get info.") return False #Try to get info try: - #get serial number + #get serial number, power settings, device information self.serial_number = self._axis.get_serial_number() - #get power settings self.power_setting = self._axis.get_power_settings() - #get device information self.device_information = self._axis.get_device_information() - self.logger.info("Device opened successfully.") - self.logger.info(f"Serial number: {self.serial_number}") - self.logger.info(f"Power setting: {self.power_setting}") - #Log device information - self.logger.info(f"Device information: {self.device_information}") - - #return true if successful - return True - except Exception as e: + self.report_info("Device opened successfully.") + self.report_info(f"Serial number: {self.serial_number}") + self.report_info(f"Power setting: {self.power_setting}") + self.report_info(f"Device information: {self.device_information}") + except Exception as e: #pylint: disable=W0718 #log error and return None - self.logger.error(f"Error getting device information: {e}") - return False + self.report_error(f"Error getting device information: {e}") + return None def home(self): @@ -208,23 +184,47 @@ def home(self): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot home stage.") + self.report_error("Device not open, cannot home stage.") return False - #Try to home to zero or parked position try: + #home stage,Check, and log status self._axis.command_homezero() - #Check position after homing - self.logger.info("Stage sent to homed position which is 0") - #return true if succesful + self.report_info("home command sent successfully.") self.status() return True - #catch error - except Exception as e: + except Exception as e: #pylint: disable=W0718 #log error - self.logger.error(f"Error homing stage: {e}") - #return false if unsuccessful + self.report_error(f"Error homing stage: {e}") + return False + + def set_pos(self, position:int, abs_move:bool=True): # pylint: disable=W0221 + ''' + Sets the current position of the stage to a specific value. + - This does not move the stage, just sets the current position + parameters: int:"position" to set current position to + return: bool on successful or unsuccessful set position + libximc:: set_position() + ''' + #Check if connection not open + if not self.dev_open: + self.report_error("Device not open, cannot set position.") + return False + + try: + #set position, return true if succesful + if abs_move: + self.report_info("Setting position with absolute move.") + self.move_abs(position) + self.report_info(f"Position set to: {position}") + else: + self.report_info("Setting position relative to current position.") + self.move_rel(position) + self.report_info(f"Position moved by: {position}") + return True + except Exception as e: #pylint: disable=W0718 + #log error and return false + self.report_error(f"Error setting position: {e}") return False def move_abs(self, position:int): @@ -238,24 +238,19 @@ def move_abs(self, position:int): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot move stage.") + self.report_error("Device not open, cannot move stage.") return False - #Try move absolute try: #check limits/valid inputs if position < self.min_limit or position > self.max_limit: - self.logger.error(f"Position out of limits: {position}") + self.report_error(f"Position out of limits: {position}") return False - #move absolute self._axis.command_move(position, self._uPOSITION) - #return true if succesful return True - #catch error - except Exception as e: + except Exception as e: #pylint: disable=W0718 #log error and return false - self.logger.error(f"Error moving stage: {e}") + self.report_error(f"Error moving stage: {e}") return False def move_rel(self, position:int): @@ -269,32 +264,26 @@ def move_rel(self, position:int): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot move stage.") + self.report_error("Device not open, cannot move stage.") return False - #Try move relative try: #check limits/valid inputs - #get current position - current_position = self.get_position() - #calculate new position + #get current position, calculate new position, check limits + current_position = self.get_pos() new_position = current_position + position - #check if new position is within limits if new_position < self.min_limit or new_position > self.max_limit: - self.logger.error(f"Position out of limits: {new_position}") + self.report_error(f"Position out of limits: {new_position}") return False #move relative self._axis.command_movr(position, self._uPOSITION) - #return true if succesful return True - #catch error - except Exception as e: + except Exception as e: #pylint: disable=W0718 #log error and return false - self.logger.error(f"Error moving stage: {e}") + self.report_error(f"Error moving stage: {e}") return False - def get_position(self): + def get_pos(self): # pylint: disable=W0221 ''' Gets Position of stage return: position in stage specific units @@ -302,23 +291,20 @@ def get_position(self): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot get position.") + self.report_error("Device not open, cannot get position.") return False - #Try get_position try: #get position pos = self._axis.get_position() - #return aspects of the position object + self.report_info(f"Current position: {pos.Position}") return pos.Position - #catch error - except Exception as e: + except Exception as e: #pylint: disable=W0718 #log error and return None - self.logger.error(f"Error getting position: {e}") + self.report_error(f"Error getting position: {e}") return None - def status(self): + def get_status(self): ''' Gathers status and formats it in a usable and readable format. mostly for logging @@ -327,24 +313,19 @@ def status(self): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot get status.") + self.report_error("Device not open, cannot get status.") return False - #Try status function try: - #get status - status = self._axis.get_status() - #parse results - #return status in user friendly way - self.logger.info(f"Position: {status.CurPosition}") - self._homed_and_happy_bool = bool(status.Flags & self._state_flags.STATE_IS_HOMED | + #get status, parse results, return status in user friendly way + self.status = self._axis.get_status() + self.report_info(f"Position: {self.status.CurPosition}") + self._homed_and_happy_bool = bool(self.status.Flags & self._state_flags.STATE_IS_HOMED | self._state_flags.STATE_EEPROM_CONNECTED) - return status - #catch error - except Exception as e: + return self.status + except Exception as e: #pylint: disable=W0718 #log error and return false - self.logger.error(f"Error getting status: {e}") + self.report_error(f"Error getting status: {e}") return None def halt(self): @@ -356,24 +337,50 @@ def halt(self): ''' #Check if connection not open if not self.dev_open: - #log closed connection - self.logger.error("Device not open, cannot halt stage.") + self.report_error("Device not open, cannot halt stage.") return False - #Try imidiate stop of stage try: + #imidiate stop, check status, recurse self._axis.command_stop() - #Check status after halting status = self._axis.get_status() if status.MvCmdSts != self._move_cmd_flags.MVCMD_STOP: self.halt() #Recursively call halt if not stopped #status.Moving - self.logger.info("Stage halted successfully.") - #return true if succesful + self.report_info("Stage halted successfully.") return True - #catch error - except Exception as e: + except Exception as e: #pylint: disable=W0718 #log error and return false - self.logger.error(f"Error halting stage: {e}") - return False \ No newline at end of file + self.report_error(f"Error halting stage: {e}") + return False + + def is_homed(self) -> bool: + """Check if the hardware motion device is homed.""" + self.get_status() + return self._homed_and_happy_bool + + def get_limits(self) -> Union[Dict[str, Tuple[float, float]], None]: + """ + Get the limits of the hardware motion device. + + Limits are the smallest and largest allowed positions for an axis. + Axes are identified by a string and limits are a tuple. + e.g.: {"1": (1, 6)} - for a filter wheel + + """ + #Set limits + limits = self._axis.get_edges_settings() + self.min_limit = limits.LeftBorder + self.max_limit = limits.RightBorder + self.report_info(f"Limits are: Min: {self.min_limit}, Max: {self.max_limit}") + ret = {"1": (self.min_limit, self.max_limit)} + return ret + + def close_loop(self) -> bool: + """Close the loop for the hardware motion device.""" + return True + + def is_loop_closed(self) -> bool: + """Check if the hardware motion loop is closed.""" + return True