diff --git a/src/dragonfly/dragonfly/actions/SketchAction.py b/src/dragonfly/dragonfly/actions/SketchAction.py index 0df394b..6be8cf8 100644 --- a/src/dragonfly/dragonfly/actions/SketchAction.py +++ b/src/dragonfly/dragonfly/actions/SketchAction.py @@ -87,7 +87,7 @@ class SketchAction: SKETCH_STARTING_THRESHOLD = 20 # m def __init__(self, id, log_publisher, logger, local_setvelocity_publisher, announce_stream, offset, partner, leader, - drone_stream_factory, semaphore_observable, semaphore_publisher, dragonfly_sketch_subject, position_vector_publisher, threshold): + drone_stream_factory, semaphore_observable, semaphore_publisher, threshold): self.log_publisher = log_publisher self.logger = logger self.local_setvelocity_publisher = local_setvelocity_publisher @@ -99,14 +99,14 @@ def __init__(self, id, log_publisher, logger, local_setvelocity_publisher, annou self.ros_subscriptions = [] self.announce_stream = announce_stream self.drone_stream_factory = drone_stream_factory - self.dragonfly_sketch_subject = dragonfly_sketch_subject - self.position_vector_publisher = position_vector_publisher + self.dragonfly_sketch_subject = Subject() self.semaphore_observable = semaphore_observable self.semaphore_publisher = semaphore_publisher self.navigate_subscription = rx.empty().subscribe() self.leader_broadcast_subscription = rx.empty().subscribe() self.receive_semaphore_subscription = rx.empty().subscribe() + self.sketch_control_subscription = rx.empty().subscribe() self.target_position_vector = None self.encountered = False @@ -153,10 +153,12 @@ def step(self): if self.leader: self.leader_broadcast_subscription = rx.combine_latest( + rx.just(self.get_leader_drone().get_sketch_control_publisher()), self.setup_subject(self.partner), self.setup_subject(self.id) - ).subscribe(lambda position_reading_vector: self.broadcast_target(position_reading_vector[0], position_reading_vector[1])) + ).subscribe(lambda position_reading_vector: self.broadcast_target(position_reading_vector[0], position_reading_vector[1], position_reading_vector[2])) else: + self.sketch_control_subscription = self.get_leader_drone().get_sketch_control().subscribe(self.dragonfly_sketch_subject) self.receive_semaphore_subscription = self.semaphore_observable.subscribe(on_next = lambda token: self.listen_for_stop(token)) return self.status @@ -165,6 +167,9 @@ def step(self): def build_sketch_end_id(id): return f"sketch.{id}" + def get_leader_drone(self): + return self.drone_stream_factory.get_drone(self.id if self.leader else self.partner) + def listen_for_stop(self, token): self.logger.info(f"{self.id}: Received token") if token.id == self.build_sketch_end_id(self.partner): @@ -285,7 +290,7 @@ def calculate_error_correction(self, partner_position, self_position, position_v return center_correction - def broadcast_target(self, partner_position, self_position): + def broadcast_target(self, position_vector_publisher, partner_position, self_position): average_position = average(self_position, partner_position) @@ -330,8 +335,8 @@ def broadcast_target(self, partner_position, self_position): if self.encountered and self.passed(average_position): self.target_position_vector = self.boundary_sketch(partner_position, self_position) - # self.logger.info(f"PV: {self.target_position_vector}") - self.position_vector_publisher.publish(self.target_position_vector) + # self.logger.info(f"PV: {self.target_position_vector} to {position_vector_publisher}") + position_vector_publisher.publish(self.target_position_vector) self.dragonfly_sketch_subject.on_next(self.target_position_vector) def calculate_gradient(self): @@ -516,4 +521,5 @@ def passed(self, average_position): def stop(self): self.navigate_subscription.dispose() self.leader_broadcast_subscription.dispose() - self.receive_semaphore_subscription.dispose() \ No newline at end of file + self.receive_semaphore_subscription.dispose() + self.sketch_control_subscription.dispose() \ No newline at end of file diff --git a/src/dragonfly/dragonfly/command.py b/src/dragonfly/dragonfly/command.py index e2d995b..af5c4d6 100755 --- a/src/dragonfly/dragonfly/command.py +++ b/src/dragonfly/dragonfly/command.py @@ -52,7 +52,6 @@ def __init__(self, id, node): self.local_velocity_observable = Subject() self.dragonfly_announce_subject = Subject() self.semaphore_observable = Subject() - self.dragonfly_sketch_subject = Subject() def setmode(self, mode): self.logger.info(f"Set Mode {mode}") @@ -446,8 +445,7 @@ def mission(self, request, response): self.actionqueue.push(LogAction(self.logPublisher, "Sketch")) \ .push(SketchAction(self.id, self.logPublisher, self.logger, self.local_setvelocity_publisher, self.dragonfly_announce_subject, step.sketch_step.offset, step.sketch_step.partner, step.sketch_step.leader, self.drone_stream_factory, - self.semaphore_observable, self.semaphore_publisher, - self.dragonfly_sketch_subject, self.position_vector_publisher, step.sketch_step.threshold)) + self.semaphore_observable, self.semaphore_publisher, step.sketch_step.threshold)) elif step.msg_type == MissionStep.TYPE_VERTICAL_TRANSECT: self.logger.info("Vertical Transect") @@ -589,11 +587,6 @@ def setup(self): self.node.create_subscription(String, "/dragonfly/announce", lambda name: self.dragonfly_announce_subject.on_next(name), qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)) - self.node.create_subscription(PositionVector, "/dragonfly/sketch", - lambda positionVector: self.dragonfly_sketch_subject.on_next(positionVector), - qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)) - self.position_vector_publisher = self.node.create_publisher(PositionVector, "/dragonfly/sketch", - qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10)) self.semaphore_publisher = self.node.create_publisher(SemaphoreToken, "/dragonfly/semaphore", qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10)) diff --git a/src/dragonfly/dragonfly/droneStreamFactory.py b/src/dragonfly/dragonfly/droneStreamFactory.py index e73532a..1f79dab 100644 --- a/src/dragonfly/dragonfly/droneStreamFactory.py +++ b/src/dragonfly/dragonfly/droneStreamFactory.py @@ -4,7 +4,7 @@ from sensor_msgs.msg import NavSatFix from geometry_msgs.msg import TwistStamped from sensor_msgs.msg import Range -from dragonfly_messages.msg import CO2 +from dragonfly_messages.msg import CO2, PositionVector from rclpy.qos import QoSProfile, QoSHistoryPolicy, HistoryPolicy, ReliabilityPolicy class DroneStream: @@ -21,6 +21,8 @@ def __init__(self, name, node): self.velocity_subject_init = False self.rangefinder_subject = Subject() self.rangefinder_subject_init = False + self.sketch_control = Subject() + self.sketch_control_init = False self.mean = 0 self.std_dev = 1 @@ -63,6 +65,18 @@ def get_rangefinder(self): return self.rangefinder_subject + def get_sketch_control(self): + if not self.sketch_control_init: + self.node.create_subscription(PositionVector, f"{self.name}/sketch", + lambda value: self.sketch_control.on_next(value), + qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT)) + self.sketch_control_init = True + + return self.sketch_control + + def get_sketch_control_publisher(self): + return self.node.create_publisher(PositionVector, f"{self.name}/sketch", + qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10)) class DroneStreamFactory: