Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/legorange.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ int main(int argc, char *argv[]) {
loop,
&cb);

wc_datasync_init(ctx);
wc_datasync_connect(ctx);
wc_datasync_init(ctx, loop);
wc_datasync_connect(ctx, loop);

/* if stdin has data to read, call stdin_watcher() */
ev_io_init(&stdin_watcher, stdin_cb, STDIN_FILENO, EV_READ);
Expand Down
4 changes: 2 additions & 2 deletions examples/wcchat.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ int main(int argc, char *argv[]) {
loop,
&cb);

wc_datasync_init(ctx);
wc_datasync_connect(ctx);
wc_datasync_init(ctx, loop);
wc_datasync_connect(ctx, loop);

json_name = json_object_new_string(name);
escaped_name = json_object_to_json_string(json_name);
Expand Down
3 changes: 2 additions & 1 deletion include/webcom-c/webcom-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ void wc_dispatch_fd_event(wc_context_t *ctx, struct wc_pollargs *pa);
* **WC_EVENT_SET_TIMER** event has fired.
* @param ctx the context
* @param timer the timer identifier
* @return a flag to tell if timer should be released
*/
void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer);
int wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer);

/**
* gets the user data associated to this context
Expand Down
4 changes: 2 additions & 2 deletions include/webcom-c/webcom-datasync.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* @param ctx the Webcom context
* @return a pointer to an opaque structure representing the datasync service
*/
wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx);
wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx, void *foreign_loop);

/**
* Function that keeps the connection to the webcom server alive.
Expand Down Expand Up @@ -71,7 +71,7 @@ int wc_datasync_keepalive(wc_context_t *ctx);
*
* @param ctx the context
*/
void wc_datasync_connect(wc_context_t *ctx);
void wc_datasync_connect(wc_context_t *ctx, void *foreign_loop);

/**
* Gracefully closes the connection to the Webcom datasync server.
Expand Down
2 changes: 1 addition & 1 deletion include/webcom-c/webcom.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
* &cb);
*
* // we are going to use the datasync service
* wc_datasync_init(ctx);
* wc_datasync_init(ctx, loop);
*
* // ask the SDK to establish the connection to the server
* wc_datasync_connect(ctx);
Expand Down
11 changes: 5 additions & 6 deletions lib/auth/auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,10 @@ static int curl_sock_cb(CURL *e, curl_socket_t s, int what, void *data, void *so

static int curl_timer_cb(CURLM *multi, long timeout_ms, void *userp) {
WL_DBG("timer request on %p: %ld ms", multi, timeout_ms);
int handles;
struct wc_timerargs wcta;
wc_context_t *ctx = userp;

if (timeout_ms == 0) {
curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &handles);
} else if (timeout_ms > 0) {
if (timeout_ms >= 0) {
wcta.ms = timeout_ms;
wcta.repeat = 0;
wcta.timer = WC_TIMER_AUTH;
Expand Down Expand Up @@ -296,12 +293,14 @@ int wc_auth_with_password(wc_context_t *ctx, const char *email, const char *pass
return 1;
}

void wc_auth_service(wc_context_t *ctx, int fd) {
int curl_still_running;
int wc_auth_service(wc_context_t *ctx, int fd) {
int release_timer = 0, curl_still_running;

curl_multi_socket_action(ctx->auth.auth_curl_multi_handle, fd, 0, &curl_still_running);
if (curl_still_running == 0) {
curl_multi_remove_handle(ctx->auth.auth_curl_multi_handle, ctx->auth.auth_curl_handle);
curl_easy_cleanup(ctx->auth.auth_curl_handle);
release_timer = 1;
}
return release_timer;
}
83 changes: 64 additions & 19 deletions lib/datasync/datasync.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include "listen/listen_registry.h"

#define WEBCOM_PROTOCOL_VERSION "5"
#define WEBCOM_WS_PATH "/_wss/.ws"
#define WEBCOM_WS_PATH "/datasync/v2"

static int _wc_datasync_process_message(wc_context_t *ctx, wc_msg_t *msg) {
struct wc_timerargs ta;
Expand Down Expand Up @@ -255,47 +255,54 @@ static struct lws_protocols protocols[] = {
{.name=NULL}
};

wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx) {
wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx, void *foreign_loop) {
size_t ws_path_l;

struct lws_context_creation_info lws_ctx_creation_nfo;
memset(&lws_ctx_creation_nfo, 0, sizeof(lws_ctx_creation_nfo));

lws_ctx_creation_nfo.port = CONTEXT_PORT_NO_LISTEN;
lws_ctx_creation_nfo.protocols = protocols;
#if defined(LWS_LIBRARY_VERSION_MAJOR) && LWS_LIBRARY_VERSION_MAJOR >= 2
lws_ctx_creation_nfo.options |= LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
lws_ctx_creation_nfo.options |= ctx->event_loop;
#endif
lws_ctx_creation_nfo.gid = -1;
lws_ctx_creation_nfo.uid = -1;

#if defined(LWS_LIBRARY_VERSION_NUMBER) && LWS_LIBRARY_VERSION_NUMBER < 2002000
char *proxy;

void *foreign_loops[1];
foreign_loops[0] = foreign_loop;
lws_ctx_creation_nfo.foreign_loops = foreign_loops;

char *proxy = getenv("http_proxy");
#if defined(LWS_LIBRARY_VERSION_NUMBER) && LWS_LIBRARY_VERSION_NUMBER < 2002000
/* hack to make LWS support the http_proxy variable beginning with "http://" */
proxy = getenv("http_proxy");
if (proxy != NULL && strncmp("http://", proxy, 7) == 0) {
proxy += 7;
}
lws_ctx_creation_nfo.http_proxy_address = proxy;
#endif

