Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 159 additions & 53 deletions scripts/ntrip_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import os
import sys
import json
from typing import List

import rclpy
from rclpy.parameter import Parameter
from rcl_interfaces.msg import SetParametersResult

from ntrip_ros_base import NTRIPRosBase
from ntrip_client.ntrip_client import NTRIPClient
Expand All @@ -28,80 +31,183 @@ def __init__(self):
('key', 'None'),
('ca_cert', 'None'),
('rtcm_timeout_seconds', NTRIPClient.DEFAULT_RTCM_TIMEOUT_SECONDS),
('recovery_period_s', 5.0),
('max_disconnected_count', 3),
]
)

# Initialize all internal variables in constructor
# Will be loaded in 'load_parameters' function.
self.host = None
self.port = None
self.mountpoint = None
self.ntrip_version = None
self.authenticate = None
self.username = None
self.password = None
self.ssl = None
self.cert = None
self.key = None
self.ca_cert = None
self.rtcm_timeout_seconds = None

self.load_parameters()
self.add_on_set_parameters_callback(self.on_set_parameters_callback)

# Initialize the client
self._client = self.init_ntrip_client()
self.run()

# Initialize timer(s)
self._ntrip_config_updated = False
self._disconnected_count = 0
self.recovery_timer = self.create_timer(
self.get_parameter('recovery_period_s').value,
self.recovery_callback)

def load_parameters(self):
"""Load ROS parameters."""
# Read some mandatory config
host = self.get_parameter('host').value
port = self.get_parameter('port').value
mountpoint = self.get_parameter('mountpoint').value
self.host = self.get_parameter('host').value
self.port = self.get_parameter('port').value
self.mountpoint = self.get_parameter('mountpoint').value

# Optionally get the ntrip version from the launch file
ntrip_version = self.get_parameter('ntrip_version').value
if ntrip_version == 'None':
ntrip_version = None
self.ntrip_version = self.get_parameter('ntrip_version').value
if self.ntrip_version == 'None':
self.ntrip_version = None

# If we were asked to authenticate, read the username and password
username = None
password = None
if self.get_parameter('authenticate').value:
username = self.get_parameter('username').value
password = self.get_parameter('password').value
if not username:
self.get_logger().error('Requested to authenticate, but param "username" was not set')
sys.exit(1)
if not password:
self.get_logger().error('Requested to authenticate, but param "password" was not set')
sys.exit(1)
self.username = None
self.password = None
self.authenticate = self.get_parameter('authenticate').value
if self.authenticate:
self.username = self.get_parameter('username').value
self.password = self.get_parameter('password').value
if not self.username or not self.password:
raise ValueError(f'Invalid username/password: {self.username}/{self.password}')

# Initialize the client
self._client = NTRIPClient(
host=host,
port=port,
mountpoint=mountpoint,
ntrip_version=ntrip_version,
username=username,
password=password,
self.ssl = self.get_parameter('ssl').value
self.cert = self.get_parameter('cert').value
if self.cert == 'None':
self.cert = None
self.key = self.get_parameter('key').value
if self.key == 'None':
self.key = None
self.ca_cert = self.get_parameter('ca_cert').value
if self.ca_cert == 'None':
self.ca_cert = None

self.rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value

def on_set_parameters_callback(self, parameters: List[Parameter]) -> SetParametersResult:
"""Callback on parameter update
This allows to validate every parameter update and automatically
reload the necessary component when an update is triggered
"""
def is_string(name: str, param: Parameter) -> bool:
"""Helper function to reduce verbosity"""
return param.name == name and param.type_ == Parameter.Type.STRING

def is_bool(name: str, param: Parameter) -> bool:
"""Helper function to reduce verbosity"""
return param.name == name and param.type_ == Parameter.Type.BOOL

def is_integer(name: str, param: Parameter) -> bool:
"""Helper function to reduce verbosity"""
return param.name == name and param.type_ == Parameter.Type.INTEGER

self.get_logger().warn(f'{parameters}')

for parameter in parameters:
if is_string('host', parameter):
self.host = parameter.value
elif is_integer('port', parameter):
self.port = parameter.value
elif is_string('mountpoint', parameter):
self.mountpoint = parameter.value
elif is_string('username', parameter):
self.username = parameter.value
elif is_string('password', parameter):
self.password = parameter.value
elif is_bool('ssl', parameter):
self.ssl = parameter.value
elif is_string('cert', parameter):
self.cert = parameter.value
self.cert = self.cert if self.cert != 'None' else None
elif is_string('key', parameter):
self.key = parameter.value
self.key = self.key if self.key != 'None' else None
elif is_string('ca_cert', parameter):
self.ca_cert = parameter.value
self.ca_cert = self.ca_cert if self.ca_cert != 'None' else None
else:
# parameters unrelated to the NTRIP configuration
# such as a timer period
pass

self._ntrip_config_updated = True
return SetParametersResult(successful=True)

def init_ntrip_client(self):
"""Initialize a NTRIP client using class internal variable."""
client = NTRIPClient(
host=self.host,
port=self.port,
mountpoint=self.mountpoint,
ntrip_version=self.ntrip_version,
username=self.username,
password=self.password,
logerr=self.get_logger().error,
logwarn=self.get_logger().warning,
loginfo=self.get_logger().info,
logdebug=self.get_logger().debug
)
)
client.ssl = self.ssl
client.cert = self.cert
client.key = self.key
client.ca_cert = self.ca_cert

