diff --git a/.github/workflows/python-check.yml b/.github/workflows/python-check.yml new file mode 100644 index 0000000..e43522f --- /dev/null +++ b/.github/workflows/python-check.yml @@ -0,0 +1,11 @@ +name: python style check +on: [push, pull_request] +jobs: + check-python-style: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Black Code Formatter + uses: lgeiger/black-action@master + with: + args: ". --check --diff" \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..70df8a3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +**/__pycache__ + +COLCON_IGNORE +.colcon_root diff --git a/LICENSE b/LICENSE index fc45ba6..dee5c37 100644 --- a/LICENSE +++ b/LICENSE @@ -1,28 +1,201 @@ -BSD 3-Clause License - -Copyright (c) 2024, Martin Klomp, MIRTE-team - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -3. Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2025, MIRTE team + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..0090a31 --- /dev/null +++ b/NOTICE @@ -0,0 +1,7 @@ +This software is licensed under the Apache License, Version 2.0. +See the LICENSE file for details. + +Technische Universiteit Delft hereby disclaims all copyright interest +in the software of "MIRTE robot" (an educational robot) written by the +Author(s). +prof. dr. ir. Fred van Keulen, Dean of Mechanical Engineering diff --git a/README.md b/README.md index 824df0a..2730d27 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,29 @@ -# Mirte Python API +# mirte-python-api + +This package provides the python API for the [MIRTE robot](https://mirte.org). +Please read the [MIRTE documentation](https://docs.mirte.org/develop/doc/api/mirte_python_api.html) +for further documentation of this API. + +## Install + +```sh +pip install . +``` + +## Checks + +To contribute to this repository the code needs to pass the python +[style check](https://github.com/mirte-robot/mirte-python/blob/develop/.github/workflows/python-check.yml) + +To check this locally before you commit/push: +```sh +pip install black +black --check . +# Fix by using: +black . +``` + +## License + +This work is licensed under a Apache-2.0 OSS license. -This package provides the API for the Mirte Robot. Please read the Mirte documentation. diff --git a/mirte_robot/__pycache__/linetrace.cpython-36.pyc b/mirte_robot/__pycache__/linetrace.cpython-36.pyc deleted file mode 100644 index 9fc2b08..0000000 Binary files a/mirte_robot/__pycache__/linetrace.cpython-36.pyc and /dev/null differ diff --git a/mirte_robot/__pycache__/robot.cpython-36.pyc b/mirte_robot/__pycache__/robot.cpython-36.pyc deleted file mode 100644 index 8bb10d6..0000000 Binary files a/mirte_robot/__pycache__/robot.cpython-36.pyc and /dev/null differ diff --git a/mirte_robot/linetrace.py b/mirte_robot/linetrace.py index 2c1f9f9..cc8776f 100644 --- a/mirte_robot/linetrace.py +++ b/mirte_robot/linetrace.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -#TODO: for debugging purposes we could *also* listen to keyboard events +# TODO: for debugging purposes we could *also* listen to keyboard events import sys import os @@ -9,49 +9,51 @@ import multiprocessing from websocket_server import WebsocketServer -# Already load rospy (whicht takes long) and robot, so mirte.py does not need to do this anymore -import rospy +# Already load rclpy (whicht takes long) and robot, so mirte.py does not need to do this anymore +import rclpy from mirte_robot import robot # Global shared memory objects (TODO: check if we need shared memory, why is server working?) -stepper = multiprocessing.Value('b', True) -do_step = multiprocessing.Value('b', False) -running = multiprocessing.Value('b', False) +stepper = multiprocessing.Value("b", True) +do_step = multiprocessing.Value("b", False) +running = multiprocessing.Value("b", False) + # the stop be stopped when: # 1) the code finishes # 2) the user stopped the process # 3) the websocket connection is closed def stop_mirte(): - global running - if running.value: + global running + if running.value: process.terminate() - running.value = False + running.value = False + def load_mirte_module(stepper, do_step, running): def trace_lines(frame, event, arg): - global do_step - if event != 'line': - return - # Only return line number to websocket when in step mode - if stepper.value: - server.send_message_to_all(str(frame.f_lineno)) - while stepper.value and not do_step.value: - time.sleep(.01) - do_step.value = False + global do_step + if event != "line": + return + # Only return line number to websocket when in step mode + if stepper.value: + server.send_message_to_all(str(frame.f_lineno)) + while stepper.value and not do_step.value: + time.sleep(0.01) + do_step.value = False def traceit(frame, event, arg): - co = frame.f_code - filename = co.co_filename - if not filename.endswith('mirte.py'): - return - return trace_lines + co = frame.f_code + filename = co.co_filename + if not filename.endswith("mirte.py"): + return + return trace_lines # Send the PID to the web interface and give it some time to call strace on this process # to see the output of this script server.send_message_to_all("pid:" + str(os.getpid())) - time.sleep(0.2) #TODO: let client send signal when strace is started + time.sleep(0.2) # TODO: let client send signal when strace is started sys.settrace(traceit) # rospy.init_node() for some reason needs to be called from __main__ when importing in the regular way. @@ -59,48 +61,55 @@ def traceit(frame, event, arg): # https://answers.ros.org/question/266612/rospy-init_node-inside-imported-file test = SourceFileLoader("mirte", "/home/mirte/workdir/mirte.py").load_module() - # Stop the motors. The atexit call in robot.py does not work when running from a subprocess: + # Stop the motors. The atexit call in robot.py does not work when running from a subprocess: # https://stackoverflow.com/questions/34506638/how-to-register-atexit-function-in-pythons-multiprocessing-subprocess # TODO: this assumes we have the robot initlized under variable 'mirte'. As soon as we let them create their own python, # this might not work anymore. - if hasattr(test, 'mirte'): - test.mirte.stop() + if hasattr(test, "mirte"): + test.mirte.stop() # Sending the linetrace 0 to the client server.send_message_to_all("0") running.value = False stop_mirte() -process = multiprocessing.Process(target = load_mirte_module, args=(stepper, do_step)) + +process = multiprocessing.Process(target=load_mirte_module, args=(stepper, do_step)) + def start_mirte(): global process # process should already have been killed after stop, or disconnect # but, just in case make sure to stop this if process.is_alive(): - process.terminate() - process = multiprocessing.Process(target = load_mirte_module, args=(stepper, do_step, running)) + process.terminate() + process = multiprocessing.Process( + target=load_mirte_module, args=(stepper, do_step, running) + ) process.start() + def client_left(client, server): stop_mirte() + def message_received(client, server, message): - global stepper, do_step, running - - if message == "b": #break (pause) - stepper.value = True - if message == "c": #continue (play) - stepper.value = False - if not running.value: - start_mirte() - running.value = True - if message == "s": #step (step) - do_step.value = True - if message == "e": #exit (stop) - stepper.value = True - do_step.value = False - stop_mirte() + global stepper, do_step, running + + if message == "b": # break (pause) + stepper.value = True + if message == "c": # continue (play) + stepper.value = False + if not running.value: + start_mirte() + running.value = True + if message == "s": # step (step) + do_step.value = True + if message == "e": # exit (stop) + stepper.value = True + do_step.value = False + stop_mirte() + server = WebsocketServer(host="0.0.0.0", port=8001, loglevel=logging.CRITICAL) server.set_fn_message_received(message_received) diff --git a/mirte_robot/robot.py b/mirte_robot/robot.py index 41ad3c6..d346de4 100644 --- a/mirte_robot/robot.py +++ b/mirte_robot/robot.py @@ -1,41 +1,131 @@ #!/usr/bin/env python +import math +import os +import platform import time -import rospy -import rosservice import signal import sys -import math -import atexit - -# TODO: check if we need the telemetrix version of this? -#sys.path.append('/usr/local/lib/python2.7/dist-packages/PyMata-2.20-py2.7.egg') # Needed for jupyter notebooks -#sys.path.append('/usr/local/lib/python2.7/dist-packages/pyserial-3.4-py2.7.egg') - -import message_filters -from geometry_msgs.msg import Twist -from std_msgs.msg import Int32 -from std_msgs.msg import String -from std_msgs.msg import Empty -from mirte_msgs.msg import * +import threading +import weakref +from typing import TYPE_CHECKING, Literal, Optional, overload + +import rclpy +import rclpy.node +from rclpy.validate_namespace import validate_namespace + +if TYPE_CHECKING: + import rclpy.client + +from controller_manager_msgs.srv import SwitchController +from mirte_msgs.srv import ( + GetAnalogPinValue, + GetBoardCharacteristics, + GetColorHSL, + GetColorRGBW, + GetDigitalPinValue, + GetEncoder, + GetIntensity, + GetIntensityDigital, + GetKeypad, + GetRange, + SetDigitalPinValue, + SetMotorSpeed, + SetOLEDFile, + SetOLEDText, + SetPWMPinValue, + SetServoAngle, +) +from rcl_interfaces.srv import ListParameters + +##### +# +# There are 4 use cases that should exit the user code: +# +# 1) Running "while true" (and CTRL-C) from commandline +# 2) Running "for 10" from commandline +# 3) Running "while true" (and stop/CTRL-C) from web interface +# 4) Running "for 10" from web interface +# +##### -from mirte_msgs.srv import * -from std_srvs.srv import * mirte = {} -class Robot(): - """ Robot API - This class allows you to control the robot from Python. The getters and setters - are just wrappers calling ROS topics or services. +class Robot: + """Robot API + + This class allows you to control the robot from Python. The getters and setters + are just wrappers calling ROS topics or services. """ + # Implementation Notes: + # This class creates a hidden ROS Node for the communication. + # This class is a singleton. + # No QoS Profiles are set, but this might not be required, since they might already behave like ROS 1 persistant. + # Therefore the node is also anonymized with the current time. + + # Ensuring singleton behavior + # This should not be done as a decorator, since the decorator returns a + # function. Therefor the sphynx documentation is not able to process + # this class anymore. + _instance = None + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(Robot, cls).__new__(cls) + rclpy.init() + return cls._instance + + def __init__( + self, machine_namespace: Optional[str] = None, hardware_namespace: str = "io" + ): + """Intialize the Mirte Robot API""" + + # Only the first instance (singelton) needs to be initialized + if getattr(self, "_initialized", False): + return + + # Parameters: + # machine_namespace (Optional[str], optional): The Namespace from '/' to the ROS namespace for the specific Mirte. Defaults to "/{HOSTNAME}". (This only has to be changed when running the Robot API from a different machine directly. It is configured correctly for the Web interface) + # hardware_namespace (str, optional): The namespace for the hardware peripherals. Defaults to "io". + + self._machine_namespace = "" # ( + # machine_namespace + # if machine_namespace and validate_namespace(machine_namespace) + # else "/" + platform.node().replace("-", "_").lower() + # ) + self._hardware_namespace = "/io" # ( + # hardware_namespace + # if validate_namespace( + # hardware_namespace + # if hardware_namespace.startswith("/") + # else (self._machine_namespace + "/" + hardware_namespace) + # ) + # else "io" + # ) + + ROS_DISTRO = os.getenv("ROS_DISTRO") + + # In order to be able to run this from multiple python files (ie creating + # multtiple nodes) a timestamp is added (replicating a 'anonymous' node) + self._node = rclpy.node.Node( + "_mirte_python_api_" + str(time.time_ns()), + namespace=self._machine_namespace, + start_parameter_services=True, + ) - def __init__(self): # Stop robot when exited - atexit.register(self.stop) + rclpy.get_default_context().on_shutdown(self._at_exit) + self._stopping = False + self._lock = threading.Lock() + + # Assuming we are only using the Python API for the MIRTE Pioneer. This + # is only used for setMotorControl(), so all other functions should + # (theoretically) still work on teh MIRTE Master. + self.CONTROLLER = "mirte_base_controller" - self.PWM = 3 #PrivateConstants.PWM when moving to Python3 + self.PWM = 3 # PrivateConstants.PWM when moving to Python3 self.INPUT = 0 self.OUTPUT = 1 self.PULLUP = 11 @@ -45,28 +135,105 @@ def __init__(self): self.begin_time = time.time() self.last_call = 0 - # Call /stop and /start service to disable/enable the ROS diff_drive_controller - # By default this class will control the rbot though PWM (controller stopped). Only in case + # Call controller_manager/switch_controller service to disable/enable the ROS diff_drive_controller + # By default this class will control the robot though PWM (controller stopped). Only in case # the controller is needed, it will be enabled. - self.stop_controller_service = rospy.ServiceProxy('stop', Empty, persistent=True) - self.start_controller_service = rospy.ServiceProxy('start', Empty, persistent=True) + self._switch_controller_service = self._node.create_client( + SwitchController, "controller_manager/switch_controller" + ) + + if ROS_DISTRO[0] >= "i": # Check if the ROS Distro is IRON or newer + # Not available untill ROS Iron + if not self._node.wait_for_node( + self._hardware_namespace + "telemetrix", 10 + ): + self._node.get_logger().fatal( + f"Telemetrix node at '{self._node.get_namespace() + '/' + self._hardware_namespace + '/telemetrix'}' was not found! Aborting" + ) + exit(-1) + list_parameters: rclpy.client.Client = self._node.create_client( + ListParameters, self._hardware_namespace + "/telemetrix/list_parameters" + ) + + # # Get the Board Characteristics + get_board_characteristics = self._node.create_client( + GetBoardCharacteristics, + self._hardware_namespace + "/get_board_characteristics", + ) + + # Wait for the get_board_characteristics to prevent weird errors. + if not get_board_characteristics.wait_for_service(2): + self._node.get_logger().fatal( + f"Telemetrix node at '{self._node.get_namespace()+ '/'+ self._hardware_namespace + '/telemetrix'}' does not provide a '{self._node.get_namespace() + '/' + self._hardware_namespace + '/get_board_characteristics'}' service! Aborting" + ) + exit(-1) + + board_future: rclpy.Future = get_board_characteristics.call_async( + GetBoardCharacteristics.Request() + ) + rclpy.spin_until_future_complete(self._node, board_future) + + board_characteristics: GetBoardCharacteristics.Response = board_future.result() + + self._max_adc: int = board_characteristics.max_adc + self._max_pwm: int = board_characteristics.max_pwm + self._max_voltage: float = board_characteristics.max_voltage + + self._node.destroy_client(get_board_characteristics) # Service for motor speed - self.motors = {} - if rospy.has_param("/mirte/motor"): - self.motors = rospy.get_param("/mirte/motor") + self.motors = {} # FIXME: Is self.motors used? + motors_future: rclpy.Future = list_parameters.call_async( + ListParameters.Request(prefixes=["motor"], depth=3) + ) + rclpy.spin_until_future_complete(self._node, motors_future) + + motor_prefixes: list[str] = motors_future.result().result.prefixes + if len(motor_prefixes) > 0: + self.motors = [ + motor_prefix.split(".")[-1] for motor_prefix in motor_prefixes + ] self.motor_services = {} for motor in self.motors: - self.motor_services[motor] = rospy.ServiceProxy('/mirte/set_' + self.motors[motor]["name"] + '_speed', SetMotorSpeed, persistent=True) - - # Service for motor speed - if rospy.has_param("/mirte/servo"): - servos = rospy.get_param("/mirte/servo") - self.servo_services = {} + self._node.get_logger().info( + f"Created service client for motor [{motor}]" + ) + self.motor_services[motor] = self._node.create_client( + SetMotorSpeed, + self._hardware_namespace + "/motor/" + motor + "/set_speed", + ) + self.motor_services[motor].wait_for_service() + + # Service for motor speed + + def finalize(node: rclpy.node.Node, motors: dict[str, rclpy.client.Client]): + for motor in motors.values(): + future = motor.call_async(SetMotorSpeed.Request(speed=0)) + rclpy.spin_until_future_complete(node, future) + + self._finalizer = weakref.finalize( + self, finalize, self._node, self.motor_services + ) + + servo_future = list_parameters.call_async( + ListParameters.Request(prefixes=["servo"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, servo_future) + + servo_prefixes = servo_future.result().result.prefixes + if len(servo_prefixes) > 0: + servos = [servo_prefix.split(".")[-1] for servo_prefix in servo_prefixes] + self.servo_services: dict[str, rclpy.client.Client] = {} for servo in servos: - self.servo_services[servo] = rospy.ServiceProxy('/mirte/set_' + servos[servo]["name"] + '_servo_angle', SetServoAngle, persistent=True) - - rospy.init_node('mirte_python_api', anonymous=False) + self._node.get_logger().info( + f"Created service client for servo [{servo}]" + ) + self.servo_services[servo] = self._node.create_client( + SetServoAngle, + self._hardware_namespace + "/servo/" + servo + "/set_angle", + ) + self.servo_services[servo].wait_for_service() ## Sensors ## The sensors are now just using a blocking service call. This is intentionally @@ -76,65 +243,228 @@ def __init__(self): ## maybe even to blockly. # Services for distance sensors - if rospy.has_param("/mirte/distance"): - distance_sensors = rospy.get_param("/mirte/distance") + distance_future = list_parameters.call_async( + ListParameters.Request(prefixes=["distance"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, distance_future) + + distance_prefixes = distance_future.result().result.prefixes + if len(distance_prefixes) > 0: + distance_sensors = [sensor.split(".")[-1] for sensor in distance_prefixes] self.distance_services = {} for sensor in distance_sensors: - self.distance_services[sensor] = rospy.ServiceProxy('/mirte/get_distance_' + distance_sensors[sensor]["name"], GetDistance, persistent=True) - - if rospy.has_param("/mirte/oled"): - oleds = rospy.get_param("/mirte/oled") + self._node.get_logger().info( + f"Created service client for distance sensor [{sensor}]" + ) + self.distance_services[sensor] = self._node.create_client( + GetRange, + self._hardware_namespace + "/distance/" + sensor + "/get_range", + ) + self.distance_services[sensor].wait_for_service() + + # Services for oled + oled_future = list_parameters.call_async( + ListParameters.Request(prefixes=["oled"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, oled_future) + + oled_prefixes = oled_future.result().result.prefixes + if len(oled_prefixes) > 0: + oleds = [oled.split(".")[-1] for oled in oled_prefixes] self.oled_services = {} for oled in oleds: - self.oled_services[oled] = rospy.ServiceProxy('/mirte/set_' + oleds[oled]["name"] + '_image', SetOLEDImage, persistent=True) + self._node.get_logger().info( + f"Created service client for oled [{oled}]" + ) + self.oled_services[oled] = { + "text": self._node.create_client( + SetOLEDText, + self._hardware_namespace + "/oled/" + oled + "/set_text", + ), + "file": self._node.create_client( + SetOLEDFile, + self._hardware_namespace + "/oled/" + oled + "/set_file", + ), + } + self.oled_services[oled]["text"].wait_for_service() + self.oled_services[oled]["file"].wait_for_service() # Services for intensity sensors (TODO: how to expose the digital version?) - if rospy.has_param("/mirte/intensity"): - intensity_sensors = rospy.get_param("/mirte/intensity") + intensity_future = list_parameters.call_async( + ListParameters.Request(prefixes=["intensity"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, intensity_future) + + intensity_prefixes = intensity_future.result().result.prefixes + if len(intensity_prefixes) > 0: + intensity_sensors = [sensor.split(".")[-1] for sensor in intensity_prefixes] self.intensity_services = {} # We can not get the types (analog and/or digital) of the intensity sensor # straight from the parameter server (it might be just set as the PCB without # explicit values. We can however deduct what is there by checking the # services. - service_list = rosservice.get_service_list() + service_list = set( + [ + service + for service, service_type in self._node.get_service_names_and_types() + ] + ) for sensor in intensity_sensors: - if "/mirte/get_intensity_" + intensity_sensors[sensor]["name"] in service_list: - self.intensity_services[sensor] = rospy.ServiceProxy('/mirte/get_intensity_' + intensity_sensors[sensor]["name"], GetIntensity, persistent=True) - if "/mirte/get_intensity_" + intensity_sensors[sensor]["name"] + "_digital" in service_list: - self.intensity_services[sensor + "_digital"] = rospy.ServiceProxy('/mirte/get_intensity_' + intensity_sensors[sensor]["name"] + "_digital", GetIntensityDigital, persistent=True) - + if ( + self._hardware_namespace + "/intensity/" + sensor + "/get_analog" + in service_list + ): + self._node.get_logger().info( + f"Created service client for intensity [{sensor}]" + ) + self.intensity_services[sensor] = self._node.create_client( + GetIntensity, + self._hardware_namespace + + "/intensity/" + + sensor + + "/get_analog", + ) + self.intensity_services[sensor].wait_for_service() + if ( + self._hardware_namespace + "/intensity/" + sensor + "/get_digital" + in service_list + ): + self._node.get_logger().info( + f"Created service client for digital intensity [{sensor}]" + ) + self.intensity_services[sensor + "_digital"] = ( + self._node.create_client( + GetIntensityDigital, + self._hardware_namespace + + "/intensity/" + + sensor + + "/get_digital", + ) + ) + self.intensity_services[sensor + "_digital"].wait_for_service() # Services for encoder sensors - if rospy.has_param("/mirte/encoder"): - encoder_sensors = rospy.get_param("/mirte/encoder") + encoder_future = list_parameters.call_async( + ListParameters.Request(prefixes=["encoder"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, encoder_future) + + encoder_prefixes = encoder_future.result().result.prefixes + if len(encoder_prefixes) > 0: + encoder_sensors = [sensor.split(".")[-1] for sensor in encoder_prefixes] self.encoder_services = {} for sensor in encoder_sensors: - self.encoder_services[sensor] = rospy.ServiceProxy('/mirte/get_encoder_' + encoder_sensors[sensor]["name"], GetEncoder, persistent=True) + self._node.get_logger().info( + f"Created service client for encoder [{sensor}]" + ) + self.encoder_services[sensor] = self._node.create_client( + GetEncoder, + self._hardware_namespace + "/encoder/" + sensor + "/get_encoder", + ) + self.encoder_services[sensor].wait_for_service() # Services for keypad sensors - if rospy.has_param("/mirte/keypad"): - keypad_sensors = rospy.get_param("/mirte/keypad") + keypad_future = list_parameters.call_async( + ListParameters.Request(prefixes=["keypad"], depth=3) + ) + + rclpy.spin_until_future_complete(self._node, keypad_future) + + keypad_prefixes = keypad_future.result().result.prefixes + if len(keypad_prefixes) > 0: + keypad_sensors = [sensor.split(".")[-1] for sensor in keypad_prefixes] self.keypad_services = {} for sensor in keypad_sensors: - self.keypad_services[sensor] = rospy.ServiceProxy('/mirte/get_keypad_' + keypad_sensors[sensor]["name"], GetKeypad, persistent=True) + self._node.get_logger().info( + f"Created service client for keypad [{sensor}]" + ) + self.keypad_services[sensor] = self._node.create_client( + GetKeypad, + self._hardware_namespace + "/keypad/" + sensor + "/get_key", + ) + self.keypad_services[sensor].wait_for_service() # Services for color sensors - if rospy.has_param("/mirte/color"): - color_sensors = rospy.get_param("/mirte/color") - self.color_services = {} - for sensor in color_sensors: - self.color_services[sensor] = rospy.ServiceProxy('/mirte/get_color_' + color_sensors[sensor]["name"], GetColorHSL, persistent=True) + color_future = list_parameters.call_async( + ListParameters.Request(prefixes=["color"], depth=3) + ) - self.get_pin_value_service = rospy.ServiceProxy('/mirte/get_pin_value', GetPinValue, persistent=True) - self.set_pin_value_service = rospy.ServiceProxy('/mirte/set_pin_value', SetPinValue, persistent=True) + rclpy.spin_until_future_complete(self._node, color_future) + color_prefixes = color_future.result().result.prefixes + if len(color_prefixes) > 0: + color_sensors = [sensor.split(".")[-1] for sensor in color_prefixes] + self.color_services: dict[str, dict[str, "rclpy.client.Client"]] = {} + for sensor in color_sensors: + self._node.get_logger().info( + f"Created service client for color sensor [{sensor}]" + ) + self.color_services[sensor] = { + "RGBW": self._node.create_client( + GetColorRGBW, + self._hardware_namespace + "/color/" + sensor + "/get_rgbw", + ), + "HSL": self._node.create_client( + GetColorHSL, + self._hardware_namespace + "/color/" + sensor + "/get_hsl", + ), + } + self.color_services[sensor]["RGBW"].wait_for_service() + self.color_services[sensor]["HSL"].wait_for_service() + + self._node.destroy_client(list_parameters) + + self._get_digital_pin_value_service = self._node.create_client( + GetDigitalPinValue, self._hardware_namespace + "/get_digital_pin_value" + ) + + self._get_analog_pin_value_service = self._node.create_client( + GetAnalogPinValue, self._hardware_namespace + "/get_analog_pin_value" + ) + + self._set_digital_pin_value_service = self._node.create_client( + SetDigitalPinValue, self._hardware_namespace + "/set_digital_pin_value" + ) + + self._set_pwm_pin_value_service = self._node.create_client( + SetPWMPinValue, self._hardware_namespace + "/set_pwm_pin_value" + ) signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) - def getTimestamp(self): - """Gets the elapsed time in seconds since the initialization fo the Robot. + self._initialized = True + + def _call_service( + self, client: rclpy.client.Client, request: rclpy.client.SrvTypeRequest + ) -> rclpy.client.SrvTypeResponse: + + with self._lock: + future_response = client.call_async(request) + while not future_response.done() and not self._stopping: + rclpy.spin_once(self._node, timeout_sec=0.1) + + if self._stopping: + with self._lock: + self._stopping = False + self._at_exit() + + return future_response.result() + + # FIXME: Check if services are available, if not don't hard error on: + # AttributeError: 'Robot' object has no attribute 'oled_services'. Did you mean: '_services'? + def _check_available(self, services: dict[str] | None, id: str) -> bool: + if services is None: + return False + return id in services + + def getTimestamp(self) -> float: + """Gets the elapsed time in seconds since the initialization of the Robot. Returns: float: Time in seconds since the initialization of the Robot. Fractions of a second may be present if the system clock provides them. @@ -142,7 +472,7 @@ def getTimestamp(self): return time.time() - self.begin_time - def getTimeSinceLastCall(self): + def getTimeSinceLastCall(self) -> float: """Gets the elapsed time in seconds since the last call to this function. Returns: @@ -152,28 +482,42 @@ def getTimeSinceLastCall(self): last_call = self.last_call self.last_call = time.time() if last_call == 0: - return 0 + return 0 else: - return time.time() - last_call + return time.time() - last_call - def getDistance(self, sensor): + def getDistance(self, sensor: str) -> float: """Gets data from a HC-SR04 distance sensor: calculated distance in meters. Parameters: sensor (str): The name of the sensor as defined in the configuration. Returns: - int: Range in meters measured by the HC-SR04 sensor. + float: Range in meters measured by the HC-SR04 sensor. (The distance gets clamped to minimum and maximum range of the HC-SR04 sensor) Warning: - A maximum of 6 distance sensors is supported. + Depeding on the MCU used, there might be a mamimum of the number of distance sensors one can use. """ + value = self._call_service(self.distance_services[sensor], GetRange.Request()) + range = value.range + + distance: float = range.range - dist = self.distance_services[sensor]() - return dist.data + if distance == math.inf or distance == -math.inf or math.isnan(distance): + distance = -1 - def getIntensity(self, sensor, type="analog"): + return distance + + # TODO: Maybe change digital return type to bool + @overload + def getIntensity(self, sensor: str, type: Literal["analog"]) -> float: ... + @overload + def getIntensity(self, sensor: str, type: Literal["digital"]) -> int: ... + + def getIntensity( + self, sensor: str, type: Literal["analog", "digital"] = "analog" + ) -> int | float: """Gets data from an intensity sensor. Parameters: @@ -181,15 +525,21 @@ def getIntensity(self, sensor, type="analog"): type (str): The type of the sensor (either 'analog' or 'digital'). Returns: - int: Value of the sensor (0-255 when analog, 0-1 when digital). + int | float: Value of the sensor (0.0-1.0 when analog, 0-1 when digital). """ + # FIXME: IMPROVE ERROR for type if type == "analog": - value = self.intensity_services[sensor]() + value = self._call_service( + self.intensity_services[sensor], GetIntensity.Request() + ) if type == "digital": - value = self.intensity_services[sensor + "_digital"]() + value = self._call_service( + self.intensity_services[sensor + "_digital"], + GetIntensityDigital.Request(), + ) return value.data - def getEncoder(self, sensor): + def getEncoder(self, sensor: str) -> int: """Gets data from an encoder: every encoder pulse increments the counter. Parameters: @@ -199,10 +549,12 @@ def getEncoder(self, sensor): int: Number of encoder pulses since boot of the robot. """ - value = self.encoder_services[sensor]() + value = self._call_service(self.encoder_services[sensor], GetEncoder.Request()) return value.data - def getKeypad(self, keypad): + def getKeypad( + self, keypad: str + ) -> Literal["", "up", "down", "left", "right", "enter"]: """Gets the value of the keypad: the button that is pressed. Parameters: @@ -212,11 +564,30 @@ def getKeypad(self, keypad): str: The name of the button ('up', 'down', 'left', 'right', 'enter'). """ - - value = self.keypad_services[keypad]() + value = self._call_service(self.keypad_services[keypad], GetKeypad.Request()) return value.data - def getColor(self, sensor): + def getColorRGBW(self, sensor: str) -> dict[str, float]: + """Gets the value of the color sensor. + + Parameters: + sensor (str): The name of the sensor as defined in the configuration. + + Returns: + {r, g, b, w}: Scaled (0-1) values per R(ed), G(reen), B(lue), and W(hite). + """ + + value = self._call_service( + self.color_services[sensor]["RGBW"], GetColorRGBW.Request() + ) + return { + "r": value.color.r, + "g": value.color.g, + "b": value.color.b, + "w": value.color.w, + } + + def getColor(self, sensor: str) -> dict[str, float]: """Gets the value of the color sensor. Parameters: @@ -224,25 +595,79 @@ def getColor(self, sensor): Returns: {h, s, l}: Hue (0-360), Saturation (0-1), Lightness. + {h, s, l}: Hue (0-360), Saturation (0-1), Lightness. """ - value = self.color_services[sensor]() - return {'h': value.color.h, 's': value.color.s, 'l': value.color.l } + value = self._call_service( + self.color_services[sensor]["HSL"], GetColorHSL.Request() + ) + return { + "h": value.color.h, + "s": value.color.s, + "l": value.color.l, + } + + @overload # FIXME: Should this return int of float + def getAnalogPinValue(self, pin: str, mode: Literal["percentage"]) -> int: ... + + @overload + def getAnalogPinValue(self, pin: str, mode: Literal["raw"]) -> int: ... + + @overload + def getAnalogPinValue(self, pin: str, mode: Literal["voltage"]) -> float: ... - def getAnalogPinValue(self, pin): + # TODO: What to do with max? + def getAnalogPinValue( + self, pin: str, mode: Literal["percentage", "raw", "voltage"] = "percentage" + ) -> float: """Gets the input value of an analog pin. Parameters: pin (str): The pin number of an analog pin as printed on the microcontroller. + mode (str, optional): The units of the value, can be "percentage", "raw" or "voltage". Defaults to "percentage". Returns: - int: Value between 0-255. + int | float: Value of the pin (0-100 when percentage, 0- when raw, 0-V when voltage). """ - value = self.get_pin_value_service(str(pin), "analog") - return value.data - - def setAnalogPinValue(self, pin, value): + response: GetAnalogPinValue.Response = self._call_service( + self._get_analog_pin_value_service, + GetAnalogPinValue.Request(pin=str(pin)), + ) + + # FIXME: TMP CHECK + assert response.status, response.message + match mode: + case "raw": + return response.value + case "percentage": + return response.value / self._max_adc * 100 + case "voltage": + return response.value / self._max_adc * self._max_voltage + case _: + assert False, "FIXME: UNKNOWN MODE" + + # TODO: Input int or float? Why not both + @overload + def setAnalogPinValue( + self, pin: str, value: int | float, mode: Literal["percentage"] + ) -> bool: ... + + @overload + def setAnalogPinValue(self, pin: str, value: int, mode: Literal["raw"]) -> bool: ... + + @overload + def setAnalogPinValue( + self, pin: str, value: float, mode: Literal["voltage"] + ) -> bool: ... + + # FIXME: UPDATE DOCS + def setAnalogPinValue( + self, + pin: str, + value: int | float, + mode: Literal["percentage", "raw", "voltage"] = "percentage", + ) -> bool: """Sets the output value of an analog pin (PWM). Parameters: @@ -250,106 +675,157 @@ def setAnalogPinValue(self, pin, value): value (int): Value between 0-255. """ - value = self.set_pin_value_service(str(pin), "analog", value) - return value.status - - def setOLEDText(self, oled, text): + raw_value = None + + match mode: + # TODO: Maybe add u16 bounds checking, since message type is picky + case "raw": + # Bounds checking happens in telemetrix node + raw_value = int(value) + case "percentage": + # TODO: Maybe check percentage bounds here for better error. + raw_value = int((value / 100) * self._max_pwm) + case "voltage": + # TODO: Maybe check voltage bounds here for better error. + raw_value = int((value / self._max_voltage) * self._max_pwm) + case _: + assert False, "NO VALID MODE" + + response: SetPWMPinValue.Response = self._call_service( + self._set_pwm_pin_value_service, + SetPWMPinValue.Request(pin=str(pin), value=raw_value), + ) + + assert response.status, response.message + return response.status + + def setOLEDText(self, oled: str, text: str) -> bool: """Shows text on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. text (str): String to be shown on the 128x64 OLED. """ - value = self.oled_services[oled]('text', str(text)) + value = self._call_service( + self.oled_services[oled]["text"], SetOLEDText.Request(text=str(text)) + ) return value.status - def setOLEDImage(self, oled, image): + def setOLEDImage(self, oled: str, image: str) -> bool: """Shows image on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. - image (str): Image name as defined in the images folder of the mirte-oled-images repository (excl file extension). + image (str): Image name/path either an absolute path, a path + relative to the folder of the mirte-oled-images + repository or and package relative path (pkg://PACKAGE_NAME/REST/OF/PATH). + The image extension can be omitted if its png. """ - value = self.oled_services[oled]('image', image) + value = self._call_service( + self.oled_services[oled]["file"], + SetOLEDFile.Request( + path=(image if ("." in image.split("/")[-1]) else (image + ".png")) + ), + ) return value.status - def setOLEDAnimation(self, oled, animation): + def setOLEDAnimation(self, oled: str, animation: str) -> bool: """Shows animation on the OLED. Parameters: oled (str): The name of the sensor as defined in the configuration. - animation (str): Animation (directory) name as defined in the animations folder of the mirte-oled-images repository. + animation (str): Animation (directory) name/path either an absolute + path, a path relative to the folder of the + mirte-oled-images repository or and package + relative path (pkg://PACKAGE_NAME/REST/OF/PATH). """ - value = self.oled_services[oled]('animation', animation) + value = self._call_service( + self.oled_services[oled]["file"], SetOLEDFile.Request(path=animation) + ) return value.status - def getDigitalPinValue(self, pin): + def getDigitalPinValue(self, pin: str) -> bool: """Gets the input value of a digital pin. Parameters: - pin (str): The pin number of an analog pin as printed on the microcontroller. + pin (str): The pin number of a digital pin as printed on the microcontroller. Returns: bool: The input value. """ - value = self.get_pin_value_service(str(pin), "digital") - return value.data + response: GetDigitalPinValue.Response = self._call_service( + self._get_digital_pin_value_service, + GetDigitalPinValue.Request(pin=str(pin)), + ) + assert response.status, response.message + return bool(response.value) - def setServoAngle(self, servo, angle): + def setServoAngle(self, servo: str, angle: float) -> bool: """Sets the angle of a servo. Parameters: servo (str): The name of the sensor as defined in the configuration. - angle (int): The angle of the servo (range [0-360], but some servos - might be hysically limited to [0-180]. + angle (float): The angle of the servo (range [0-360], but some servos + might be physically limited to [0-180]. Returns: bool: True if set successfully. Warning: - The servo uses the Servo library from Arduino (through Telemetrix). This also - means that, when a servo is used and the library is enabled, the last timer on - the MCU will be used for timing of the servos. This timer therefore can not be - used for PWM anymore. For Arduino Nano/Uno this means pins D9 and D10 will not - have PWM anymore. For the SMT32 this means pins A1, A2, A3, A15, B3, B10, and B11 - will not have PWM anymore. + Servos use PWM signals, which needs MCU timers. Depending on the MCU + and the servo implementation, it might be the case that enabling a servo + can interfere with (or use other) timers. This can lead to PWM signals not + functioning on some pins that used to have PWM (due to used timers). Warning: - A maximum of 12 servos is supported. + Depeding on the MCU used, there might be a mamimum of the number of servos + one can use. """ - - value = self.servo_services[servo](angle) + value = self._call_service( + self.servo_services[servo], + SetServoAngle.Request( + angle=float(angle), degrees=SetServoAngle.Request.DEGREES + ), + ) return value.status - def setDigitalPinValue(self, pin, value): + def setDigitalPinValue(self, pin: str, value: bool) -> bool: """Sets the output value of a digital pin. Parameters: pin (str): The pin number of an analog pin as printed on the microcontroller. value (bool): Value to set. """ - value = self.set_pin_value_service(str(pin), "digital", value) - return value.status - def setMotorSpeed(self, motor, value): + response: SetDigitalPinValue.Response = self._call_service( + self._set_digital_pin_value_service, + SetDigitalPinValue.Request(pin=str(pin), value=value), + ) + assert response.status, response.message # FIXME: TMP ERROR + + return response.status + + def setMotorSpeed(self, motor: str, value: int) -> bool: """Sets the speed of the motor. Parameters: motor (str): The name of the sensor as defined in the configuration. - value (int): The 'directional duty cycle' (range [-100, 100]) of the PWM + value (int): The 'directional duty cycle' (range [-100, 100]) of the PWM signal (-100: full backward, 0: stand still, 100: full forward). Returns: bool: True if set successfully. """ - motor = self.motor_services[motor](value) + motor = self._call_service( + self.motor_services[motor], SetMotorSpeed.Request(speed=int(value)) + ) return motor.status - def setMotorControl(self, status): + def setMotorControl(self, status: bool) -> bool: """Enables/disables the motor controller. This is enabled on boot, but can be disabled/enabled at runtime. This makes the ROS control node pause, so it will not respond to Twist messages anymore when disabled. @@ -358,16 +834,22 @@ def setMotorControl(self, status): status (bool): To which status the motor controller should be set. Returns: - none + bool: True if succes (ok) """ - - if (status): - self.start_controller_service() + request = None + if status: + request = SwitchController.Request( + activate_controllers=[self.CONTROLLER], activate_asap=True + ) else: - self.stop_controller_service() - return + request = SwitchController.Request(deactivate_controllers=[self.CONTROLLER]) - def stop(self): + response: SwitchController.Response = self._call_service( + self._switch_controller_service, request + ) + return response.ok + + def stop(self) -> None: """Stops all DC motors defined in the configuration Note: @@ -375,23 +857,39 @@ def stop(self): or when it finished. """ - for motor in self.motors: - self.setMotorSpeed(self.motors[motor]["name"], 0) + self.setMotorSpeed(motor, 0) + + def getROSNode(self): + return self._node def _signal_handler(self, sig, frame): + self._stopping = True + + if not self._lock.locked(): + self._at_exit() + + def _at_exit(self) -> None: self.stop() sys.exit() + # We need a special function to initiate the Robot() because the main.py need to call the # init_node() (see: https://answers.ros.org/question/266612/rospy-init_node-inside-imported-file/) -def createRobot(): +# TODO: We probably do not need this anymore in ROS2. But it affects the python imports. +def createRobot( + machine_namespace: Optional[str] = None, hardware_namespace: str = "io" +) -> Robot: """Creates and return instance of the robot class. + Parameters: + machine_namespace (Optional[str], optional): The Namespace from '/' to the ROS namespace for the specific Mirte. Defaults to "/{HOSTNAME}". (This only has to be changed when running the Robot API from a different machine directly. It is configured correctly for the Web interface) + hardware_namespace (str, optional): The namespace for the hardware peripherals. Defaults to "io". + Returns: Robot: The initialize Robot class. """ global mirte - mirte = Robot() + mirte = Robot(machine_namespace, hardware_namespace) return mirte diff --git a/setup.py b/setup.py index a64d247..f2dea2a 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,10 @@ setuptools.setup( name="mirte_robot", - install_requires=["websocket_server", "rospkg"], + install_requires=[ + "websocket_server" + ], # TODO: Require mirte_msgs? rcl_interfaces? controller_manager_msgs? + # Also requires "rclpy" version="0.1.0", author="Martin Klomp", author_email="m.klomp@tudelft.nl", @@ -13,11 +16,12 @@ long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/mirte-robot/mirte-python", - packages=['mirte_robot'], + packages=["mirte_robot"], + license="Apache License 2.0", classifiers=[ "Programming Language :: Python :: 3", - "License :: OSI Approved :: BSD License", + "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", ], - python_requires='>=3.6', + python_requires=">=3.6", )