Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/dragonfly/dragonfly/actions/SketchAction.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class SketchAction:
SKETCH_STARTING_THRESHOLD = 20 # m

def __init__(self, id, log_publisher, logger, local_setvelocity_publisher, announce_stream, offset, partner, leader,
drone_stream_factory, semaphore_observable, semaphore_publisher, dragonfly_sketch_subject, position_vector_publisher, threshold):
drone_stream_factory, semaphore_observable, semaphore_publisher, threshold):
self.log_publisher = log_publisher
self.logger = logger
self.local_setvelocity_publisher = local_setvelocity_publisher
Expand All @@ -99,14 +99,14 @@ def __init__(self, id, log_publisher, logger, local_setvelocity_publisher, annou
self.ros_subscriptions = []
self.announce_stream = announce_stream
self.drone_stream_factory = drone_stream_factory
self.dragonfly_sketch_subject = dragonfly_sketch_subject
self.position_vector_publisher = position_vector_publisher
self.dragonfly_sketch_subject = Subject()
self.semaphore_observable = semaphore_observable
self.semaphore_publisher = semaphore_publisher

self.navigate_subscription = rx.empty().subscribe()
self.leader_broadcast_subscription = rx.empty().subscribe()
self.receive_semaphore_subscription = rx.empty().subscribe()
self.sketch_control_subscription = rx.empty().subscribe()

self.target_position_vector = None
self.encountered = False
Expand Down Expand Up @@ -153,10 +153,12 @@ def step(self):

if self.leader:
self.leader_broadcast_subscription = rx.combine_latest(
rx.just(self.get_leader_drone().get_sketch_control_publisher()),
self.setup_subject(self.partner),
self.setup_subject(self.id)
).subscribe(lambda position_reading_vector: self.broadcast_target(position_reading_vector[0], position_reading_vector[1]))
).subscribe(lambda position_reading_vector: self.broadcast_target(position_reading_vector[0], position_reading_vector[1], position_reading_vector[2]))
else:
self.sketch_control_subscription = self.get_leader_drone().get_sketch_control().subscribe(self.dragonfly_sketch_subject)
self.receive_semaphore_subscription = self.semaphore_observable.subscribe(on_next = lambda token: self.listen_for_stop(token))

return self.status
Expand All @@ -165,6 +167,9 @@ def step(self):
def build_sketch_end_id(id):
return f"sketch.{id}"

def get_leader_drone(self):
return self.drone_stream_factory.get_drone(self.id if self.leader else self.partner)

def listen_for_stop(self, token):
self.logger.info(f"{self.id}: Received token")
if token.id == self.build_sketch_end_id(self.partner):
Expand Down Expand Up @@ -285,7 +290,7 @@ def calculate_error_correction(self, partner_position, self_position, position_v

return center_correction

def broadcast_target(self, partner_position, self_position):
def broadcast_target(self, position_vector_publisher, partner_position, self_position):

average_position = average(self_position, partner_position)

Expand Down Expand Up @@ -330,8 +335,8 @@ def broadcast_target(self, partner_position, self_position):

if self.encountered and self.passed(average_position):
self.target_position_vector = self.boundary_sketch(partner_position, self_position)
# self.logger.info(f"PV: {self.target_position_vector}")
self.position_vector_publisher.publish(self.target_position_vector)
# self.logger.info(f"PV: {self.target_position_vector} to {position_vector_publisher}")
position_vector_publisher.publish(self.target_position_vector)
self.dragonfly_sketch_subject.on_next(self.target_position_vector)

def calculate_gradient(self):
Expand Down Expand Up @@ -516,4 +521,5 @@ def passed(self, average_position):
def stop(self):
self.navigate_subscription.dispose()
self.leader_broadcast_subscription.dispose()
self.receive_semaphore_subscription.dispose()
self.receive_semaphore_subscription.dispose()
self.sketch_control_subscription.dispose()
9 changes: 1 addition & 8 deletions src/dragonfly/dragonfly/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def __init__(self, id, node):
self.local_velocity_observable = Subject()
self.dragonfly_announce_subject = Subject()
self.semaphore_observable = Subject()
self.dragonfly_sketch_subject = Subject()

def setmode(self, mode):
self.logger.info(f"Set Mode {mode}")
Expand Down Expand Up @@ -446,8 +445,7 @@ def mission(self, request, response):
self.actionqueue.push(LogAction(self.logPublisher, "Sketch")) \
.push(SketchAction(self.id, self.logPublisher, self.logger, self.local_setvelocity_publisher, self.dragonfly_announce_subject,
step.sketch_step.offset, step.sketch_step.partner, step.sketch_step.leader, self.drone_stream_factory,
self.semaphore_observable, self.semaphore_publisher,
self.dragonfly_sketch_subject, self.position_vector_publisher, step.sketch_step.threshold))
self.semaphore_observable, self.semaphore_publisher, step.sketch_step.threshold))

elif step.msg_type == MissionStep.TYPE_VERTICAL_TRANSECT:
self.logger.info("Vertical Transect")
Expand Down Expand Up @@ -589,11 +587,6 @@ def setup(self):
self.node.create_subscription(String, "/dragonfly/announce",
lambda name: self.dragonfly_announce_subject.on_next(name),
qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT))
self.node.create_subscription(PositionVector, "/dragonfly/sketch",
lambda positionVector: self.dragonfly_sketch_subject.on_next(positionVector),
qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT))
self.position_vector_publisher = self.node.create_publisher(PositionVector, "/dragonfly/sketch",
qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10))
self.semaphore_publisher = self.node.create_publisher(SemaphoreToken, "/dragonfly/semaphore",
qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10))

Expand Down
16 changes: 15 additions & 1 deletion src/dragonfly/dragonfly/droneStreamFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sensor_msgs.msg import NavSatFix
from geometry_msgs.msg import TwistStamped
from sensor_msgs.msg import Range
from dragonfly_messages.msg import CO2
from dragonfly_messages.msg import CO2, PositionVector
from rclpy.qos import QoSProfile, QoSHistoryPolicy, HistoryPolicy, ReliabilityPolicy

class DroneStream:
Expand All @@ -21,6 +21,8 @@ def __init__(self, name, node):
self.velocity_subject_init = False
self.rangefinder_subject = Subject()
self.rangefinder_subject_init = False
self.sketch_control = Subject()
self.sketch_control_init = False
self.mean = 0
self.std_dev = 1

Expand Down Expand Up @@ -63,6 +65,18 @@ def get_rangefinder(self):

return self.rangefinder_subject

def get_sketch_control(self):
if not self.sketch_control_init:
self.node.create_subscription(PositionVector, f"{self.name}/sketch",
lambda value: self.sketch_control.on_next(value),
qos_profile=QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT))
self.sketch_control_init = True

return self.sketch_control

def get_sketch_control_publisher(self):
return self.node.create_publisher(PositionVector, f"{self.name}/sketch",
qos_profile=QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=10))

class DroneStreamFactory:

Expand Down