client.nmea_parser.nmea_max_length = self._nmea_max_length
client.nmea_parser.nmea_min_length = self._nmea_min_length
client.reconnect_attempt_max = self._reconnect_attempt_max
client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds
client.rtcm_timeout_seconds = self.rtcm_timeout_seconds

return client

def recovery_callback(self):
"""Perform recovery of the NTRIP client
- Check if the configuration was updated through a parameter update
- Check if the NTRIP client is still connected
"""
if self._ntrip_config_updated:
self.stop()
# Re-initialize ntrip client with updated configuration
self._client = self.init_ntrip_client()
self.run()
self._ntrip_config_updated = False

#return early, do not check if client is connected immediately
return

if not self._client._connected:
self._disconnected_count += 1
if self._disconnected_count >= self.get_parameter('max_disconnected_count').value:
self._disconnected_count = 0
self.stop()
self.run()

# Get some SSL parameters for the NTRIP client
self._client.ssl = self.get_parameter('ssl').value
self._client.cert = self.get_parameter('cert').value
self._client.key = self.get_parameter('key').value
self._client.ca_cert = self.get_parameter('ca_cert').value
if self._client.cert == 'None':
self._client.cert = None
if self._client.key == 'None':
self._client.key = None
if self._client.ca_cert == 'None':
self._client.ca_cert = None

# Get some timeout parameters for the NTRIP client
self._client.nmea_parser.nmea_max_length = self._nmea_max_length
self._client.nmea_parser.nmea_min_length = self._nmea_min_length
self._client.reconnect_attempt_max = self._reconnect_attempt_max
self._client.reconnect_attempt_wait_seconds = self._reconnect_attempt_wait_seconds
self._client.rtcm_timeout_seconds = self.get_parameter('rtcm_timeout_seconds').value

if __name__ == '__main__':
# Start the node
rclpy.init()
node = NTRIPRos()
if not node.run():
sys.exit(1)
try:
# Spin until we are shut down
rclpy.spin(node)
except KeyboardInterrupt:
pass
except BaseException as e:
raise e
finally:
node.stop()

# Shutdown the node and stop rclpy
rclpy.shutdown()

# Shutdown the node and stop rclpy
rclpy.shutdown()
15 changes: 6 additions & 9 deletions scripts/ntrip_ros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(self, name):

# Setup the RTCM publisher
self._rtcm_pub = self.create_publisher(self._rtcm_message_type, 'rtcm', 10)
self._rtcm_timer = self.create_timer(0.1, self.publish_rtcm, autostart=False)

# Initialize the client
self._client = NTRIPBase(
Expand All @@ -101,23 +102,19 @@ def run(self):
self._fix_sub = self.create_subscription(NavSatFix, 'fix', self.subscribe_fix, 10)

# Start the timer that will check for RTCM data
self._rtcm_timer = self.create_timer(0.1, self.publish_rtcm)
self._rtcm_timer.reset()
return True

def stop(self):
self.get_logger().info('Stopping RTCM publisher')
if self._rtcm_timer:
self._rtcm_timer.cancel()
self._rtcm_timer.destroy()
self._rtcm_timer.cancel()
self.get_logger().info('Disconnecting NTRIP client')
self._client.disconnect()
self.get_logger().info('Shutting down node')
self.destroy_node()

def subscribe_nmea(self, nmea):
# Just extract the NMEA from the message, and send it right to the server
self._client.send_nmea(nmea.sentence)

def subscribe_fix(self, fix: NavSatFix):
# Calculate the timestamp of the message
timestamp_secs = fix.header.stamp.sec + fix.header.stamp.nanosec * 1e-9
Expand All @@ -136,7 +133,7 @@ def subscribe_fix(self, fix: NavSatFix):
nmea_lat_direction = "S"
if fix.longitude < 0:
nmea_lon_direction = "W"

# Convert the units of the latitude and longitude
nmea_lat = NMEAParser.lat_dd_to_dmm(fix.latitude)
nmea_lon = NMEAParser.lon_dd_to_dmm(fix.longitude)
Expand Down Expand Up @@ -180,4 +177,4 @@ def _create_rtcm_msgs_rtcm_message(self, rtcm):
frame_id=self._rtcm_frame_id
),
message=rtcm
)
)