Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,6 @@ class ExternalConnection {

std::shared_ptr <structures::AtomicQueue<structures::ReconnectQueueItem>>
reconnectQueue_ {};
/// Name of the vehicle
std::string vehicleName_ {};
/// Name of the company
std::string company_ {};
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ namespace bringauto::external_client::connection::communication {
void closeConnection() override;

private:
/// Directionality of QUIC streams created or accepted by this connection
enum class StreamMode {
/// Stream can only send data in one direction
Unidirectional,
/// Stream supports bidirectional send and receive
Bidirectional
};

/// Pointer to the MsQuic API function table
const QUIC_API_TABLE *quic_{nullptr};
/// QUIC registration handle associated with the application
Expand All @@ -98,6 +106,8 @@ namespace bringauto::external_client::connection::communication {
/// Path to the CA certificate file
std::string caFile_;

StreamMode streamMode_{StreamMode::Bidirectional};

/// Atomic state of the connection used for synchronization across threads
std::atomic<ConnectionState> connectionState_{ConnectionState::NOT_CONNECTED};

Expand Down Expand Up @@ -233,7 +243,7 @@ namespace bringauto::external_client::connection::communication {
*
* @param message Message to be sent to the peer.
*/
void sendViaQuicStream(const ExternalProtocol::ExternalClient& message);
void sendViaQuicStream(const ExternalProtocol::ExternalClient &message);

/**
* @brief Closes the active QUIC configuration.
Expand Down Expand Up @@ -318,21 +328,40 @@ namespace bringauto::external_client::connection::communication {
/**
* @brief Retrieves a protocol setting value as a plain string.
*
* Extracts a value from ExternalConnectionSettings::protocolSettings and
* transparently handles values stored as JSON-encoded strings.
* Looks up a value in ExternalConnectionSettings::protocolSettings and returns
* it as a plain string. If the stored value is a JSON-encoded string, it is
* transparently parsed and unwrapped.
*
* Allows uniform access to protocol settings regardless of whether
* they were stored as plain strings or JSON-serialized values.
* If the key does not exist or the value cannot be parsed as valid JSON,
* the provided default value is returned and a warning is logged.
*
* This allows uniform access to protocol settings regardless of whether
* they are stored as plain strings or JSON-serialized strings.
*
* @param settings External connection settings containing protocolSettings.
* @param key Key identifying the protocol setting.
* @param defaultValue Value returned if the key is missing or invalid.
* @return Plain string value suitable for direct use (e.g. file paths).
*
* @throws std::out_of_range if the key is not present in protocolSettings.
*/
static std::string getProtocolSettingsString(
const structures::ExternalConnectionSettings &settings,
std::string_view key
std::string_view key,
std::string defaultValue = {}
);

/**
* @brief Parses QUIC stream mode from protocol settings.
*
* Reads the stream mode from the external connection settings and determines
* whether QUIC streams should be unidirectional or bidirectional.
*
* Supported values:
* - "unidirectional", "unidir" → Unidirectional streams
* - any other value or missing setting → Bidirectional streams (default)
*
* @param settings External connection settings containing QUIC protocol options
* @return StreamMode Parsed stream mode (defaults to Bidirectional)
*/
static StreamMode parseStreamMode(const structures::ExternalConnectionSettings &settings);
};
}
1 change: 1 addition & 0 deletions include/bringauto/settings/Constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class Constants {
inline static constexpr std::string_view CLIENT_CERT { "client-cert" };
inline static constexpr std::string_view CLIENT_KEY { "client-key" };
inline static constexpr std::string_view ALPN { "alpn" };
inline static constexpr std::string_view STREAM_MODE { "stream-mode" };

inline static constexpr std::string_view MODULES { "modules" };
inline static constexpr std::string_view AERON_CONNECTION { "aeron:ipc"};
Expand Down
4 changes: 3 additions & 1 deletion resources/config/quic_example.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
"client-cert" : "build/certs/client.pem",
"client-key" : "build/certs/client-key.pem",
"alpn" : "sample",
"DisconnectTimeoutMs" : 800
"stream-mode": "unidirectional",
"DisconnectTimeoutMs" : 800,
"PeerUnidiStreamCount" : 100
},
"modules": [1,2,3]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ int ExternalConnection::initializeConnection(const std::vector<structures::Devic
int ExternalConnection::connectMessageHandle(const std::vector<structures::DeviceIdentification> &devices) {
generateSessionId();

auto connectMessage = common_utils::ProtobufUtils::createExternalClientConnect(sessionId_, company_, vehicleName_,
auto connectMessage = common_utils::ProtobufUtils::createExternalClientConnect(sessionId_,
context_->settings->company,
context_->settings->vehicleName,
devices);
if(not communicationChannel_->sendMessage(&connectMessage)){
log::logError("Communication client couldn't send any message");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace bringauto::external_client::connection::communication {
settings::Constants::CLIENT_KEY)),
caFile_(getProtocolSettingsString(
settings,
settings::Constants::CA_FILE)) {
settings::Constants::CA_FILE)),
streamMode_(parseStreamMode(settings)) {
alpnBuffer_.Buffer = reinterpret_cast<uint8_t *>(alpn_.data());
alpnBuffer_.Length = static_cast<uint32_t>(alpn_.size());

Expand Down Expand Up @@ -88,15 +89,29 @@ namespace bringauto::external_client::connection::communication {
std::shared_ptr<ExternalProtocol::ExternalServer> QuicCommunication::receiveMessage() {
std::unique_lock lock(inboundMutex_);

// Wait for a message or transition out of allowed states
// Explicitly allow waiting during CONNECTING, CLOSING, and CONNECTED states
// This whitelist approach is safe if new states are added in the future
if (!inboundCv_.wait_for(
lock,
settings::receive_message_timeout,
[this] { return !inboundQueue_.empty() || connectionState_.load() != ConnectionState::CONNECTED; }
[this] {
auto state = connectionState_.load();
return !inboundQueue_.empty() ||
(state != ConnectionState::CONNECTING &&
state != ConnectionState::CLOSING &&
state != ConnectionState::CONNECTED);
}
)) {
return nullptr;
}

if (connectionState_.load() != ConnectionState::CONNECTED || inboundQueue_.empty()) {
// Check if we stopped waiting due to invalid state or empty queue
auto state = connectionState_.load();
if ((state != ConnectionState::CONNECTING &&
state != ConnectionState::CLOSING &&
state != ConnectionState::CONNECTED) ||
inboundQueue_.empty()) {
return nullptr;
}

Expand Down Expand Up @@ -228,9 +243,15 @@ namespace bringauto::external_client::connection::communication {
inboundCv_.notify_one();
}

void QuicCommunication::sendViaQuicStream(const ExternalProtocol::ExternalClient& message) {
void QuicCommunication::sendViaQuicStream(const ExternalProtocol::ExternalClient &message) {
HQUIC stream{nullptr};
if (QUIC_FAILED(quic_->StreamOpen(connection_, QUIC_STREAM_OPEN_FLAG_NONE, streamCallback, this, &stream))) {

QUIC_STREAM_OPEN_FLAGS flags = streamMode_ == StreamMode::Unidirectional
? QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL
: QUIC_STREAM_OPEN_FLAG_NONE;

if (QUIC_FAILED(
quic_->StreamOpen(connection_, flags, streamCallback, this, &stream))) {
settings::Logger::logError("[quic] StreamOpen failed");
return;
}
Expand Down Expand Up @@ -290,6 +311,23 @@ namespace bringauto::external_client::connection::communication {
break;
}

/// Fired when peer open new stream on connection, stream handler need to be def
case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: {
auto streamId = self->getStreamId(event->PEER_STREAM_STARTED.Stream);

settings::Logger::logDebug(
"[quic] [stream {}] Peer stream started",
streamId.value_or(0)
);

self->quic_->SetCallbackHandler(event->PEER_STREAM_STARTED.Stream,
reinterpret_cast<void *>(streamCallback), // NOSONAR - MsQuic C API requires passing function pointer as void*
context);

self->quic_->StreamReceiveSetEnabled(event->PEER_STREAM_STARTED.Stream, TRUE);
break;
}

/// Final notification that the connection has been fully shut down.
/// This is the last event delivered for the connection handle.
case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: {
Expand Down Expand Up @@ -487,16 +525,39 @@ namespace bringauto::external_client::connection::communication {

std::string QuicCommunication::getProtocolSettingsString(
const structures::ExternalConnectionSettings &settings,
std::string_view key
std::string_view key,
std::string defaultValue
) {
const auto &raw = settings.protocolSettings.at(std::string(key));
const auto it = settings.protocolSettings.find(std::string(key));
if (it == settings.protocolSettings.end()) {
settings::Logger::logWarning("[quic] Protocol setting '{}' not found, using default", key);
return defaultValue;
}

const auto &raw = it->second;

if (nlohmann::json::accept(raw)) {
auto j = nlohmann::json::parse(raw);
if (j.is_string()) {
return j.get<std::string>();
try {
if (nlohmann::json::accept(raw)) {
auto j = nlohmann::json::parse(raw);
if (j.is_string()) {
return j.get<std::string>();
}
}
return raw;
} catch (const nlohmann::json::exception &) {
settings::Logger::logWarning("[quic] Protocol setting '{}' contains invalid JSON, using default", key);
return defaultValue;
}
return raw;
}

QuicCommunication::StreamMode QuicCommunication::parseStreamMode(
const structures::ExternalConnectionSettings &settings
) {
const std::string mode = getProtocolSettingsString(settings, settings::Constants::STREAM_MODE);

if (mode == "unidirectional" || mode == "unidir")
return StreamMode::Unidirectional;

return StreamMode::Bidirectional;
}
}
6 changes: 6 additions & 0 deletions source/bringauto/settings/QuicSettingsParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ namespace bringauto::settings {
quic.IsSet.DisconnectTimeoutMs = TRUE;
}

if (auto value = getOptionalUint(settings, "PeerUnidiStreamCount"); value.has_value()) {
settings::Logger::logDebug("[quic] [settings] PeerUnidiStreamCount settings loaded");
quic.PeerUnidiStreamCount = *value;
quic.IsSet.PeerUnidiStreamCount = TRUE;
}

return quic;
}

Expand Down