lws_ctx_creation_nfo.http_proxy_address = proxy;

ctx->datasync.lws_cci.context = lws_create_context(&lws_ctx_creation_nfo);
ctx->datasync.lws_cci.address = ctx->host;
ctx->datasync.lws_cci.host = ctx->host;
ctx->datasync.lws_cci.port = (int)ctx->port;
ctx->datasync.lws_cci.ssl_connection = !ctx->no_tls;

ws_path_l = (
sizeof(WEBCOM_WS_PATH) - 1
+ 3
+ sizeof(WEBCOM_PROTOCOL_VERSION) - 1
+ 4
+ strlen(ctx->app_name)
+ 1 );
sizeof(WEBCOM_WS_PATH) - 1
+ 1
+ strlen(ctx->app_name)
+ 6
+ sizeof(WEBCOM_PROTOCOL_VERSION) - 1
+ 1 );

ctx->datasync.lws_cci.path = malloc(ws_path_l);
if (ctx->datasync.lws_cci.path == NULL) {
return NULL;
}
snprintf((char*)ctx->datasync.lws_cci.path, ws_path_l, "%s?v=%s&ns=%s", WEBCOM_WS_PATH, WEBCOM_PROTOCOL_VERSION, ctx->app_name);

snprintf((char*)ctx->datasync.lws_cci.path, ws_path_l, "%s/%s/ws?v=%s", WEBCOM_WS_PATH, ctx->app_name, WEBCOM_PROTOCOL_VERSION);
ctx->datasync.lws_cci.protocol = protocols[0].name;
ctx->datasync.lws_cci.ietf_version_or_minus_one = -1;
ctx->datasync.lws_cci.userdata = (void *)ctx;
Expand All @@ -312,19 +319,56 @@ wc_datasync_context_t *wc_datasync_init(wc_context_t *ctx) {
return &ctx->datasync;
}

/*
* This represents your object that "contains" the client connection and has
* the client connection bound to it
*/
static struct _wc_client_conn {
lws_sorted_usec_list_t sul;
wc_context_t *ctx;
uint16_t retry_count;
} mco;

static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };

static const lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),

.secs_since_valid_ping = 3,
.secs_since_valid_hangup = 10,
.jitter_percent = 20,
};

