From a420c17e263f644b66c2a34984edac562e3dc62f Mon Sep 17 00:00:00 2001 From: Ray Date: Fri, 21 Jun 2024 12:57:15 -0400 Subject: [PATCH 1/7] support for esp32 --- EspLocalServer.cpp | 67 +++++++++++++++++++++++++++++++++++++++++ EspLocalServer.h | 49 ++++++++++++++++++++++++++++++ OpenThingsFramework.cpp | 59 +++++++++++++++++++++++++----------- OpenThingsFramework.h | 12 +++----- StringBuilder.cpp | 2 +- 5 files changed, 163 insertions(+), 26 deletions(-) create mode 100644 EspLocalServer.cpp create mode 100644 EspLocalServer.h diff --git a/EspLocalServer.cpp b/EspLocalServer.cpp new file mode 100644 index 0000000..71ed941 --- /dev/null +++ b/EspLocalServer.cpp @@ -0,0 +1,67 @@ +#if defined(ESP8266) || defined(ESP32) +#include "EspLocalServer.h" + +using namespace OTF; + +EspLocalServer::EspLocalServer(uint16_t port) : server(port) {} + +LocalClient *EspLocalServer::acceptClient() { + if (activeClient != nullptr) { + delete activeClient; + } + + WiFiClient wiFiClient = server.available(); + if (wiFiClient) { + activeClient = new EspLocalClient(wiFiClient); + } else { + activeClient = nullptr; + } + return activeClient; +} + +void EspLocalServer::begin() { + server.begin(); +} + + +EspLocalClient::EspLocalClient(WiFiClient client) { + this->client = client; +} + +bool EspLocalClient::dataAvailable() { + return client.available(); +} + +size_t EspLocalClient::readBytes(char *buffer, size_t length) { + return client.readBytes(buffer, length); +} + +size_t EspLocalClient::readBytesUntil(char terminator, char *buffer, size_t length) { + return client.readBytesUntil(terminator, buffer, length); +} + +void EspLocalClient::print(const char *data) { + client.print(data); +} + +void EspLocalClient::print(const __FlashStringHelper *data) { + client.print(data); +} + +int EspLocalClient::peek() { + return client.peek(); +} + +void EspLocalClient::setTimeout(int timeout) { + client.setTimeout(timeout); +} + +void EspLocalClient::flush() { + client.flush(); +} + +void EspLocalClient::stop() { + client.stop(); +} + +#endif diff --git a/EspLocalServer.h b/EspLocalServer.h new file mode 100644 index 0000000..df7dc42 --- /dev/null +++ b/EspLocalServer.h @@ -0,0 +1,49 @@ +#if defined(ESP8266) || defined(ESP32) +#ifndef OTF_ESPLOCALSERVER_H +#define OTF_ESPLOCALSERVER_H + +#include "LocalServer.h" + +#include +#if defined(ESP8266) + #include +#else + #include +#endif + +namespace OTF { + class EspLocalClient : public LocalClient { + friend class EspLocalServer; + + private: + WiFiClient client; + EspLocalClient(WiFiClient client); + + public: + bool dataAvailable(); + size_t readBytes(char *buffer, size_t length); + size_t readBytesUntil(char terminator, char *buffer, size_t length); + void print(const char *data); + void print(const __FlashStringHelper *data); + int peek(); + void setTimeout(int timeout); + void flush(); + void stop(); + }; + + + class EspLocalServer : public LocalServer { + private: + WiFiServer server; + EspLocalClient *activeClient = nullptr; + + public: + EspLocalServer(uint16_t port); + + LocalClient *acceptClient(); + void begin(); + }; +}// namespace OTF + +#endif +#endif diff --git a/OpenThingsFramework.cpp b/OpenThingsFramework.cpp index 2ac91df..bc2df09 100755 --- a/OpenThingsFramework.cpp +++ b/OpenThingsFramework.cpp @@ -20,9 +20,13 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, char *hdBuffer, headerBufferSize = HEADERS_BUFFER_SIZE; } missingPageCallback = defaultMissingPageCallback; - localServer.begin(); + //localServer.begin(); }; +void OpenThingsFramework::localServerBegin() { + localServer.begin(); +} + OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, const String &webSocketHost, uint16_t webSocketPort, const String &deviceKey, bool useSsl, char *hdBuffer, int hdBufferSize) : OpenThingsFramework(webServerPort, hdBuffer, hdBufferSize) { setCloudStatus(UNABLE_TO_CONNECT); @@ -66,6 +70,8 @@ void OpenThingsFramework::onMissingPage(callback_t callback) { void OpenThingsFramework::localServerLoop() { + Serial.println("------"); + Serial.print(ESP.getFreeHeap()); Serial.print("A"); static unsigned long wait_to = 0; // timeout to wait for client data if (!wait_to) { localClient = localServer.acceptClient(); @@ -90,8 +96,8 @@ void OpenThingsFramework::localServerLoop() { // got new client data, reset wait_to to 0 wait_to = 0; - - // Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT. + Serial.print(ESP.getFreeHeap()); Serial.print("B"); + // Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT. unsigned int timeout = millis()+WIFI_CONNECTION_TIMEOUT; @@ -111,6 +117,9 @@ void OpenThingsFramework::localServerLoop() { buffer[length++] = '\n'; if(read==1 && rc=='\r') { break; } } + + Serial.print(ESP.getFreeHeap()); Serial.print("C"); + DEBUG(Serial.printf((char *) F("Finished reading data from client. Request line + headers were %d bytes\n"), length);) buffer[length] = 0; @@ -121,9 +130,13 @@ void OpenThingsFramework::localServerLoop() { return; } + Serial.print(ESP.getFreeHeap()); Serial.print("D"); + //Serial.println(F("Parsing request")); Request request(buffer, length, false); + Serial.print(ESP.getFreeHeap()); Serial.print("E"); + char *bodyBuffer = NULL; // If the request was valid, read the body and add it to the Request object. if (request.getType() > INVALID) { @@ -148,24 +161,36 @@ void OpenThingsFramework::localServerLoop() { } } + Serial.print(ESP.getFreeHeap()); Serial.print("F"); //Serial.println(F("Filling response")); - Response res = Response(); - fillResponse(request, res); - - if(bodyBuffer) delete[] bodyBuffer; - //Serial.println(F("Sending response")); - if (res.isValid()) { - char *responseString = res.toString(); - DEBUG(Serial.printf((char *) F("Response message is: %s\n"), responseString);) - localClient->print(responseString); - } else { - localClient->print(F("HTTP/1.1 500 OTF error\r\nResponse string could not be built\r\n")); - DEBUG(Serial.println(F("An error occurred while building the response string."));) - } - + if(1) + { + Response res = Response(); + fillResponse(request, res); + + Serial.print(ESP.getFreeHeap()); Serial.print("G"); + + if(bodyBuffer) delete[] bodyBuffer; + + Serial.print(ESP.getFreeHeap()); Serial.print("H"); + + //Serial.println(F("Sending response")); + if (res.isValid()) { + char *responseString = res.toString(); + DEBUG(Serial.printf((char *) F("Response message is: %s\n"), responseString);) + localClient->print(responseString); + } else { + localClient->print(F("HTTP/1.1 500 OTF error\r\nResponse string could not be built\r\n")); + DEBUG(Serial.println(F("An error occurred while building the response string."));) + } + } + Serial.print(ESP.getFreeHeap()); Serial.print("I"); + // Get a new client to indicate that the previous client is no longer needed. localClient = localServer.acceptClient(); + Serial.print(ESP.getFreeHeap()); Serial.print("J"); + Serial.println("========"); DEBUG(Serial.println(F("Finished handling request"));) } diff --git a/OpenThingsFramework.h b/OpenThingsFramework.h index c5e736e..b61e5d2 100644 --- a/OpenThingsFramework.h +++ b/OpenThingsFramework.h @@ -7,13 +7,8 @@ #include #include -#if defined(ESP8266) - #include "Esp8266LocalServer.h" - #define LOCAL_SERVER_CLASS Esp8266LocalServer -#elif defined(ESP32) - #include "Esp32LocalServer.h" - #define LOCAL_SERVER_CLASS Esp32LocalServer -#endif +#include "EspLocalServer.h" +#define LOCAL_SERVER_CLASS EspLocalServer // The size of the buffer to store the incoming request line and headers (does not include body). Larger requests will be discarded. #define HEADERS_BUFFER_SIZE 1536 @@ -94,7 +89,8 @@ namespace OTF { void onMissingPage(callback_t callback); void loop(); - + void localServerBegin(); + /** Returns the current status of the connection to the OpenThings Cloud server. */ CLOUD_STATUS getCloudStatus(); diff --git a/StringBuilder.cpp b/StringBuilder.cpp index 3096e57..950e978 100644 --- a/StringBuilder.cpp +++ b/StringBuilder.cpp @@ -8,7 +8,7 @@ StringBuilder::StringBuilder(size_t maxLength) { } StringBuilder::~StringBuilder() { - delete buffer; + delete[] buffer; } void StringBuilder::bprintf(char *format, va_list args) { From ff467330117e8bca3c0b91d532a02f50727442f0 Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:27:39 -0400 Subject: [PATCH 2/7] fixed local server class and added write from prg --- Esp32LocalServer.cpp | 3 +++ Esp32LocalServer.h | 1 + Esp8266LocalServer.cpp | 12 ++++++++---- Esp8266LocalServer.h | 3 ++- EspLocalServer.cpp | 8 ++++++++ EspLocalServer.h | 4 +++- LocalServer.h | 5 ++++- 7 files changed, 29 insertions(+), 7 deletions(-) diff --git a/Esp32LocalServer.cpp b/Esp32LocalServer.cpp index 5a68a61..7192d6f 100644 --- a/Esp32LocalServer.cpp +++ b/Esp32LocalServer.cpp @@ -52,6 +52,9 @@ size_t Esp32LocalClient::write(const char *buffer, size_t length) { return client.write((const uint8_t *)buffer, length); } +size_t Esp32LocalClient::write(const __FlashStringHelper *buffer, size_t length) { + return client.write((const uint8_t *)buffer, length); +} int Esp32LocalClient::peek() { return client.peek(); } diff --git a/Esp32LocalServer.h b/Esp32LocalServer.h index 399dfc9..fe24a06 100644 --- a/Esp32LocalServer.h +++ b/Esp32LocalServer.h @@ -22,6 +22,7 @@ namespace OTF { void print(const char *data); void print(const __FlashStringHelper *data); size_t write(const char *buffer, size_t length); + size_t write(const __FlashStringHelper *buffer, size_t length); int peek(); void setTimeout(int timeout); void flush(); diff --git a/Esp8266LocalServer.cpp b/Esp8266LocalServer.cpp index 359075c..9b5bce0 100644 --- a/Esp8266LocalServer.cpp +++ b/Esp8266LocalServer.cpp @@ -40,10 +40,6 @@ size_t Esp8266LocalClient::readBytesUntil(char terminator, char *buffer, size_t return client.readBytesUntil(terminator, buffer, length); } -size_t Esp8266LocalClient::write(const char *buffer, size_t size) { - return client.write((const uint8_t *)buffer, size); -} - void Esp8266LocalClient::print(const char *data) { client.print(data); } @@ -52,6 +48,14 @@ void Esp8266LocalClient::print(const __FlashStringHelper *data) { client.print(data); } +size_t Esp8266LocalClient::write(const char *buffer, size_t size) { + return client.write((const uint8_t *)buffer, size); +} + +size_t Esp8266LocalClient::write(const __FlashStringHelper *buffer, size_t size) { + return client.write_P((const char*) buffer, size); +} + int Esp8266LocalClient::peek() { return client.peek(); } diff --git a/Esp8266LocalServer.h b/Esp8266LocalServer.h index ee1deea..271c4ff 100644 --- a/Esp8266LocalServer.h +++ b/Esp8266LocalServer.h @@ -19,9 +19,10 @@ namespace OTF { bool dataAvailable(); size_t readBytes(char *buffer, size_t length); size_t readBytesUntil(char terminator, char *buffer, size_t length); - size_t write(const char *buffer, size_t length); void print(const char *data); void print(const __FlashStringHelper *data); + size_t write(const char *buffer, size_t length); + size_t write(const __FlashStringHelper *buffer, size_t length); int peek(); void setTimeout(int timeout); void flush(); diff --git a/EspLocalServer.cpp b/EspLocalServer.cpp index 71ed941..c3f1369 100644 --- a/EspLocalServer.cpp +++ b/EspLocalServer.cpp @@ -48,6 +48,14 @@ void EspLocalClient::print(const __FlashStringHelper *data) { client.print(data); } +size_t EspLocalClient::write(const char *buffer, size_t size) { + return client.write(buffer, size); +} + +size_t EspLocalClient::write(const __FlashStringHelper *const buffer, size_t size) { + return client.write_P((const char*) buffer, size); +} + int EspLocalClient::peek() { return client.peek(); } diff --git a/EspLocalServer.h b/EspLocalServer.h index df7dc42..8ac263e 100644 --- a/EspLocalServer.h +++ b/EspLocalServer.h @@ -24,7 +24,9 @@ namespace OTF { size_t readBytes(char *buffer, size_t length); size_t readBytesUntil(char terminator, char *buffer, size_t length); void print(const char *data); - void print(const __FlashStringHelper *data); + void print(const __FlashStringHelper *const data); + size_t write(const char *buffer, size_t size); + size_t write(const __FlashStringHelper *buffer, size_t size); int peek(); void setTimeout(int timeout); void flush(); diff --git a/LocalServer.h b/LocalServer.h index 055122e..89db984 100644 --- a/LocalServer.h +++ b/LocalServer.h @@ -25,11 +25,14 @@ namespace OTF { virtual void print(const char *data) = 0; /** Prints a null-terminated string to the response stream. This method may be called multiple times before the stream is closed. */ - virtual void print(const __FlashStringHelper *data) = 0; + virtual void print(const __FlashStringHelper *const data) = 0; /** Writes `size` bytes from `buffer` to the response stream. */ virtual size_t write(const char *buffer, size_t size) = 0; + /** Writes `size` bytes from `buffer` to the response stream from program memory. */ + virtual size_t write(const __FlashStringHelper *buffer, size_t size) = 0; + /** Returns the next character in the request stream (without advancing the stream), or returns -1 if no character is available. */ virtual int peek() = 0; From 1f4feaef2a5c03edf611ab92e9751056a9be9bcf Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Fri, 21 Jun 2024 15:27:41 -0400 Subject: [PATCH 3/7] removed heap spam messages --- OpenThingsFramework.cpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/OpenThingsFramework.cpp b/OpenThingsFramework.cpp index 7e5b3a2..97e0fd4 100755 --- a/OpenThingsFramework.cpp +++ b/OpenThingsFramework.cpp @@ -20,7 +20,6 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, char *hdBuffer, headerBufferSize = HEADERS_BUFFER_SIZE; } missingPageCallback = defaultMissingPageCallback; - //localServer.begin(); }; void OpenThingsFramework::localServerBegin() { @@ -81,8 +80,6 @@ void OpenThingsFramework::onMissingPage(callback_t callback) { void OpenThingsFramework::localServerLoop() { - Serial.println("------"); - Serial.print(ESP.getFreeHeap()); Serial.print("A"); static unsigned long wait_to = 0; // timeout to wait for client data if (!wait_to) { localClient = localServer.acceptClient(); @@ -107,7 +104,6 @@ void OpenThingsFramework::localServerLoop() { // got new client data, reset wait_to to 0 wait_to = 0; - Serial.print(ESP.getFreeHeap()); Serial.print("B"); // Update the timeout for each data read to ensure that the total timeout is WIFI_CONNECTION_TIMEOUT. unsigned int timeout = millis()+WIFI_CONNECTION_TIMEOUT; @@ -129,8 +125,6 @@ void OpenThingsFramework::localServerLoop() { if(read==1 && rc=='\r') { break; } } - Serial.print(ESP.getFreeHeap()); Serial.print("C"); - DEBUG(Serial.printf((char *) F("Finished reading data from client. Request line + headers were %d bytes\n"), length);) buffer[length] = 0; @@ -141,13 +135,9 @@ void OpenThingsFramework::localServerLoop() { return; } - Serial.print(ESP.getFreeHeap()); Serial.print("D"); - //Serial.println(F("Parsing request")); Request request(buffer, length, false); - Serial.print(ESP.getFreeHeap()); Serial.print("E"); - char *bodyBuffer = NULL; // If the request was valid, read the body and add it to the Request object. if (request.getType() > INVALID) { @@ -205,8 +195,6 @@ void OpenThingsFramework::localServerLoop() { wait_to = millis()+WIFI_CONNECTION_TIMEOUT; } - Serial.print(ESP.getFreeHeap()); Serial.print("J"); - Serial.println("========"); DEBUG(Serial.println(F("Finished handling request"));) } From a9ec52e226d02b08ca44836d759bc0072e88a7e1 Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Mon, 24 Jun 2024 17:23:33 -0400 Subject: [PATCH 4/7] Updated websocket library --- OpenThingsFramework.cpp | 51 ++++------- OpenThingsFramework.h | 3 +- Websocket.cpp | 131 +++++++++++---------------- Websocket.h | 194 +++++++++++++++++++++++++--------------- library.json | 2 +- 5 files changed, 189 insertions(+), 192 deletions(-) diff --git a/OpenThingsFramework.cpp b/OpenThingsFramework.cpp index bdc3cc6..8ff7322 100755 --- a/OpenThingsFramework.cpp +++ b/OpenThingsFramework.cpp @@ -30,32 +30,24 @@ OpenThingsFramework::OpenThingsFramework(uint16_t webServerPort, const String &w webSocket = new WebsocketClient(); // Wrap the member function in a static function. - webSocket->onEvent([this](websockets::WebsocketsEvent event, String data) -> void { + webSocket->onEvent([this](WSEvent_t type, uint8_t *payload, size_t length) -> void { DEBUG(Serial.printf((char *) F("Received websocket event of type %d\n"), event);) - webSocketEventCallback(event, data); + webSocketEventCallback(type, payload, length); }); - // Wrap the member function in a static function. - webSocket->onMessage([this](websockets::WebsocketsMessage message) -> void { - DEBUG(Serial.println(F("Received websocket message"));) - webSocketMessageCallback(message); - }); - - bool connected; if (useSsl) { DEBUG(Serial.println(F("Connecting to websocket with SSL"));) - // connected = webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); + // webSocket->connectSecure(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); } else { DEBUG(Serial.println(F("Connecting to websocket without SSL"));) - connected = webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); + webSocket->connect(webSocketHost, webSocketPort, "/socket/v1?deviceKey=" + deviceKey); } DEBUG(Serial.println(F("Initialized websocket"));) - // Try to reconnect to the websocket if the connection is lost. The first time it is connecting has a timeout of 1 second/ - // After that, it will try to reconnect every WEBSOCKET_RECONNECT_INTERVAL milliseconds. - webSocket->enableWebSocketReconnect(1000, WEBSOCKET_RECONNECT_INTERVAL); + // Try to reconnect to the websocket if the connection is lost. + webSocket->setReconnectInterval(WEBSOCKET_RECONNECT_INTERVAL); // Ping the server every 15 seconds with a timeout of 5 seconds, and treat 1 missed ping as a lost connection. - webSocket->enableWebSocketHeartbeat(15000, 5000, 1); + webSocket->enableHeartbeat(15000, 5000, 1); } char *makeMapKey(StringBuilder *sb, HTTPMethod method, const char *path) { @@ -202,9 +194,9 @@ void OpenThingsFramework::loop() { } } -void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent event, String data) { - switch (event) { - case websockets::WebsocketsEvent::ConnectionClosed: { +void OpenThingsFramework::webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length) { + switch (type) { + case WSEvent_DISCONNECTED: { DEBUG(Serial.println(F("Websocket connection closed"));) if (cloudStatus == CONNECTED) { // Make sure the cloud status is only set to disconnected if it was previously connected. @@ -213,35 +205,29 @@ void OpenThingsFramework::webSocketEventCallback(websockets::WebsocketsEvent eve break; } - case websockets::WebsocketsEvent::ConnectionOpened: { + case WSEvent_CONNECTED: { DEBUG(Serial.println(F("Websocket connection opened"));) setCloudStatus(CONNECTED); break; } - case websockets::WebsocketsEvent::GotPing: { + case WSEvent_PING: { DEBUG(Serial.println(F("Received a ping from the server"));) break; } - case websockets::WebsocketsEvent::GotPong: { + case WSEvent_PONG: { DEBUG(Serial.println(F("Received a pong from the server"));) break; } - } -} -void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage message) { - websockets::MessageType type = message.type(); - switch (type) { - case websockets::MessageType::Text: { + case WSEvent_TEXT: { #define PREFIX_LENGTH 5 #define ID_LENGTH 4 // Length of the prefix, request ID, carriage return, and line feed. #define HEADER_LENGTH PREFIX_LENGTH + ID_LENGTH + 2 - char *message_data = (char*) message.c_str(); - size_t length = message.length(); + char *message_data = (char*) payload; if (strncmp_P(message_data, (char *) F("FWD: "), PREFIX_LENGTH) == 0) { DEBUG(Serial.println(F("Message is a forwarded request."));) @@ -290,12 +276,7 @@ void OpenThingsFramework::webSocketMessageCallback(websockets::WebsocketsMessage } break; } - - case websockets::MessageType::Ping: - case websockets::MessageType::Pong: - // These do not get forwarded to the message callback. - break; - + default: { DEBUG(Serial.printf((char *) F("Received unsupported websocket event of type %d\n"), type);) break; diff --git a/OpenThingsFramework.h b/OpenThingsFramework.h index beed154..5d47338 100644 --- a/OpenThingsFramework.h +++ b/OpenThingsFramework.h @@ -44,8 +44,7 @@ namespace OTF { char *headerBuffer = NULL; int headerBufferSize = 0; - void webSocketMessageCallback(websockets::WebsocketsMessage message); - void webSocketEventCallback(websockets::WebsocketsEvent event, String data); + void webSocketEventCallback(WSEvent_t type, uint8_t *payload, size_t length); void fillResponse(const Request &req, Response &res); void localServerLoop(); diff --git a/Websocket.cpp b/Websocket.cpp index 5cf42af..4851932 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -1,102 +1,73 @@ #include "Websocket.h" -void WebsocketClient::enableWebSocketHeartbeat(unsigned long interval, unsigned long timeout, unsigned long maxMissed) { - webSocketHeartbeatInterval = interval; - webSocketHeartbeatTimeout = timeout; - webSocketHeartbeatMaxMissed = maxMissed; - webSocketHeartbeatEnabled = true; +void WebsocketClient::enableHeartbeat(unsigned long interval, unsigned long timeout, uint8_t maxMissed) { + WebSocketsClient::enableHeartbeat(interval, timeout, maxMissed); } -void WebsocketClient::disableWebSocketHeartbeat() { - webSocketHeartbeatEnabled = false; +void WebsocketClient::disableHeartbeat() { + WebSocketsClient::disableHeartbeat(); } -void WebsocketClient::enableWebSocketReconnect(unsigned long firstInterval, unsigned long interval) { - webSocketFirstReconnectInterval = firstInterval; - webSocketReconnectInterval = interval; - webSocketReconnectEnabled = true; +void WebsocketClient::setReconnectInterval(unsigned long interval) { + WebSocketsClient::setReconnectInterval(interval); } -void WebsocketClient::disableWebSocketReconnect() { - webSocketReconnectEnabled = false; +void WebsocketClient::poll() { + WebSocketsClient::loop(); } -void WebsocketClient::poll() { - websockets::WebsocketsClient::poll(); - if (webSocketHeartbeatEnabled && available()) { - if (!webSocketHeartbeatInProgress && (millis() - webSocketHeartbeatLastSent > webSocketHeartbeatInterval)) { - if (webSocketHeartbeatMissed >= webSocketHeartbeatMaxMissed) { - // Too many missed heartbeats, close the connection - WS_DEBUG("Too many missed heartbeats, closing connection\n"); - webSocketReconnectLastAttempt = 0; - webSocketHeartbeatMissed = 0; - websockets::WebsocketsClient::close(); - return; - } - - WS_DEBUG("Sending ping\n"); - ping(); - webSocketHeartbeatLastSent = millis(); - webSocketHeartbeatInProgress = true; - } +void WebsocketClient::onEvent(WebSocketEventCallback callback) { + WS_DEBUG("Setting event callback\n"); + this->eventCallback = callback; +} - if (webSocketHeartbeatInProgress && (millis() - webSocketHeartbeatLastSent > webSocketHeartbeatTimeout)) { - // Heartbeat timeout - WS_DEBUG("Heartbeat timeout\n"); - webSocketHeartbeatMissed++; - webSocketHeartbeatInProgress = false; - return; - } +void WebsocketClient::connect(WSInterfaceString host, int port, WSInterfaceString path) { + WS_DEBUG("Connecting to ws://%s:%d%s\n", host.c_str(), port, path.c_str()); + WebSocketsClient::begin(host, port, path); +} + +void WebsocketClient::connectSecure(WSInterfaceString host, int port, WSInterfaceString path) { + WebSocketsClient::beginSSL(host.c_str(), port, path.c_str()); +} + +bool WebsocketClient::stream() { + if (clientIsConnected(&_client)) { + isStreaming = sendFrame(&_client, WSop_text, NULL, 0, false, false); + } else { + isStreaming = false; + } + + return isStreaming; +} + +bool WebsocketClient::send(uint8_t *payload, size_t length, bool headerToPayload = false) { + WS_DEBUG("Sending message of length %d\n", length); + + if (length == 0) { + length = strlen((const char *) payload); } - if (webSocketReconnectEnabled && webSocketShouldReconnect && !available()) { - if (millis() - webSocketReconnectLastAttempt > (webSocketReconnectFirstAttempt ? webSocketFirstReconnectInterval : webSocketReconnectInterval)) { - WS_DEBUG("Reconnecting...\n"); - // Attempt to reconnect - if (isSecure) { - websockets::WebsocketsClient::connectSecure(host, port, path); - } else { - websockets::WebsocketsClient::connect(host, port, path); - } - - WS_DEBUG("Reconnect attempt complete\n"); - WS_DEBUG("Connection status: %d\n", websockets::WebsocketsClient::available()); - webSocketReconnectLastAttempt = millis(); + if (clientIsConnected(&_client)) { + if (isStreaming) { + return sendFrame(&_client, WSop_continuation, payload, length, false, headerToPayload); + } else { + return sendFrame(&_client, WSop_text, payload, length, true, headerToPayload); } } - websockets::WebsocketsClient::poll(); + return false; } -void WebsocketClient::onEvent(websockets::PartialEventCallback callback) { - WS_DEBUG("Setting event callback\n"); - this->eventCallback = [callback](WebsocketsClient &, websockets::WebsocketsEvent event, websockets::WSInterfaceString data) { - callback(event, data); - }; +bool WebsocketClient::send(const char *payload, size_t length, bool headerToPayload = false) { + return send((uint8_t *) payload, length, headerToPayload); } -bool WebsocketClient::connect(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path) { - WS_DEBUG("Connecting to ws://%s:%d%s\n", host.c_str(), port, path.c_str()); - this->host = host; - this->port = port; - this->path = path; - webSocketReconnectFirstAttempt = true; - webSocketShouldReconnect = true; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - isSecure = false; - return websockets::WebsocketsClient::connect(host, port, path); -} +bool WebsocketClient::end() { + if (!isStreaming) { + return true; + } -bool WebsocketClient::connectSecure(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path) { - WS_DEBUG("Connecting to wss://%s:%d%s\n", host.c_str(), port, path.c_str()); - this->host = host; - this->port = port; - this->path = path; - webSocketReconnectFirstAttempt = true; - webSocketShouldReconnect = true; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - isSecure = true; - return websockets::WebsocketsClient::connectSecure(host, port, path); + bool res = sendFrame(&_client, WSop_continuation, NULL, 0, true, false); + isStreaming = !res; + return res; } \ No newline at end of file diff --git a/Websocket.h b/Websocket.h index 860b937..9c2d356 100644 --- a/Websocket.h +++ b/Websocket.h @@ -1,50 +1,64 @@ #ifndef WEBSOCKET_H #define WEBSOCKET_H -#include +#include -#ifdef DEBUG -#define WS_DEBUG(...) Serial.print("Websocket: "); Serial.printf(__VA_ARGS__) +#ifdef DEBUG +#define WS_DEBUG(...) \ + Serial.print("Websocket: "); \ + Serial.printf(__VA_ARGS__) #else #define WS_DEBUG(...) #endif -class WebsocketClient : public websockets::WebsocketsClient { +typedef String WSInterfaceString; + +typedef enum { + WSEvent_ERROR, + WSEvent_DISCONNECTED, + WSEvent_CONNECTED, + WSEvent_TEXT, + WSEvent_BIN, + // WStype_FRAGMENT_TEXT_START, + // WStype_FRAGMENT_BIN_START, + // WStype_FRAGMENT, + // WStype_FRAGMENT_FIN, + WSEvent_PING, + WSEvent_PONG, +} WSEvent_t; + +typedef std::function WebSocketEventCallback; + +class WebsocketClient : private WebSocketsClient { public: - WebsocketClient() : websockets::WebsocketsClient() { + WebsocketClient() : WebSocketsClient() { // Set up a callback to handle incoming pings - websockets::WebsocketsClient::onEvent([this](websockets::WebsocketsEvent event, websockets::WSInterfaceString message) { - switch (event) { - case websockets::WebsocketsEvent::GotPing: - // Respond to the ping - WS_DEBUG("Ponged a ping\n"); - pong(message); + WebSocketsClient::onEvent([this](WStype_t type, uint8_t *payload, size_t length) { + switch (type) { + case WStype_DISCONNECTED: + WS_DEBUG("Disconnected!\n"); + _callback(WSEvent_DISCONNECTED, payload, length); break; - case websockets::WebsocketsEvent::GotPong: - WS_DEBUG("Received a pong\n"); - if (webSocketHeartbeatEnabled) { - // If heartbeat is enabled, reset the missed coun and set the heartbeat in progress flag to false - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - } + case WStype_CONNECTED: { + WS_DEBUG("Connected to url: %s\n", payload); + _callback(WSEvent_CONNECTED, payload, length); + } break; + case WStype_TEXT: + WS_DEBUG("get text: %s\n", payload); + _callback(WSEvent_TEXT, payload, length); break; - case websockets::WebsocketsEvent::ConnectionOpened: - WS_DEBUG("Connection opened\n"); - // Mark the first attempt to reconnect as false, so it will use the slower reconnect interval - webSocketReconnectFirstAttempt = false; + case WStype_BIN: + WS_DEBUG("get binary length: %u\n", length); + _callback(WSEvent_BIN, payload, length); break; - case websockets::WebsocketsEvent::ConnectionClosed: - WS_DEBUG("Connection closed\n"); - // If the connection was closed, set the heartbeat in progress flag to false - if (webSocketHeartbeatEnabled) { - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - } + case WStype_PING: + WS_DEBUG("get ping\n"); + _callback(WSEvent_PING, payload, length); + break; + case WStype_PONG: + WS_DEBUG("get pong\n"); + _callback(WSEvent_PONG, payload, length); break; - } - - if (eventCallback) { - eventCallback(*this, event, message); } }); } @@ -55,10 +69,8 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param host String containing the host name or IP address of the server * @param port Port number to connect to * @param path Path to connect to on the server - * @return true Connection was successful - * @return false Connection was unsuccessful */ - bool connect(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path); + void connect(WSInterfaceString host, int port, WSInterfaceString path); /** * @brief Connect to a websocket server using a secure connection @@ -66,10 +78,8 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param host String containing the host name or IP address of the server * @param port Port number to connect to * @param path Path to connect to on the server - * @return true Connection was successful - * @return false Connection was unsuccessful */ - bool connectSecure(websockets::WSInterfaceString host, int port, websockets::WSInterfaceString path); + void connectSecure(WSInterfaceString host, int port, WSInterfaceString path); /** * @brief Close the connection to the websocket server @@ -77,10 +87,7 @@ class WebsocketClient : public websockets::WebsocketsClient { */ void close() { WS_DEBUG("Closing connection\n"); - webSocketShouldReconnect = false; - webSocketHeartbeatMissed = 0; - webSocketHeartbeatInProgress = false; - websockets::WebsocketsClient::close(); + WebSocketsClient::disconnect(); } /** @@ -90,27 +97,20 @@ class WebsocketClient : public websockets::WebsocketsClient { * @param timeout Time in milliseconds to wait for a response to the heartbeat * @param maxMissed Maximum number of missed heartbeats before closing the connection */ - void enableWebSocketHeartbeat(unsigned long interval, unsigned long timeout, unsigned long maxMissed); + void enableHeartbeat(unsigned long interval, unsigned long timeout, uint8_t maxMissed); /** * @brief Disable the heartbeat * */ - void disableWebSocketHeartbeat(); + void disableHeartbeat(); /** - * @brief Enable automatic reconnection to the websocket server + * @brief Sets the interval between reconnection attempts * - * @param firstInterval Time in milliseconds to wait before the first successful connection attempt * @param interval Time in milliseconds between reconnection attempts */ - void enableWebSocketReconnect(unsigned long firstInterval, unsigned long interval); - - /** - * @brief Disable automatic reconnection - * - */ - void disableWebSocketReconnect(); + void setReconnectInterval(unsigned long interval); /** * @brief Poll the websocket connection @@ -123,30 +123,76 @@ class WebsocketClient : public websockets::WebsocketsClient { * * @param callback Function to run when an event occurs */ - void onEvent(websockets::PartialEventCallback callback); + void onEvent(WebSocketEventCallback callback); + + /** + * @brief Enable streaming mode + * @return true Streaming mode enabled + * @return false Streaming mode not enabled + */ + bool stream(); + + /** + * @brief Send a text message to the server + * @param payload Data to send + * @param length Length of the data to send + * @param headerToPayload bool (see sendFrame for more details) + * @return true Message was successful + * @return false Message was unsuccessful + */ + bool send(uint8_t *payload, size_t length, bool headerToPayload = false); + + /** + * @brief Send a text message to the server + * @param payload Data to send + * @param length Length of the data to send + * @param headerToPayload bool (see sendFrame for more details) + * @return true Message was successful + * @return false Message was unsuccessful + */ + bool send(const char *payload, size_t length, bool headerToPayload = false); + + /** + * @brief End the stream + * @return true Stream ended + * @return false Stream failed to end + */ + bool end(); private: - unsigned int webSocketHeartbeatInterval = 0; - unsigned int webSocketHeartbeatTimeout = 0; - unsigned long webSocketHeartbeatLastSent = 0; - unsigned int webSocketHeartbeatMissed = 0; - unsigned int webSocketHeartbeatMaxMissed = 0; - bool webSocketHeartbeatInProgress = false; - bool webSocketHeartbeatEnabled = false; - - unsigned int webSocketReconnectInterval = 0; - unsigned int webSocketFirstReconnectInterval = 0; - unsigned long webSocketReconnectLastAttempt = 0; - bool webSocketReconnectFirstAttempt = true; - bool webSocketReconnectEnabled = false; - bool webSocketShouldReconnect = false; - websockets::WSInterfaceString host; - int port; - websockets::WSInterfaceString path; - - websockets::EventCallback eventCallback = nullptr; + // unsigned int webSocketHeartbeatInterval = 0; + // unsigned int webSocketHeartbeatTimeout = 0; + // unsigned long webSocketHeartbeatLastSent = 0; + // unsigned int webSocketHeartbeatMissed = 0; + // unsigned int webSocketHeartbeatMaxMissed = 0; + // bool webSocketHeartbeatInProgress = false; + // bool webSocketHeartbeatEnabled = false; + + // unsigned int webSocketReconnectInterval = 0; + // unsigned int webSocketFirstReconnectInterval = 0; + // unsigned long webSocketReconnectLastAttempt = 0; + // bool webSocketReconnectFirstAttempt = true; + // bool webSocketReconnectEnabled = false; + // bool webSocketShouldReconnect = false; + + bool enableReconnect = false; + unsigned long reconnectInterval = 0; + + WSInterfaceString host; + int port; + WSInterfaceString path; + + WebSocketEventCallback eventCallback = nullptr; + + void _callback(WSEvent_t type, uint8_t * payload, size_t length) { + if (eventCallback) { + eventCallback(type, payload, length); + } + } bool isSecure = false; + + bool isStreaming = false; }; #endif \ No newline at end of file diff --git a/library.json b/library.json index 1f870bf..6bdd84d 100644 --- a/library.json +++ b/library.json @@ -4,6 +4,6 @@ "author": "rayshobby", "description": "OpenThings Framework Library", "dependencies": { - "ArduinoWebsockets": "gilmaimon/ArduinoWebsockets@^0.5.4" + "WebSockets": "links2004/WebSockets@^2.4.2" } } \ No newline at end of file From 4f42bc5075cb384161ba44a0361760a536cb124d Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Mon, 24 Jun 2024 17:28:43 -0400 Subject: [PATCH 5/7] fixed function signatures --- Websocket.cpp | 4 ++-- Websocket.h | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Websocket.cpp b/Websocket.cpp index 4851932..b525cca 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -40,7 +40,7 @@ bool WebsocketClient::stream() { return isStreaming; } -bool WebsocketClient::send(uint8_t *payload, size_t length, bool headerToPayload = false) { +bool WebsocketClient::send(uint8_t *payload, size_t length, bool headerToPayload) { WS_DEBUG("Sending message of length %d\n", length); if (length == 0) { @@ -58,7 +58,7 @@ bool WebsocketClient::send(uint8_t *payload, size_t length, bool headerToPayload return false; } -bool WebsocketClient::send(const char *payload, size_t length, bool headerToPayload = false) { +bool WebsocketClient::send(const char *payload, size_t length, bool headerToPayload) { return send((uint8_t *) payload, length, headerToPayload); } diff --git a/Websocket.h b/Websocket.h index 9c2d356..370971b 100644 --- a/Websocket.h +++ b/Websocket.h @@ -35,6 +35,10 @@ class WebsocketClient : private WebSocketsClient { // Set up a callback to handle incoming pings WebSocketsClient::onEvent([this](WStype_t type, uint8_t *payload, size_t length) { switch (type) { + case WStype_ERROR: + WS_DEBUG("Error!\n"); + _callback(WSEvent_ERROR, payload, length); + break; case WStype_DISCONNECTED: WS_DEBUG("Disconnected!\n"); _callback(WSEvent_DISCONNECTED, payload, length); @@ -59,6 +63,9 @@ class WebsocketClient : private WebSocketsClient { WS_DEBUG("get pong\n"); _callback(WSEvent_PONG, payload, length); break; + default: + WS_DEBUG("Unknown event type: %d\n", type); + break; } }); } From 17623221aae31d91583d08514ca873acd6a203b3 Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Tue, 25 Jun 2024 12:14:42 -0400 Subject: [PATCH 6/7] fixed OTF sending multiple start streams which causes the server to crash --- OpenThingsFramework.cpp | 7 ++++--- StringBuilder.cpp | 8 ++++++-- StringBuilder.hpp | 1 + Websocket.cpp | 11 +++++++++-- Websocket.h | 4 ++-- 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/OpenThingsFramework.cpp b/OpenThingsFramework.cpp index 8ff7322..8c2fcf6 100755 --- a/OpenThingsFramework.cpp +++ b/OpenThingsFramework.cpp @@ -153,7 +153,7 @@ void OpenThingsFramework::localServerLoop() { // Make response stream to client Response res = Response(); - res.enableStream([this](const char *buffer, size_t length, bool streaming) -> void { + res.enableStream([this](const char *buffer, size_t length, bool first_message) -> void { localClient->write(buffer, length); }, [this]() -> void { localClient->flush(); @@ -238,9 +238,10 @@ void OpenThingsFramework::webSocketEventCallback(WSEvent_t type, uint8_t *payloa Request request(&message_data[HEADER_LENGTH], length - HEADER_LENGTH, true); Response res = Response(); // Make response stream to websocket - res.enableStream([this] (const char *buffer, size_t length, bool streaming) -> void { + res.enableStream([this] (const char *buffer, size_t length, bool first_message) -> void { // If the websocket is not already streaming, start streaming. - if (!streaming) { + if (first_message) { + WS_DEBUG("Starting stream\n"); webSocket->stream(); } diff --git a/StringBuilder.cpp b/StringBuilder.cpp index 8dc5b45..9c56625 100644 --- a/StringBuilder.cpp +++ b/StringBuilder.cpp @@ -20,9 +20,10 @@ void StringBuilder::bprintf(char *format, va_list args) { size_t res = vsnprintf(&buffer[length], maxLength - length, format, args); - if (stream_write && ((res >= maxLength) || (length + res >= maxLength))) { + if (streaming && ((res >= maxLength) || (length + res >= maxLength))) { // If in streaming mode flush the buffer and continue writing if the data doesn't fit. stream_write(buffer, length, streaming); + first_message = false; stream_flush(); clear(); res = vsnprintf(&buffer[length], maxLength - length, format, args); @@ -75,8 +76,9 @@ size_t StringBuilder::_write(const char *data, size_t data_length, bool use_pgm) // If the buffer is full, flush it and continue writing. if (write_length == 0) { - if (stream_write) { + if (streaming) { stream_write(buffer, length, streaming); + first_message = false; stream_flush(); clear(); } else { @@ -111,6 +113,8 @@ size_t StringBuilder::write_P(const __FlashStringHelper *const data, size_t data } void StringBuilder::enableStream(stream_write_t write, stream_flush_t flush, stream_end_t end) { + streaming = true; + first_message = true; stream_write = write; stream_flush = flush; stream_end = end; diff --git a/StringBuilder.hpp b/StringBuilder.hpp index ab883c3..7aa2880 100644 --- a/StringBuilder.hpp +++ b/StringBuilder.hpp @@ -25,6 +25,7 @@ namespace OTF { stream_flush_t stream_flush = nullptr; stream_end_t stream_end = nullptr; bool streaming = false; + bool first_message = true; /** * Internal write function diff --git a/Websocket.cpp b/Websocket.cpp index b525cca..98ecd03 100644 --- a/Websocket.cpp +++ b/Websocket.cpp @@ -31,8 +31,13 @@ void WebsocketClient::connectSecure(WSInterfaceString host, int port, WSInterfac } bool WebsocketClient::stream() { + if (isStreaming) { + WS_DEBUG("Already streaming\n"); + return false; + } + if (clientIsConnected(&_client)) { - isStreaming = sendFrame(&_client, WSop_text, NULL, 0, false, false); + isStreaming = sendFrame(&_client, WSop_text, (uint8_t *)"", 0, false, false); } else { isStreaming = false; } @@ -67,7 +72,9 @@ bool WebsocketClient::end() { return true; } - bool res = sendFrame(&_client, WSop_continuation, NULL, 0, true, false); + WS_DEBUG("Ending stream\n"); + + bool res = sendFrame(&_client, WSop_continuation, (uint8_t *)"", 0, true, false); isStreaming = !res; return res; } \ No newline at end of file diff --git a/Websocket.h b/Websocket.h index 370971b..82bf6f8 100644 --- a/Websocket.h +++ b/Websocket.h @@ -3,7 +3,7 @@ #include -#ifdef DEBUG +#ifdef ENABLE_DEBUG #define WS_DEBUG(...) \ Serial.print("Websocket: "); \ Serial.printf(__VA_ARGS__) @@ -29,7 +29,7 @@ typedef enum { typedef std::function WebSocketEventCallback; -class WebsocketClient : private WebSocketsClient { +class WebsocketClient : protected WebSocketsClient { public: WebsocketClient() : WebSocketsClient() { // Set up a callback to handle incoming pings From 59649543e748d62a3869968b74f29948e0318bf1 Mon Sep 17 00:00:00 2001 From: arfrie22 <43021241+arfrie22@users.noreply.github.com> Date: Tue, 25 Jun 2024 12:52:28 -0400 Subject: [PATCH 7/7] enable debug comment --- Websocket.h | 1 + 1 file changed, 1 insertion(+) diff --git a/Websocket.h b/Websocket.h index 82bf6f8..92a72ae 100644 --- a/Websocket.h +++ b/Websocket.h @@ -3,6 +3,7 @@ #include +// #define ENABLE_DEBUG #ifdef ENABLE_DEBUG #define WS_DEBUG(...) \ Serial.print("Websocket: "); \