diff --git a/.python-version b/.python-version index 9ad6380..ac037f6 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.8.18 +pypy@3.7.13 diff --git a/explain b/explain new file mode 100644 index 0000000..765de25 --- /dev/null +++ b/explain @@ -0,0 +1,12 @@ +calc_bearing function: Calulates the `true bearing` for the rover + +- true bearing is the bearing from North + +bearing_to: This calculates the `relative bearing` between the rover and a coordinate + +- This would be the difference between the rover's tru bearing and the coords true bearing +- relative bearing is the bearing between any two objects + +gps_controller we need calc_bearing + +navigation we need bearing_to diff --git a/libs/Locationf9p.py b/libs/Locationf9p.py deleted file mode 100755 index a32ba25..0000000 --- a/libs/Locationf9p.py +++ /dev/null @@ -1,171 +0,0 @@ -import sys - -sys.path.append("../") -from math import cos, radians, degrees, sin, atan2, pi, sqrt, asin -import threading - - -# Class that computes functions related to location of Rover -# TODO: Make sure that the current GPS outputs coordinates -# in the same format as the swift, and test out -# heading and see if that should be computed -# somewhere outside of __parse -class LocationF9P: - # To find the device path, run ls -l /dev/serial/by-id/usb-ublox_....(tab complete this) - # and it will return where it's symlinked to, most likely /dev/ttyACM0 - # you could probably just use that /dev/serial path too - def __init__(self, device_path="/dev/ttyACM0"): - self.device_path = device_path - self.device_open_file = None - self.latitude = 0 - self.longitude = 0 - self.old_latitude = 0 - self.old_longitude = 0 - self.bearing = 0.0 - self.running = True - - def config(self): - # read from a file, probably configure this to work with - # ConfigParser because that's ez - pass - - # Returns distance in kilometers between given latitude and longitude - def distance_to(self, lat: float, lon: float): - earth_radius = 6371.301 - delta_lat = (lat - self.latitude) * (pi / 180.0) - delta_lon = (lon - self.longitude) * (pi / 180.0) - - a = sin(delta_lat / 2) * sin(delta_lat / 2) + cos( - self.latitude * (pi / 180.0) - ) * cos(lat * (pi / 180.0)) * sin(delta_lon / 2) * sin(delta_lon / 2) - c = 2 * atan2(sqrt(a), sqrt(1 - a)) - return earth_radius * c - - # Calculates difference between given bearing to location and current bearing - # Positive is turn right, negative is turn left - def bearing_to(self, lat: float, lon: float): - result_bearing = ( - self.calc_bearing(self.latitude, self.longitude, lat, lon) - self.bearing - ) - return ( - result_bearing + 360 - if result_bearing < -180 - else (result_bearing - 360 if result_bearing > 180 else result_bearing) - ) - - # Starts updating fields from the GPS box - def start_GPS_thread(self): - if self.device_open_file is None: - print( - "[Location-f9p.py] start_GPS_thread was called before opening the gps /dev/ entry - start_GPS will be called now" - ) - self.start_GPS() - - self.running = True - t = threading.Thread( - target=self.update_fields_loop, name=("update GPS fields"), args=() - ) - t.daemon = True - t.start() - - # Stops updating fields from GPS box - def stop_GPS_thread(self): - self.running = False - - def start_GPS(self): - self.device_open_file = open(self.device_path) - - def update_fields_loop(self): - while self.running: - if self.device_open_file is not None: - line_read = self.device_open_file.readline() - self.__parse(line_read) - return - - # Parse NMEA messages read from the GPS. - # The protocol is described on pdf page 24 - # of the integration manual (section 4.2.6) - # https://cdn.sparkfun.com/assets/f/7/4/3/5/PM-15136.pdf - def __parse(self, message_str): - if isinstance(message_str, str): - split = message_str.split(",") - # look for lat/lon messages. We'll almost certainly get - # GN messages (maybe only GN messages, but this theoretically accepts other talkers) - if split[0].startswith("$") and split[0].endswith("GLL"): - computed_checksum = 0 - for char in message_str: - if char == "*": - # marks the end of the portion to hash - break - elif char == "$": - # marks the beginning - computed_checksum = 0 - else: - computed_checksum ^= ord(char) - - computed_checksum = format(computed_checksum, "X") - message_checksum = ( - message_str.split("*")[-1].replace("\n", "").replace("\r", "") - ) - if computed_checksum != message_checksum: - print( - "`" - + format(computed_checksum, "X") - + "` did not match `" - + message_str.split("*")[-1] - + "`" - ) - return - - self.old_latitude = self.latitude - self.old_longitude = self.longitude - - # This is just a big pile of slicing up, the goal is to take: - # $GNGLL,3511.93307,N,09721.15557,W,011244.00,A,D*64 - # and output the lat and lon. - # It does this by: - # - Parsing out the integer number of degrees (2 digits for lat, 3 digits for lon) - # - Parsing out the minutes, and dividing them by 60, then adding them to the integer degrees - # - Multiplying by -1 if the lat is in the south, or if the lon is in the west - self.latitude = (int(split[1][0:2]) + float(split[1][2:]) / 60) * ( - -1 if split[2] == "S" else 1 - ) - self.longitude = (int(split[3][0:3]) + float(split[3][3:]) / 60) * ( - -1 if split[4] == "W" else 1 - ) - - self.bearing = self.calc_bearing( - self.old_latitude, self.old_longitude, self.latitude, self.longitude - ) - elif not ( - message_str == "\n" or message_str == "\r\n" or message_str == "" - ) and not message_str.startswith("$"): - print( - "got weird message: " + message_str + " of len " + str(len(split)) - ) - - # Calculates bearing between two points. - # (0 is North, 90 is East, +/-180 is South, -90 is West) - def calc_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float): - x = cos(lat2 * (pi / 180.0)) * sin((lon2 - lon1) * (pi / 180.0)) - y = cos(lat1 * (pi / 180.0)) * sin(lat2 * (pi / 180.0)) - sin( - lat1 * (pi / 180.0) - ) * cos(lat2 * (pi / 180.0)) * cos((lon2 - lon1) * (pi / 180.0)) - return (180.0 / pi) * atan2(x, y) - - # Calculate latitutde and longitude given distance (in km) and bearing (in degrees) - def get_coordinates(self, distance: float, bearing: float): - # https://stackoverflow.com/questions/7222382/get-lat-long-given-current-point-distance-and-bearing - R = 6371.301 - brng = radians(bearing) # Assuming bearing is in degrees - d = distance - - lat1 = radians(self.latitude) # Current lat point converted to radians - lon1 = radians(self.longitude) # Current long point converted to radians - - lat2 = asin(sin(lat1) * cos(d / R) + cos(lat1) * sin(d / R) * cos(brng)) - lon2 = lon1 + atan2( - sin(brng) * sin(d / R) * cos(lat1), cos(d / R) - sin(lat1) * sin(lat2) - ) - - return degrees(lat2), degrees(lon2) diff --git a/libs/args.py b/libs/args.py new file mode 100644 index 0000000..018d02b --- /dev/null +++ b/libs/args.py @@ -0,0 +1,92 @@ +from argparse import ArgumentParser, Namespace +from dataclasses import dataclass +from typing import Union, List +from loguru import logger + +from libs.coordinate import Coordinate + + +@dataclass +class Args: + aruco_id: Union[int, None] # the ArUco tag ID to search for, if any + coord_file_path: str # a file that's in the format: `lat lon`, each on a new line + coordinate_list: List[Coordinate] + camera_id: int # the camera to use. defaults to 0 + + def __init__(self): + """ + Gets the command line arugments and updates internal state. + """ + arg_parser = ArgumentParser() + + # get opencv camera id + arg_parser.add_argument( + "--camera_id", + "-c", + required=False, + default=0, + type=int, + help="The OpenCV capture device (camera) ID to use. Use `tools/camera_list.py` to determine usable values.", + ) + + # get the coordinate file path + arg_parser.add_argument( + "--coords", + "-f", + required=True, + type=str, + help="A file path pointing to a list of coordinates. The file should have one coordinate pair per line, in this format: `lat long`", + ) + + # get the aruco ID to use + arg_parser.add_argument( + "--aruco_id", + "-a", + required=False, + type=int, + help="The ArUco ID number to use from the DICT_4X4_50 list. Safe testing values are 0 and 1.", + ) + + # place all data into internal state + args: Namespace = arg_parser.parse_args() + + self.aruco_id = args.aruco_id + if self.aruco_id is not None: + if self.aruco_id > 50 or self.aruco_id < 0: + logger.error( + f"You must pass an ArUco ID within the range: [0, 50]. You gave: `{self.aruco_id}`." + ) + exit(1) + + self.camera_id = args.camera_id if args.camera_id else 0 + + self.coord_file_path = args.coords + coordinate_list = [] + + with open(self.coord_file_path) as file: + for line in file: + try: + (lat_str, lon_str) = line.split(" ") + + # attempt to parse the strings into two numbers + (lat, lon) = ( + float(lat_str.replace("\ufeff", "")), + float(lon_str.replace("\ufeff", "")), + ) + + # create a `Coordinate` from the data + coordinate_list.append(Coordinate(lat, lon)) + except ValueError as e: + logger.error( + f"Failed to parse given coordinate file. Failed with error: `{e}`" + ) + exit(1) + + file.close() + self.coordinate_list = coordinate_list + logger.info(f"The coordinates file was parsed successfully! Here's the list: {self.coordinate_list}") + + + pass + + pass diff --git a/libs/aruco_tracker.py b/libs/aruco_tracker.py index abf00e3..752f043 100755 --- a/libs/aruco_tracker.py +++ b/libs/aruco_tracker.py @@ -1,229 +1,79 @@ -import configparser -import os -from time import sleep -from typing import List +from dataclasses import dataclass +from typing import Tuple, Union import cv2 -import cv2.aruco as aruco from loguru import logger -# TODO(bray): `dataclasses` -class ARTracker: - # TODO: Add type declarations - # TODO: Can we not have our initialization function be over 80 lines? - def __init__( - self, - cameras: List[int], - write: bool = False, - config_file: str = "config.ini", - ) -> None: - """ - Constructs a new ARTracker. - - - `cameras` is a list of OpenCV camera values. - - `write` tells the camera to save video files. - - `config_file` is a path to the config file (relative or absolute). - """ - - self.write = write - self.distance_to_marker = -1 - self.angle_to_marker = -999.9 - self.index1 = -1 - self.index2 = -1 - self.cameras = cameras +@dataclass() +class ArucoTracker: + """ + Capture frames and detect ARUCO markers in frames + """ - # Open the config file - # TODO: Could this be in a function - config = configparser.ConfigParser(allow_no_value=True) - if not config.read(config_file): - logger.info("ERROR OPENING AR CONFIG:") - if os.path.isabs(config_file): - print(config_file) # print the absolute path - else: - # print the full relative path - print(f"{os.getcwd()}/{config_file}") - exit(-2) - - # Set variables from the config file - self.degrees_per_pixel = float(config["ARTRACKER"]["DEGREES_PER_PIXEL"]) - self.vdegrees_per_pixel = float(config["ARTRACKER"]["VDEGREES_PER_PIXEL"]) - self.focal_length = float(config["ARTRACKER"]["FOCAL_LENGTH"]) - self.focal_length30H = float(config["ARTRACKER"]["FOCAL_LENGTH30H"]) - self.focal_length30V = float(config["ARTRACKER"]["FOCAL_LENGTH30V"]) - self.known_marker_width = float(config["ARTRACKER"]["KNOWN_TAG_WIDTH"]) - self.format = config["ARTRACKER"]["FORMAT"] - self.frame_width = int(config["ARTRACKER"]["FRAME_WIDTH"]) - self.frame_height = int(config["ARTRACKER"]["FRAME_HEIGHT"]) - - # Initialize video writer, fps is set to 5 - if self.write: - self.video_writer = cv2.VideoWriter( - "autonomous.avi", - cv2.VideoWriter_fourcc( - self.format[0], self.format[1], self.format[2], self.format[3] - ), - 5, - (self.frame_width, self.frame_height), - False, - ) - - # Set the ARUCO marker dictionary - self.marker_dict = aruco.Dictionary_get(aruco.DICT_4X4_50) - - # Initialize cameras - # TODO: Could this be in a function - self.caps = [] - if isinstance(self.cameras, int): - self.cameras = [self.cameras] - for i in range(0, len(self.cameras)): - # Makes sure the camera actually connects - while True: - cam = cv2.VideoCapture(self.cameras[i]) - if not cam.isOpened(): - logger.warning(f"!!!!!!!!!!!!Camera {i} did not open!!!!!!!!!!!!!!") - cam.release() - sleep(0.4) # wait a moment to try again! - continue - cam.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height) - cam.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width) - - # greatly speeds up the program but the writer is a bit wack because of this - # (it makes the camera store only one frame at a time) - cam.set(cv2.CAP_PROP_BUFFERSIZE, 1) - - # fourcc is the codec used to encode the video (generally) - cam.set( - cv2.CAP_PROP_FOURCC, - cv2.VideoWriter_fourcc( - self.format[0], self.format[1], self.format[2], self.format[3] - ), - ) - # the 0th value of the tuple is a bool, true if we got a frame - # so, in other words: if we didn't read, stop using the camera device - if not cam.read()[0]: - cam.release() - else: # if we did get a frame... - self.caps.append(cam) # add to our list of cameras - break + aruco_marker: int + marker_size: int # In centimeters + # TODO: Find out how to get camera matrix and distance coefficients + camera_mtx: type # TODO + distance_coefficients: type # TODO + aruco_dictionary: cv2.aruco.DICT_4X4_50 + video_capture: cv2.VideoCapture - # id1 is the main ar tag to telating to id2, since gateposts are no longrack, id2 is not relevant, image is the image to analyze - # TODO: Get rid of anything relating to id2 - # markerFound - def is_marker_found(self, id1: int, image, id2: int = -1) -> bool: + def __init__(self, opencv_camera_index: int): """ - Attempts to find a marker with the given `id1` in the provided `image`. - - Returns true if found. + Create and configure `VideoCapture` object from opencv camera index + Create and initialize aruco dictionary """ + pass - found = False + def find_marker(self) -> Union[None, Tuple[float, float]]: + """ + Captures a frame and checks to see if `aruco_marker` is present. - # converts to grayscale - grayscale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY, image) + Either returns `None` if not found, or a tuple (a: float, b: float): + - `a`: the distance to the marker in centimeters + - `b`: the angle to the marker in degrees + """ - self.index1 = -1 - self.index2 = -1 - bw = grayscale - # tries converting to b&w using different different cutoffs to find the perfect one for the current lighting - for i in range(40, 221, 60): - # FIXME(bray): use logan's pr man! - bw = cv2.threshold(grayscale, i, 255, cv2.THRESH_BINARY)[1] - (self.corners, self.marker_IDs, self.rejected) = aruco.detectMarkers( + # Capture a new frame + ret, frame = self.video_capture.read() + if not ret: + logger.error("FRAME COULDNT BE CAPTURED! Camera may be busy.") + return None + + # Convert frame to grayscale and try to find find a marker + grayscale = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY, frame) + for threshold_cutoff in range( + 40, 221, 60 + ): # Check for arucos tags with different thresholds + # Find aruco tags in image with given threshold cutoff + bw = cv2.threshold(grayscale, threshold_cutoff, 255, cv2.THRESH_BINARY)[1] + (tag_corners, aruco_tags, __) = cv2.aruco.detectMarkers( bw, self.marker_dict ) - if self.marker_IDs is not None: - self.index1 = -1 - # this just checks to make sure that it found the right marker - for m in range(len(self.marker_IDs)): - if self.marker_IDs[m] == id1: - self.index1 = m - break - if self.index1 != -1: # if we changed the id of the ARUCO tag... - logger.info("Found the correct marker!") - found = True - if self.write: - self.video_writer.write(bw) # purely for debug - # FIXME(bray): this is likely a bug. wait_key()'s delay - # input is in milliseconds! and 1 ms is not very long... - cv2.waitKey(1) + # check if our tag is in the list of found aruco tags + tag_index: Union[int, None] = None + for tag in aruco_tags: + if tag == self.aruco_marker: + logger.info(f"ArUco tag `{tag}` found!") + tag_index = tag break - # we need to check if we've found it or not. so... - if self.index1 != -1: - # TODO(teeler): Can we just replace this with cv2.arcuo.estimatePoseSingleMarkers(...) - center_x_marker = ( - self.corners[self.index1][0][0][0] - + self.corners[self.index1][0][1][0] - + self.corners[self.index1][0][2][0] - + self.corners[self.index1][0][3][0] - ) / 4 - # takes the pixels from the marker to the center of the image and multiplies it by the degrees per pixel - # FIXME(bray): wow yeah thanks for the comment, but why? - # FIXME(llar): this is making me cry, is this referencing coordinates on the aruco tag?? using x / y coordinate axis? - self.angle_to_marker = self.degrees_per_pixel * ( - center_x_marker - self.frame_width / 2 - ) - - """ - distanceToAR = (knownWidthOfMarker(20cm) * focalLengthOfCamera) / pixelWidthOfMarker - focal_length = focal length at 0 degrees horizontal and 0 degrees vertical - focal_length30H = focal length at 30 degreees horizontal and 0 degrees vertical - focal_length30V = focal length at 30 degrees vertical and 0 degrees horizontal - realFocalLength of camera = focal_length - + (horizontal angle to marker/30) * (focal_length30H - focal_length) - + (vertical angle to marker / 30) * (focal_length30V - focal_length) - If focal_length30H and focal_length30V both equal focal_length then realFocalLength = focal_length which is good for non huddly cameras - Please note that the realFocalLength calculation is an approximation that could be much better if anyone wants to try to come up with something better - """ - # hangle, vangle = horizontal, vertical angle - hangle_to_marker = abs(self.angle_to_marker) - center_y_marker = ( - self.corners[self.index1][0][0][1] - + self.corners[self.index1][0][1][1] - + self.corners[self.index1][0][2][1] - + self.corners[self.index1][0][3][1] - ) / 4 - vangle_to_marker = abs( - self.vdegrees_per_pixel * (center_y_marker - self.frame_height / 2) - ) - realFocalLength = ( - self.focal_length - + (hangle_to_marker / 30) * (self.focal_length30H - self.focal_length) - + (vangle_to_marker / 30) * (self.focal_length30V - self.focal_length) - ) - width_of_marker = ( - ( - self.corners[self.index1][0][1][0] - - self.corners[self.index1][0][0][0] + # if so, let's estimate its pose + if tag_index is not None: + logger.debug("Checking tag pose...") + rotation_vects, translation_vects = cv2.aruco.estimatePoseSingleMarkers( + tag_corners[tag_index], + self.marker_size, + self.camera_mtx, + self.distance_coefficients, ) - + ( - self.corners[self.index1][0][2][0] - - self.corners[self.index1][0][3][0] - ) - ) / 2 - self.distance_to_marker = ( - self.known_marker_width * realFocalLength - ) / width_of_marker - - return found - - def find_marker(self, id1: int, id2: int = -1, cameras: int = -1) -> bool: - """ - This method attempts to find a marker with the given ID. - Returns true if found. - - `id1` is the marker you want to look for. - `cameras` is the number of cameras to check. -1 for all of them - """ - if cameras == -1: - cameras = len(self.caps) + # calculations to get distance and angle to marker + (distance_to_marker, angle_to_marker) = (0.0, 0.0) - for i in range(cameras): - ret, frame = self.caps[i].read() - if self.is_marker_found(id1, frame, id2=id2): - return True + # TODO + return (distance_to_marker, angle_to_marker) - return False + return None diff --git a/libs/communication.py b/libs/communication.py new file mode 100644 index 0000000..db5050a --- /dev/null +++ b/libs/communication.py @@ -0,0 +1,122 @@ +from dataclasses import dataclass +from socket import socket as Socket # fixes namespace collisions + +from loguru import logger + +from libs.config import Config + + +@dataclass() +class Communication: + """ + Communicate wheel speeds and LED commands to proper micro-controllers + """ + + socket: Socket + + # constant values to define wheel speeds + WHEEL_SUBSYSTEM_BYTE = 0x01 + WHEEL_PART_BYTE = 0x01 + + # LED subsystem constants + LED_SUBSYSTEM_BYTE = 0x01 + LED_PART_BYTE = 0x02 + + def __init__(self, c: Config): + self.socket = Socket(Socket.AF_INET, Socket.SOCK_DGRAM) + self.socket.connect((c.mbed_ip, c.mbed_port)) # adds ip/port to socket's state + + def send_simple_wheel_speeds(self, left_speed: int, right_speed: int) -> bytearray: + """ + Sends the given wheel side speeds to the rover. In this case, each + side uses the same speed for all their wheels. + """ + + (ls, rs) = (left_speed, right_speed) + if ls < 0 or ls > 255 or rs < 0 or rs > 255: + logger.error("HEY, speeds are wrong! `{ls}` and `{rs}` must be in [0, 255]") + + return self.send_wheel_speeds(ls, ls, ls, rs, rs, rs) + + def send_wheel_speeds( + self, + front_left: int, + middle_left: int, + rear_left: int, + front_right: int, + middle_right: int, + rear_right: int, + ) -> bytearray: + """ + Sends the given wheel speeds to the rover. Each wheel speed goes from + 0 to 255 (u8::MAX), with 126 being neutral. + """ + message: bytearray = bytearray(9) # the wheels use 9 bytes + + # add subsystem and part bytes + message.extend([self.WHEEL_SUBSYSTEM_BYTE, self.WHEEL_PART_BYTE]) + + # stick in the speeds + message.extend( + [front_left, middle_left, rear_left, front_right, middle_right, rear_right] + ) + + # compute + add checksum + checksum = self.__checksum(message) + message.extend([checksum]) + + # send the message over udp 🥺 + # TODO: consider using Google's QUIC instead! + self.socket.sendall(message) + logger.info(f"Sending wheel speeds: {self.__prettyprint_byte_array(message)}") + return message + + def send_led_command(self, red: int, green: int, blue: int) -> bytearray: + """ + Sends the given LED color to the rover. All colors should be in the + range of [0, 255]. + + These are not checksummed. + """ + message: bytearray = bytearray(5) # LEDs use 5 bytes + message.extend([self.LED_SUBSYSTEM_BYTE, self.LED_PART_BYTE]) + message.extend([red, green, blue]) + self.socket.sendall(message) + + logger.info(f"Sending command to LEDs: `{self.__prettyprint_byte_array()}`") + return message + + def send_lights_off(self) -> bytearray: + """Turns off the lights.""" + return self.send_led_command(0, 0, 0) + + def send_led_red(self) -> bytearray: + """Makes the LEDs red.""" + return self.send_led_command(255, 0, 0) + + def send_led_green(self) -> bytearray: + """Makes the LEDs green.""" + return self.send_led_command(0, 255, 0) + + def send_led_blue(self) -> bytearray: + """Makes the LEDs blue.""" + return self.send_led_command(0, 0, 255) + + def __checksum(self, byte_array: bytearray) -> int: + """ + Calculates the checksum of a byte array. + """ + checksum: int = sum(byte_array) & 0xFF + return checksum + + def __prettyprint_byte_array(self, byte_array: bytearray) -> str: + """ + Prints a byte array in a human-readable format. + """ + pretty_print: str = "[" + for index, byte in enumerate(byte_array): + pretty_print += f"0x{byte:02X}" + if index != len(byte_array) - 1: + pretty_print += ", " + pretty_print += "]" + return pretty_print diff --git a/libs/config.py b/libs/config.py new file mode 100644 index 0000000..e4da434 --- /dev/null +++ b/libs/config.py @@ -0,0 +1,109 @@ +from configparser import ConfigParser +from loguru import logger + +from dataclasses import dataclass + + +@dataclass() +class Config: + """ + A container for the rover's configuration. + """ + + config: ConfigParser + + def __init__(self, config_path: str): + c = ConfigParser() + if not c.read(config_path): + logger.error(f"Failed to read the config from given config path: `{config_path}`") + + self.config = c + + ##### MBED, for controlling the LEDs ##### + + def mbed_ip(self) -> str: + """ + Returns the IP address of the MBED micro-controller. + """ + return str(self.config["CONFIG"]["MBED_IP"]) + + def mbed_port(self) -> int: + """ + Returns the port number of the MBED micro-controller. + """ + return int(self.config["CONFIG"]["MBED_PORT"]) + + ##### Swift, for getting GPS coordinates ##### + + def gps_ip(self) -> str: + """ + Returns the IP address of the Swift GPS module. + """ + return str(self.config["CONFIG"]["SWIFT_IP"]) + + def gps_port(self) -> int: + """ + Returns the port number of the Swift GPS module. + """ + return int(self.config["CONFIG"]["SWIFT_PORT"]) + + ##### OpenCV format ##### + + def opencv_format(self) -> str: + """ + Returns the OpenCV format string for camera initialization. + """ + return self.config["ARTRACKER"]["FORMAT"] + + ##### Camera traits, for object distances and angles ##### + + def camera_hdegrees_per_pixel(self) -> float: + """ + Returns the horizontal degrees per pixel for the camera. + """ + return float(self.config["ARTRACKER"]["DEGREES_PER_PIXEL"]) + + def camera_vdegrees_per_pixel(self) -> float: + """ + Returns the vertical degrees per pixel for the camera. + """ + return float(self.config["ARTRACKER"]["VDEGREES_PER_PIXEL"]) + + def camera_frame_width(self) -> int: + """ + Returns the width of the camera frame. + """ + return int(self.config["ARTRACKER"]["FRAME_WIDTH"]) + + def camera_frame_height(self) -> int: + """ + Returns the height of the camera frame. + """ + return int(self.config["ARTRACKER"]["FRAME_HEIGHT"]) + + def camera_focal_length(self) -> float: + """ + The focal length of the camera. + """ + return float(self.config["ARTRACKER"]["FOCAL_LENGTH"]) + + ## TODO: remove these if we use the OpenCV distance/angle function + + def camera_known_marker_size(self) -> float: + """ + The real-world width/height of an ARUCO marker. This helps calculate + the distance/angle to the marker. + """ + return float(self.config["ARTRACKER"]["KNOWN_TAG_WIDTH"]) + + def camera_focal_length30h(self) -> float: + """ + The focal length of the camera at 30 degrees horizontal. + """ + return float(self.config["ARTRACKER"]["FOCAL_LENGTH30H"]) + + def camera_focal_length30v(self) -> float: + """ + The focal length of the camera at 30 degrees vertical. + """ + return float(self.config["ARTRACKER"]["FOCAL_LENGTH30V"]) diff --git a/libs/coordinate.py b/libs/coordinate.py new file mode 100644 index 0000000..3bba311 --- /dev/null +++ b/libs/coordinate.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass + +@dataclass +class Coordinate: + latitude: float + longitude: float + + def get_latitude(self) -> float: + self.latitude + + def get_longitude(self) -> float: + self.longitude + + def set_latitude(self, latitude: float): + self.latitude = latitude + + def set_longitude(self, longitude: float): + self.longitude = longitude + + def __str__(self): + return f"({self.latitude}, {self.longitude})" \ No newline at end of file diff --git a/libs/gps_controller.py b/libs/gps_controller.py new file mode 100644 index 0000000..db7e20f --- /dev/null +++ b/libs/gps_controller.py @@ -0,0 +1,130 @@ +from dataclasses import dataclass +from multiprocessing import Process +from time import sleep + +from loguru import logger +from geographiclib.geodesic import Geodesic + +from gps import gps +from libs import config +from libs.coordinate import Coordinate + + +@dataclass() +class GpsController: + """ + Controls the GPS library. + """ + + conf: config.Config + enabled: bool = False # whether or not gps comms are enabled + + # note: if you change these, make sure to go to GpsReport too + coordinates: Coordinate = Coordinate(0.0, 0.0) + height: float = 0.0 # in meters + time: float = 0.0 # "time of week" in milliseconds + error: float = 0.0 # in millimeters, accounting for vert/horiz error + true_bearing: float = 0.0 # Bearing from True North + previous_coordinates: Coordinate = Coordinate(0.0, 0.0) + + # here's the process we can join on when the gps is closed + process: Process + + SLEEP_TIME: float = 0.1 + + def __init__(self, c: config.Config): + self.conf = c + + # TODO? + """ + Initialize Location object to get coordinates + """ + pass + + def start(self): + """ + Starts the GPS process and connects to the GPS hardware itself. + """ + gps.gps_init(self.conf.gps_ip(), self.conf.gps_port()) + self.enabled = True + + logger.info("Starting GPS process...") + proc = Process(target=self.check, name="GPS Process") + proc.start() + logger.info("GPS process started!") + + self.process = proc + pass + + def stop(self): + """ + Sets the GPS state to be off, causing the GPS thread to stop. + """ + self.enabled = False + gps.gps_finish() + logger.info("Joining the GPS Controller process...") + self.process.close() + logger.info("GPS Controller process joined!") + + def check(self): + while self.enabled: + if gps.get_latitude() + gps.get_longitude() == 0: + logger.error("Expected GPS data but got none.") + else: + # we've got GPS data. let's do our thing... + # make current coords old + self.previous_coordinates = self.coordinates + + # get new values + self.coordinates = Coordinate(gps.get_latitude(), gps.get_longitude()) + self.height = gps.get_height() + self.time = gps.get_time() + self.error = gps.get_error() + + # calculate true bearing between new and old coordinates + self.true_bearing = calculate_true_bearing( + self.previous_coordinates, self.coordinates + ) + + # TODO: make report and send to navigation thread + + sleep(self.SLEEP_TIME) + + logger.info("GPS thread: no longer enabled, shutting down...") + + +def calculate_true_bearing(prev_coords: Coordinate, curr_coords: Coordinate) -> float: + """ + Calculate the True Bearing for an object given its current position and previous position. + NOTE: True bearing is the bearing relative to North + + - `co1`: first coordinate. + - `co2`: second coordinate. + """ + earth = Geodesic.WGS84 + + res = earth.Inverse( + prev_coords.get_latitude(), + prev_coords.get_longitude(), + curr_coords.get_latitude(), + curr_coords.get_longitude(), + ) + true_bearing: float = res["azi1"] + + logger.debug(f"True bearing: `{true_bearing}`°") + return true_bearing + + +@dataclass() +class GpsReport: + """ + A container for GPS data. Sendable across threads. + """ + + # important: ensure you keep these fields matched with GpsController + current_coordinates: Coordinate = Coordinate(0.0, 0.0) + height: float = 0.0 # in meters + time: float = 0.0 # "time of week" in milliseconds + error: float = 0.0 # in millimeters, accounting for vert/horiz error + true_bearing: float = 0.0 # in degrees + previous_coordinates: Coordinate = Coordinate(0.0, 0.0) diff --git a/libs/navigation.py b/libs/navigation.py new file mode 100644 index 0000000..36180ab --- /dev/null +++ b/libs/navigation.py @@ -0,0 +1,154 @@ +from dataclasses import dataclass +from multiprocessing import Queue + +from geographiclib.geodesic import Geodesic +from loguru import logger + +from libs.config import Config +from libs.coordinate import Coordinate +from libs.gps_controller import GpsController, calculate_true_bearing, GpsReport + + +@dataclass() +class Navigation: + """ + Keeps track of latititude and longitude of objective, distance to objective, and angle to objective. + + It also calculates some of these values. + + The rover should run the `finish` method when it is done navigating. + + - `given_coords`: a given gps coordinate to navigate to. Depending on + section of comp, ARUCO tags will be nearby + """ + + conf: Config + given_coords: Coordinate + gps: GpsController + + queue: Queue + + def __init__( + self, conf: Config, given_coords: Coordinate + ): + """ + Initializes the navigation to get coordinates. + """ + self.given_coords = given_coords + self.conf = conf + self.gps = GpsController(self.conf) + self.gps.start() + + def finish(self): + """ + Stops the GPS thread and shuts down the GPS controller. + + This should always be called when the rover finishes navigation. + """ + self.gps.stop() + self.gps = None + + def distance_to_object(self, coord: Coordinate) -> float: + """ + Finds the distance between the rover and `coord`. + """ + if self.gps is None: + logger.error("GPS should be initialized before finding distance.") + return 0.0 + else: + rover_coords = self.gps.coordinates + earth: Geodesic = Geodesic.WGS84 + res = earth.Inverse( + rover_coords.get_latitude(), + rover_coords.get_longitude(), + coord.get_latitude(), + coord.get_longitude(), + ) + distance: float = res["s12"] + + logger.debug(f"Distance between `{rover_coords}` and `{coord}`: `{distance}`m") + return distance + + def angle_to_object(self, coord: Coordinate) -> float: + """ + Finds the angle between the rover and `coord`. + """ + + # Get all the cool stuff to calculate bearing from rover to coordinate + rover_coords = self.gps.coordinates + rover_true_bearing = self.gps.true_bearing + true_bearing_to_coordinate = calculate_true_bearing(rover_coords, coord) + + # The relative bearing to an coordinate is the difference between + # the true bearing of the coordinate and the true bearing of the rover + relative_bearing = true_bearing_to_coordinate - rover_true_bearing + + # Ensure relative bearing is > -180 and < 180 + if relative_bearing < -180: # Bearing is < 180 so add 360 + return relative_bearing + 360 + elif relative_bearing > 180: # Bearing is > 180 so subtract 360 + return relative_bearing - 360 + return relative_bearing # The bearing is just right :3 + + def coordinates_to_object(self, distance_km: float, angle_deg: float) -> Coordinate: + """ + Calculates the coordinates of another object compared to the Rover. + + - `distance_km`: The object's distance from the Rover in kilometers. + - `angle_deg`: The object's angle from the Rover in degrees. + """ + rover_coords = self.gps.coordinates + + earth: Geodesic = Geodesic.WGS84 + res = earth.Direct( + distance_km, + angle_deg, + rover_coords.get_latitude(), + rover_coords.get_longitude(), + ) + c = Coordinate(res["lat2"], res["lon2"]) + + logger.debug(f"Object coordinates (lat, lon): {c}") + return c + + def navigate(self): + """ + A method that runs until `not self.enabled`. Checks for a new + GpsReport. + + If one is present, does logic based on the rover's mode and location. + """ + while self.enabled: + # check for new gps report + if not self.queue.empty(): + message = self.queue.get() + + if type(message) is GpsReport: + logger.debug("Found new GPS report! Let's operate on it...") + self.drive(message) + + + def drive(self, message: GpsReport): + # UNIMPLEMENTED! + pass + + + + + +def calculate_angle(self, co1: Coordinate, co2: Coordinate) -> float: + """ + Calculates the angle between two coordinates. + The angle between the direction of the rover and North + + - `co1`: first coordinate. + - `co2`: second coordinate. + """ + earth: Geodesic = Geodesic.WGS84 + res = earth.Inverse( + co1.get_latitude(), co1.get_longitude(), co2.get_latitude(), co2.get_longitude() + ) + angle: float = res["azi1"] + + logger.debug(f"Angle between `{co1}` and `{co2}`: `{angle}`°") + return angle diff --git a/libs/rover.py b/libs/rover.py new file mode 100644 index 0000000..6e2bf88 --- /dev/null +++ b/libs/rover.py @@ -0,0 +1,134 @@ +from dataclasses import dataclass +from multiprocessing import Process, Queue +from time import sleep + +import cv2 +from loguru import logger + +from libs.config import Config +from libs.args import Args +from libs.aruco_tracker import ArucoTracker +from libs.communication import Communication +from libs.navigation import Navigation +from maps.server import MapServer + + +@dataclass +class Mode: + SEARCHING_ALONG_COORDINATE = 0 + ARUCO = 1 + YOLO = 2 + + state: int + + +@dataclass() +class Rover: + """ + A representation of the rover. + """ + arguments: Args + conf: Config + mode: Mode + + rover_connection: Communication + communication_queue: Queue + communication_process: Process + + nav: Navigation + navigation_queue: Queue + + aruco_tracker: ArucoTracker + aruco_tracker_queue: Queue + + def __init__(self): + # get arguments + logger.info("Parsing arguments...") + self.arguments = Args() + + self.mode = Mode.SEARCHING_ALONG_COORDINATE + + # check arguments for aruco id. set self.mode accordingly + if self.arguments.aruco_id is None: + logger.info("No ArUco markers were given. Let's navigate to the given coordinates.") + self.mode = self.mode + # go to coordinate + + else: + self.mode = Mode.SEARCHING_ALONG_COORDINATE + # go to coordinate + # TODO: when that's done, look for the marker + logger.info("Parsed arguments successfully!") + + # sets up the parser + self.conf = Config("../config.ini") + + + self.speeds = [0, 0] + self.error_accumulation = 0.0 + + # TODO: Initiate different threads + # ✅ Navigation thread + # - Communication thread + # - Aruco Tracker/YOLO thread + # - RoverMap Thread + + self.rover_connection = Communication(self.conf.gps_ip(), self.conf.gps_port()) + + pass + + def start_navigation(self): + # initialize the gps object + # this starts a thread for the gps, as this module handles that itself + self.nav = Navigation( + self.conf, + self.arguments.coordinate_list, # FIXME: needs to take a list??? or manage it manual + ) + + pass + + def start_communication(self): + # starts the thread that sends wheel speeds + proc = Process(target=self.rover_connection.handle_commands, name="send wheel speeds") # FIXME + proc.daemon = True # FIXME: don't do this + proc.start() + + logger.info("Started the communication thread!") + self.communication_process = proc + pass + + def start_aruco_tracker_thread(self): + pass + + def start_yolo_thread(self): + pass + + def start_rover_map(self): + # Starts everything needed by the map + self.map_server = MapServer() + self.map_server.register_routes() + self.map_server.start(debug=False) + self.start_map(self.update_map, 0.5) + sleep(0.1) + pass + + def pid_controller(self): + """ + Read from navigation thread and send commands through communication thread + """ + pass + + +@dataclass() +class Yolo: + """ + Captures frames and checks for competition objects in a frame. Computes bounding boxes. + """ + + aruco_marker: int + video_capture: cv2.VideoCapture + + def __init__(self): + """ + Initialize YOLO detection model + """ diff --git a/main.py b/main.py index df128c9..e9b5628 100755 --- a/main.py +++ b/main.py @@ -8,6 +8,8 @@ from pathlib import Path from configparser import ConfigParser +# TODO(rewrite): make sure the speeds are correct. account for wheel deadlock! + def flash(mbedIP: str, mbedPort: int) -> None: """ @@ -15,7 +17,7 @@ def flash(mbedIP: str, mbedPort: int) -> None: This is used to show that the rover successfully navigated to a goal. """ - + while True: udp_out.send_LED(mbedIP, mbedPort, "g") sleep(0.2) @@ -23,7 +25,6 @@ def flash(mbedIP: str, mbedPort: int) -> None: sleep(0.2) - # Create Argument Parser and parse arguments arg_parser = argparse.ArgumentParser() arg_parser.add_argument( @@ -50,7 +51,7 @@ def flash(mbedIP: str, mbedPort: int) -> None: # TODO: dataclass this function. returning a tuple is ugly and begging for bugs # TODO: use typing. broken due to old ass python (<3.8) -#def parse_arguments() -> Tuple[list[int], list[Tuple[float, float]]]: +# def parse_arguments() -> Tuple[list[int], list[Tuple[float, float]]]: def parse_arguments(): """ Parses the command line arguments. @@ -103,8 +104,10 @@ def parse_arguments(): # TODO: make this a part of a `Rover` class :3 def drive( # TODO: use typing. old ass python (<3.8) - #rover: Drive, gps_coordinates: list[Tuple[float, float]], aruco_ids: list[int] - rover: Drive, gps_coordinates, aruco_ids + # rover: Drive, gps_coordinates: list[Tuple[float, float]], aruco_ids: list[int] + rover: Drive, + gps_coordinates, + aruco_ids, ) -> bool: """ Given a Drive object, navigate to goal @@ -114,9 +117,10 @@ def drive( # Drive along GPS coordinates logger.warning("ok we're driving along the gps coords :3") - dac_result = rover.drive_along_coordinates(gps_coordinates, aruco_ids[0], aruco_ids[1]) + dac_result = rover.drive_along_coordinates( + gps_coordinates, aruco_ids[0], aruco_ids[1] + ) logger.error(f"drive along coordinates returned {dac_result}") - # TODO: Can we have this run in the rover class instead of returning from 'drive_along_coordinates' if aruco_ids[0] != -1: diff --git a/new_main.py b/new_main.py new file mode 100644 index 0000000..cab32aa --- /dev/null +++ b/new_main.py @@ -0,0 +1,10 @@ +from loguru import logger +from libs.rover import Rover + +logger.info("Starting Autonomous...") +remi = Rover() + +# this generally shouldn't happen unless the Rover first specifies +logger.warning("Autonomous code ended. Main thread exiting...") +logger.warning("Make sure to use Ctrl^C if other threads continue to run.") +exit(0) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 413804a..12e28bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,8 +5,10 @@ description = "Add your description here" authors = [{ name = "Barrett", email = "contact@barretts.club" }] dependencies = [ "opencv-python>=4.7", - "flask>=3.0.3", + "flask>=2.0", "loguru>=0.7.2", + "geographiclib>=2.0", + # "dataclasses==0.8", # ATTENTION: this is required on the Rover for python 3.6 ] readme = "README.md" requires-python = ">= 3.6" @@ -16,7 +18,7 @@ build-backend = "hatchling.build" [tool.rye] managed = true -dev-dependencies = ["mypy>=1.8.0", "ruff>=0.2.0"] +#dev-dependencies = ["mypy>=1.4.0", "ruff>=0.2.0"] [tool.hatch.metadata] allow-direct-references = true diff --git a/requirements-dev.lock b/requirements-dev.lock new file mode 100644 index 0000000..326676e --- /dev/null +++ b/requirements-dev.lock @@ -0,0 +1,38 @@ +# generated by rye +# use `rye lock` or `rye sync` to update this lockfile +# +# last locked with the following flags: +# pre: false +# features: [] +# all-features: false +# with-sources: false + +-e file:. +click==8.1.7 + # via flask +flask==2.2.5 + # via autonomous +geographiclib==2.0 + # via autonomous +importlib-metadata==6.7.0 + # via click + # via flask +itsdangerous==2.1.2 + # via flask +jinja2==3.1.4 + # via flask +loguru==0.7.2 + # via autonomous +markupsafe==2.1.5 + # via jinja2 + # via werkzeug +numpy==1.21.6 + # via opencv-python +opencv-python==4.9.0.80 + # via autonomous +typing-extensions==4.7.1 + # via importlib-metadata +werkzeug==2.2.3 + # via flask +zipp==3.15.0 + # via importlib-metadata diff --git a/requirements.lock b/requirements.lock new file mode 100644 index 0000000..326676e --- /dev/null +++ b/requirements.lock @@ -0,0 +1,38 @@ +# generated by rye +# use `rye lock` or `rye sync` to update this lockfile +# +# last locked with the following flags: +# pre: false +# features: [] +# all-features: false +# with-sources: false + +-e file:. +click==8.1.7 + # via flask +flask==2.2.5 + # via autonomous +geographiclib==2.0 + # via autonomous +importlib-metadata==6.7.0 + # via click + # via flask +itsdangerous==2.1.2 + # via flask +jinja2==3.1.4 + # via flask +loguru==0.7.2 + # via autonomous +markupsafe==2.1.5 + # via jinja2 + # via werkzeug +numpy==1.21.6 + # via opencv-python +opencv-python==4.9.0.80 + # via autonomous +typing-extensions==4.7.1 + # via importlib-metadata +werkzeug==2.2.3 + # via flask +zipp==3.15.0 + # via importlib-metadata diff --git a/tools/list_cameras.py b/tools/list_cameras.py new file mode 100644 index 0000000..37d1e80 --- /dev/null +++ b/tools/list_cameras.py @@ -0,0 +1,10 @@ +from cv2 import VideoCapture +from loguru import logger + +for i in range(0, 10): + try: + v: VideoCapture = VideoCapture(index=i) + if v.isOpened(): + logger.info(f"Found a camera! Camera {{ id: {i} }}") + except Exception: + pass