static void _wc_datasync_connect_client(lws_sorted_usec_list_t *sul)
{
struct _wc_client_conn *m = lws_container_of(sul, struct _wc_client_conn, sul);

/* Connect if we are not connected to the server. */
m->ctx->datasync.lws_conn = lws_client_connect_via_info(&m->ctx->datasync.lws_cci);

if (!m->ctx->datasync.lws_conn) {
if (lws_retry_sul_schedule(m->ctx->datasync.lws_cci.context, 0, sul, &retry, _wc_datasync_connect_client, &m->retry_count)) {
m->ctx->datasync.state = WC_CNX_STATE_DISCONNECTED;
WL_ERR("%s: connection attempts exhausted\n", __func__);
}
}
}

void _wc_datasync_connect(wc_context_t *ctx) {
if (ctx->datasync.state == WC_CNX_STATE_DISCONNECTED) {
ctx->datasync.state = WC_CNX_STATE_CONNECTING;
ctx->datasync.lws_conn = lws_client_connect_via_info(&ctx->datasync.lws_cci);
lws_service(ctx->datasync.lws_cci.context, 0);
mco.ctx = ctx;
lws_sul_schedule(ctx->datasync.lws_cci.context, 0, &mco.sul, _wc_datasync_connect_client, 1);
} else {
WL_WARN("bad state for _wc_datasync_connect() in context %p, expected %d, got %d", ctx, WC_CNX_STATE_DISCONNECTED, ctx->datasync.state);
}
}

