diff --git a/examples/legorange.c b/examples/legorange.c index 0b33bc3..e86dafc 100644 --- a/examples/legorange.c +++ b/examples/legorange.c @@ -124,8 +124,8 @@ int main(int argc, char *argv[]) { loop, &cb); - wc_datasync_init(ctx); - wc_datasync_connect(ctx); + wc_datasync_init(ctx, loop); + wc_datasync_connect(ctx, loop); /* if stdin has data to read, call stdin_watcher() */ ev_io_init(&stdin_watcher, stdin_cb, STDIN_FILENO, EV_READ); diff --git a/examples/wcchat.c b/examples/wcchat.c index 8759c45..36a3c9a 100644 --- a/examples/wcchat.c +++ b/examples/wcchat.c @@ -95,8 +95,8 @@ int main(int argc, char *argv[]) { loop, &cb); - wc_datasync_init(ctx); - wc_datasync_connect(ctx); + wc_datasync_init(ctx, loop); + wc_datasync_connect(ctx, loop); json_name = json_object_new_string(name); escaped_name = json_object_to_json_string(json_name); diff --git a/include/webcom-c/webcom-base.h b/include/webcom-c/webcom-base.h index fa8800d..ebd3116 100644 --- a/include/webcom-c/webcom-base.h +++ b/include/webcom-c/webcom-base.h @@ -242,8 +242,9 @@ void wc_dispatch_fd_event(wc_context_t *ctx, struct wc_pollargs *pa); * **WC_EVENT_SET_TIMER** event has fired. * @param ctx the context * @param timer the timer identifier + * @return a flag to tell if timer should be released */ -void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer); +int wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer); /** * gets the user data associated to this context diff --git a/include/webcom-c/webcom-datasync.h b/include/webcom-c/webcom-datasync.h index a2e2f03..e2bb652 100644 --- a/include/webcom-c/webcom-datasync.h +++ b/include/webcom-c/webcom-datasync.h @@ -43,7 +43,7 @@ * @param ctx the Webcom context * @return a pointer to an opaque structure representing the datasync service */ -wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx); +wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx, void *foreign_loop); /** * Function that keeps the connection to the webcom server alive. @@ -71,7 +71,7 @@ int wc_datasync_keepalive(wc_context_t *ctx); * * @param ctx the context */ -void wc_datasync_connect(wc_context_t *ctx); +void wc_datasync_connect(wc_context_t *ctx, void *foreign_loop); /** * Gracefully closes the connection to the Webcom datasync server. diff --git a/include/webcom-c/webcom.h b/include/webcom-c/webcom.h index 09b74d5..da2f46f 100644 --- a/include/webcom-c/webcom.h +++ b/include/webcom-c/webcom.h @@ -83,7 +83,7 @@ * &cb); * * // we are going to use the datasync service - * wc_datasync_init(ctx); + * wc_datasync_init(ctx, loop); * * // ask the SDK to establish the connection to the server * wc_datasync_connect(ctx); diff --git a/lib/auth/auth.c b/lib/auth/auth.c index 1de3b10..5edc2f3 100644 --- a/lib/auth/auth.c +++ b/lib/auth/auth.c @@ -214,13 +214,10 @@ static int curl_sock_cb(CURL *e, curl_socket_t s, int what, void *data, void *so static int curl_timer_cb(CURLM *multi, long timeout_ms, void *userp) { WL_DBG("timer request on %p: %ld ms", multi, timeout_ms); - int handles; struct wc_timerargs wcta; wc_context_t *ctx = userp; - if (timeout_ms == 0) { - curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &handles); - } else if (timeout_ms > 0) { + if (timeout_ms >= 0) { wcta.ms = timeout_ms; wcta.repeat = 0; wcta.timer = WC_TIMER_AUTH; @@ -296,12 +293,14 @@ int wc_auth_with_password(wc_context_t *ctx, const char *email, const char *pass return 1; } -void wc_auth_service(wc_context_t *ctx, int fd) { - int curl_still_running; +int wc_auth_service(wc_context_t *ctx, int fd) { + int release_timer = 0, curl_still_running; curl_multi_socket_action(ctx->auth.auth_curl_multi_handle, fd, 0, &curl_still_running); if (curl_still_running == 0) { curl_multi_remove_handle(ctx->auth.auth_curl_multi_handle, ctx->auth.auth_curl_handle); curl_easy_cleanup(ctx->auth.auth_curl_handle); + release_timer = 1; } + return release_timer; } diff --git a/lib/datasync/datasync.c b/lib/datasync/datasync.c index 2f9fdcf..0a1b48b 100644 --- a/lib/datasync/datasync.c +++ b/lib/datasync/datasync.c @@ -42,7 +42,7 @@ #include "listen/listen_registry.h" #define WEBCOM_PROTOCOL_VERSION "5" -#define WEBCOM_WS_PATH "/_wss/.ws" +#define WEBCOM_WS_PATH "/datasync/v2" static int _wc_datasync_process_message(wc_context_t *ctx, wc_msg_t *msg) { struct wc_timerargs ta; @@ -255,9 +255,8 @@ static struct lws_protocols protocols[] = { {.name=NULL} }; -wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx) { +wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx, void *foreign_loop) { size_t ws_path_l; - struct lws_context_creation_info lws_ctx_creation_nfo; memset(&lws_ctx_creation_nfo, 0, sizeof(lws_ctx_creation_nfo)); @@ -265,37 +264,45 @@ wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx) { lws_ctx_creation_nfo.protocols = protocols; #if defined(LWS_LIBRARY_VERSION_MAJOR) && LWS_LIBRARY_VERSION_MAJOR >= 2 lws_ctx_creation_nfo.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + lws_ctx_creation_nfo.options |= ctx->event_loop; #endif lws_ctx_creation_nfo.gid = -1; lws_ctx_creation_nfo.uid = -1; - -#if defined(LWS_LIBRARY_VERSION_NUMBER) && LWS_LIBRARY_VERSION_NUMBER < 2002000 - char *proxy; + + void *foreign_loops[1]; + foreign_loops[0] = foreign_loop; + lws_ctx_creation_nfo.foreign_loops = foreign_loops; + + char *proxy = getenv("http_proxy"); +#if defined(LWS_LIBRARY_VERSION_NUMBER) && LWS_LIBRARY_VERSION_NUMBER < 2002000 /* hack to make LWS support the http_proxy variable beginning with "http://" */ - proxy = getenv("http_proxy"); if (proxy != NULL && strncmp("http://", proxy, 7) == 0) { proxy += 7; } - lws_ctx_creation_nfo.http_proxy_address = proxy; #endif + lws_ctx_creation_nfo.http_proxy_address = proxy; + ctx->datasync.lws_cci.context = lws_create_context(&lws_ctx_creation_nfo); ctx->datasync.lws_cci.address = ctx->host; ctx->datasync.lws_cci.host = ctx->host; ctx->datasync.lws_cci.port = (int)ctx->port; ctx->datasync.lws_cci.ssl_connection = !ctx->no_tls; + ws_path_l = ( - sizeof(WEBCOM_WS_PATH) - 1 - + 3 - + sizeof(WEBCOM_PROTOCOL_VERSION) - 1 - + 4 - + strlen(ctx->app_name) - + 1 ); + sizeof(WEBCOM_WS_PATH) - 1 + + 1 + + strlen(ctx->app_name) + + 6 + + sizeof(WEBCOM_PROTOCOL_VERSION) - 1 + + 1 ); + ctx->datasync.lws_cci.path = malloc(ws_path_l); if (ctx->datasync.lws_cci.path == NULL) { return NULL; } - snprintf((char*)ctx->datasync.lws_cci.path, ws_path_l, "%s?v=%s&ns=%s", WEBCOM_WS_PATH, WEBCOM_PROTOCOL_VERSION, ctx->app_name); + + snprintf((char*)ctx->datasync.lws_cci.path, ws_path_l, "%s/%s/ws?v=%s", WEBCOM_WS_PATH, ctx->app_name, WEBCOM_PROTOCOL_VERSION); ctx->datasync.lws_cci.protocol = protocols[0].name; ctx->datasync.lws_cci.ietf_version_or_minus_one = -1; ctx->datasync.lws_cci.userdata = (void *)ctx; @@ -312,19 +319,56 @@ wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx) { return &ctx->datasync; } +/* + * This represents your object that "contains" the client connection and has + * the client connection bound to it + */ +static struct _wc_client_conn { + lws_sorted_usec_list_t sul; + wc_context_t *ctx; + uint16_t retry_count; +} mco; + +static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; + +static const lws_retry_bo_t retry = { + .retry_ms_table = backoff_ms, + .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), + .conceal_count = LWS_ARRAY_SIZE(backoff_ms), + + .secs_since_valid_ping = 3, + .secs_since_valid_hangup = 10, + .jitter_percent = 20, +}; + +static void _wc_datasync_connect_client(lws_sorted_usec_list_t *sul) +{ + struct _wc_client_conn *m = lws_container_of(sul, struct _wc_client_conn, sul); + + /* Connect if we are not connected to the server. */ + m->ctx->datasync.lws_conn = lws_client_connect_via_info(&m->ctx->datasync.lws_cci); + + if (!m->ctx->datasync.lws_conn) { + if (lws_retry_sul_schedule(m->ctx->datasync.lws_cci.context, 0, sul, &retry, _wc_datasync_connect_client, &m->retry_count)) { + m->ctx->datasync.state = WC_CNX_STATE_DISCONNECTED; + WL_ERR("%s: connection attempts exhausted\n", __func__); + } + } +} + void _wc_datasync_connect(wc_context_t *ctx) { if (ctx->datasync.state == WC_CNX_STATE_DISCONNECTED) { ctx->datasync.state = WC_CNX_STATE_CONNECTING; - ctx->datasync.lws_conn = lws_client_connect_via_info(&ctx->datasync.lws_cci); - lws_service(ctx->datasync.lws_cci.context, 0); + mco.ctx = ctx; + lws_sul_schedule(ctx->datasync.lws_cci.context, 0, &mco.sul, _wc_datasync_connect_client, 1); } else { WL_WARN("bad state for _wc_datasync_connect() in context %p, expected %d, got %d", ctx, WC_CNX_STATE_DISCONNECTED, ctx->datasync.state); } } -void wc_datasync_connect(wc_context_t *ctx) { +void wc_datasync_connect(wc_context_t *ctx, void *loop) { if (ctx->datasync_init == 0) { - wc_datasync_init(ctx); + wc_datasync_init(ctx, loop); } _wc_datasync_schedule_reconnect(ctx); } @@ -356,3 +400,4 @@ int wc_datasync_keepalive(wc_context_t *ctx) { return sent; } + diff --git a/lib/datasync/datasync_priv.h b/lib/datasync/datasync_priv.h index 6141d11..4efd758 100644 --- a/lib/datasync/datasync_priv.h +++ b/lib/datasync/datasync_priv.h @@ -98,7 +98,7 @@ void wc_datasync_req_response_dispatch(wc_context_t *dsctx, wc_response_t *respo void wc_datasync_push_id(struct pushid_state *s, int64_t time, char* buf) ; void wc_datasync_dispatch_data(wc_context_t *dsctx, wc_push_t *push); void wc_datasync_cleanup_data_routes(wc_datasync_data_route_t **table); -void wc_auth_service(wc_context_t *ctx, int fd); +int wc_auth_service(wc_context_t *ctx, int fd); void _wc_datasync_connect(wc_context_t *ctx); void wc_datasync_service_socket(wc_context_t *ctx, struct wc_pollargs *pa); diff --git a/lib/libev.c b/lib/libev.c index 3c5304b..91188b4 100644 --- a/lib/libev.c +++ b/lib/libev.c @@ -29,6 +29,7 @@ #include "webcom-c/webcom-utils.h" #include "webcom-c/webcom-log.h" +#include "webcom_base_priv.h" #include "event_libs_priv.h" struct wc_libev_integration_data { @@ -64,7 +65,10 @@ static inline void _wc_on_timer_libev_cb ( struct wc_libev_integration_data *lid = wc_context_get_user_data(ctx); enum wc_timersrc timer = w - lid->timer_events; - wc_dispatch_timer_event(ctx, timer); + // stop timer if curl request done + if (wc_dispatch_timer_event(ctx, timer)) { + ev_timer_stop(lid->loop, w); + } } int _wc_libev_cb (wc_event_t event, wc_context_t *ctx, void *data, size_t len) { @@ -130,6 +134,11 @@ int _wc_libev_cb (wc_event_t event, wc_context_t *ctx, void *data, size_t len) { timer = &lid->timer_events[timerargs->timer]; in = ((ev_tstamp)timerargs->ms) / 1000.; repeat = timerargs->repeat ? in : 0.; + + // this event may be called multiple times to update timeout value, we must stop + // previously started timer (overwise DEL_TIMER event may delete the wrong timer!) + ev_timer_stop(lid->loop, timer); + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstrict-aliasing" ev_timer_init(timer, _wc_on_timer_libev_cb, in, repeat); @@ -177,8 +186,8 @@ wc_context_t *wc_context_create_with_libev(struct wc_context_options *options, s ev_options.callback = _wc_libev_cb; ev_options.user_data = integration_data; - ret = wc_context_create(&ev_options); + ret->event_loop = LWS_SERVER_OPTION_LIBEV; if (ret == NULL) { free(integration_data); diff --git a/lib/libevent.c b/lib/libevent.c index 396212b..cc8b40d 100644 --- a/lib/libevent.c +++ b/lib/libevent.c @@ -155,6 +155,7 @@ wc_context_t *wc_context_new_with_libevent(char *host, uint16_t port, char *appl integration_data->next_try = 0; ret = wc_datasync_init(host, port, application, _wc_libevent_cb, integration_data); + ret->event_loop = LWS_SERVER_OPTION_LIBEVENT; if (ret == NULL) { free(integration_data); diff --git a/lib/libuv.c b/lib/libuv.c index 2163228..8b81843 100644 --- a/lib/libuv.c +++ b/lib/libuv.c @@ -156,6 +156,7 @@ wc_context_t *wc_context_new_with_libuv(char *host, uint16_t port, char *applica integration_data->next_try = 0; ret = wc_datasync_init(host, port, application, _wc_libuv_cb, integration_data); + ret->event_loop = LWS_SERVER_OPTION_LIBUV; if (ret == NULL) { free(integration_data); diff --git a/lib/webcom_base.c b/lib/webcom_base.c index ec9de0d..0cfc9c3 100644 --- a/lib/webcom_base.c +++ b/lib/webcom_base.c @@ -67,7 +67,9 @@ void wc_dispatch_fd_event(wc_context_t *ctx, struct wc_pollargs *pa) { } } -void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) { +int wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) { + int release_timer = 0; + switch (timer) { case WC_TIMER_DATASYNC_KEEPALIVE: wc_datasync_keepalive(ctx); @@ -76,11 +78,13 @@ void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) { _wc_datasync_connect(ctx); break; case WC_TIMER_AUTH: - wc_auth_service(ctx, -1); + release_timer = wc_auth_service(ctx, CURL_SOCKET_TIMEOUT); break; default: break; } + + return release_timer; } diff --git a/lib/webcom_base_priv.h b/lib/webcom_base_priv.h index 6244556..2c5b890 100644 --- a/lib/webcom_base_priv.h +++ b/lib/webcom_base_priv.h @@ -37,6 +37,7 @@ wc_datasync_context_t *wc_get_datasync(wc_context_t *); wc_auth_context_t *wc_get_auth(wc_context_t *); struct wc_context { + int event_loop; wc_on_event_cb_t callback; void *user; char *app_name; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ae388b6..0578894 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -284,3 +284,20 @@ target_link_libraries( webcom-c ) + +## tests for websockets +add_executable( + webcom-test-ws + test-ws.c +) + +target_include_directories( + webcom-test-ws + PRIVATE + ${webcom-sdk-c-tests_SOURCE_DIR}/../include +) + +target_link_libraries( + webcom-test-ws + webcom-c +) diff --git a/test/test-auth.c b/test/test-auth.c index 0ecdf4f..92d35fa 100644 --- a/test/test-auth.c +++ b/test/test-auth.c @@ -85,8 +85,8 @@ void on_auth_success(wc_context_t *ctx, struct wc_auth_info* ai) { token = strdup(ai->token); - wc_datasync_init(ctx); - wc_datasync_connect(ctx); + wc_datasync_init(ctx, EV_DEFAULT); + wc_datasync_connect(ctx, EV_DEFAULT); } void on_auth_error(wc_context_t *ctx, char* error) { diff --git a/test/test-cnx.c b/test/test-cnx.c index 951a725..ce84c16 100644 --- a/test/test-cnx.c +++ b/test/test-cnx.c @@ -36,7 +36,7 @@ static void on_connected(wc_context_t *ctx) { STFU_TRUE("The sever sent the handshake", 1); wc_datasync_close_cnx(ctx); - + ev_break(EV_DEFAULT, EVBREAK_ALL); } static int on_disconnected(wc_context_t *ctx) { STFU_TRUE("The connection was closed", 1); @@ -70,14 +70,13 @@ int main(void) { .port = 443, .app_name = "legorange" }; - STFU_TRUE ("Establish a new Connection", cnx1 = wc_context_create_with_libev(&options, loop, &cb) ); if (cnx1 == NULL) goto end; - wc_datasync_init(cnx1); - wc_datasync_connect(cnx1); + wc_datasync_init(cnx1, loop); + wc_datasync_connect(cnx1, loop); puts("\tprocessing the event loop..."); diff --git a/test/test-on-child.c b/test/test-on-child.c index 016121c..83dfd86 100644 --- a/test/test-on-child.c +++ b/test/test-on-child.c @@ -87,8 +87,8 @@ int main(void) { ); if (ctx == NULL) goto end; - wc_datasync_init(ctx); - wc_datasync_connect(ctx); + wc_datasync_init(ctx, loop); + wc_datasync_connect(ctx, loop); puts("\tprocessing the event loop..."); diff --git a/test/test-on-value.c b/test/test-on-value.c index 9592abe..4366ea5 100644 --- a/test/test-on-value.c +++ b/test/test-on-value.c @@ -79,8 +79,8 @@ int main(void) { ); if (ctx == NULL) goto end; - wc_datasync_init(ctx); - wc_datasync_connect(ctx); + wc_datasync_init(ctx, loop); + wc_datasync_connect(ctx, loop); puts("\tprocessing the event loop..."); diff --git a/tools/wcsh.c b/tools/wcsh.c index 1af1734..7709a12 100644 --- a/tools/wcsh.c +++ b/tools/wcsh.c @@ -258,7 +258,7 @@ int main(int argc, char *argv[]) { wc_log_use_custom(wcsh_log); - wc_datasync_init(ctx); + wc_datasync_init(ctx, loop); if (is_interactive) { @@ -544,7 +544,7 @@ static void exec_log(int argc, char **argv) { static void exec_connect(int argc, char **argv) { disconnect = 0; - wc_datasync_connect(ctx); + wc_datasync_connect(ctx, EV_DEFAULT); } static void exec_disconnect(int argc, char **argv) {