void wc_datasync_connect(wc_context_t *ctx) {
void wc_datasync_connect(wc_context_t *ctx, void *loop) {
if (ctx->datasync_init == 0) {
wc_datasync_init(ctx);
wc_datasync_init(ctx, loop);
}
_wc_datasync_schedule_reconnect(ctx);
}
Expand Down Expand Up @@ -356,3 +400,4 @@ int wc_datasync_keepalive(wc_context_t *ctx) {

return sent;
}

2 changes: 1 addition & 1 deletion lib/datasync/datasync_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void wc_datasync_req_response_dispatch(wc_context_t *dsctx, wc_response_t *respo
void wc_datasync_push_id(struct pushid_state *s, int64_t time, char* buf) ;
void wc_datasync_dispatch_data(wc_context_t *dsctx, wc_push_t *push);
void wc_datasync_cleanup_data_routes(wc_datasync_data_route_t **table);
void wc_auth_service(wc_context_t *ctx, int fd);
int wc_auth_service(wc_context_t *ctx, int fd);
void _wc_datasync_connect(wc_context_t *ctx);
void wc_datasync_service_socket(wc_context_t *ctx, struct wc_pollargs *pa);

Expand Down
13 changes: 11 additions & 2 deletions lib/libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "webcom-c/webcom-utils.h"
#include "webcom-c/webcom-log.h"

#include "webcom_base_priv.h"
#include "event_libs_priv.h"

struct wc_libev_integration_data {
Expand Down Expand Up @@ -64,7 +65,10 @@ static inline void _wc_on_timer_libev_cb (
struct wc_libev_integration_data *lid = wc_context_get_user_data(ctx);
enum wc_timersrc timer = w - lid->timer_events;

wc_dispatch_timer_event(ctx, timer);
// stop timer if curl request done
if (wc_dispatch_timer_event(ctx, timer)) {
ev_timer_stop(lid->loop, w);
}
}

int _wc_libev_cb (wc_event_t event, wc_context_t *ctx, void *data, size_t len) {
Expand Down Expand Up @@ -130,6 +134,11 @@ int _wc_libev_cb (wc_event_t event, wc_context_t *ctx, void *data, size_t len) {
timer = &lid->timer_events[timerargs->timer];
in = ((ev_tstamp)timerargs->ms) / 1000.;
repeat = timerargs->repeat ? in : 0.;

// this event may be called multiple times to update timeout value, we must stop
// previously started timer (overwise DEL_TIMER event may delete the wrong timer!)
ev_timer_stop(lid->loop, timer);

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstrict-aliasing"
ev_timer_init(timer, _wc_on_timer_libev_cb, in, repeat);
Expand Down Expand Up @@ -177,8 +186,8 @@ wc_context_t *wc_context_create_with_libev(struct wc_context_options *options, s
ev_options.callback = _wc_libev_cb;
ev_options.user_data = integration_data;


ret = wc_context_create(&ev_options);
ret->event_loop = LWS_SERVER_OPTION_LIBEV;

if (ret == NULL) {
free(integration_data);
Expand Down
1 change: 1 addition & 0 deletions lib/libevent.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ wc_context_t *wc_context_new_with_libevent(char *host, uint16_t port, char *appl
integration_data->next_try = 0;

ret = wc_datasync_init(host, port, application, _wc_libevent_cb, integration_data);
ret->event_loop = LWS_SERVER_OPTION_LIBEVENT;

if (ret == NULL) {
free(integration_data);
Expand Down
1 change: 1 addition & 0 deletions lib/libuv.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ wc_context_t *wc_context_new_with_libuv(char *host, uint16_t port, char *applica
integration_data->next_try = 0;

ret = wc_datasync_init(host, port, application, _wc_libuv_cb, integration_data);
ret->event_loop = LWS_SERVER_OPTION_LIBUV;

if (ret == NULL) {
free(integration_data);
Expand Down
8 changes: 6 additions & 2 deletions lib/webcom_base.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ void wc_dispatch_fd_event(wc_context_t *ctx, struct wc_pollargs *pa) {
}
}

void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) {
int wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) {
int release_timer = 0;

switch (timer) {
case WC_TIMER_DATASYNC_KEEPALIVE:
wc_datasync_keepalive(ctx);
Expand All @@ -76,11 +78,13 @@ void wc_dispatch_timer_event(wc_context_t *ctx, enum wc_timersrc timer) {
_wc_datasync_connect(ctx);
break;
case WC_TIMER_AUTH:
wc_auth_service(ctx, -1);
release_timer = wc_auth_service(ctx, CURL_SOCKET_TIMEOUT);
break;
default:
break;
}

return release_timer;
}


Expand Down
1 change: 1 addition & 0 deletions lib/webcom_base_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ wc_datasync_context_t *wc_get_datasync(wc_context_t *);
wc_auth_context_t *wc_get_auth(wc_context_t *);

struct wc_context {
int event_loop;
wc_on_event_cb_t callback;
void *user;
char *app_name;
Expand Down
17 changes: 17 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,20 @@ target_link_libraries(
webcom-c
)


## tests for websockets
add_executable(
webcom-test-ws
test-ws.c
)

target_include_directories(
webcom-test-ws
PRIVATE
${webcom-sdk-c-tests_SOURCE_DIR}/../include
)

target_link_libraries(
webcom-test-ws
webcom-c
)
4 changes: 2 additions & 2 deletions test/test-auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ void on_auth_success(wc_context_t *ctx, struct wc_auth_info* ai) {

token = strdup(ai->token);

wc_datasync_init(ctx);
wc_datasync_connect(ctx);
wc_datasync_init(ctx, EV_DEFAULT);
wc_datasync_connect(ctx, EV_DEFAULT);
}

void on_auth_error(wc_context_t *ctx, char* error) {
Expand Down
7 changes: 3 additions & 4 deletions test/test-cnx.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static void on_connected(wc_context_t *ctx) {
STFU_TRUE("The sever sent the handshake", 1);

wc_datasync_close_cnx(ctx);

ev_break(EV_DEFAULT, EVBREAK_ALL);
}
static int on_disconnected(wc_context_t *ctx) {
STFU_TRUE("The connection was closed", 1);
Expand Down Expand Up @@ -70,14 +70,13 @@ int main(void) {
.port = 443,
.app_name = "legorange"
};

STFU_TRUE ("Establish a new Connection",
cnx1 = wc_context_create_with_libev(&options, loop, &cb)
);
if (cnx1 == NULL) goto end;

wc_datasync_init(cnx1);
wc_datasync_connect(cnx1);
wc_datasync_init(cnx1, loop);
wc_datasync_connect(cnx1, loop);

puts("\tprocessing the event loop...");

Expand Down
Loading