diff --git a/nmsg/io.c b/nmsg/io.c index 9f13fb06..f29f3877 100644 --- a/nmsg/io.c +++ b/nmsg/io.c @@ -741,29 +741,24 @@ io_write(struct nmsg_io_thr *iothr, struct nmsg_io_output *io_output, nmsg_io_t io = iothr->io; nmsg_res res; - /* It's possible a set "count" has been reached. */ - check_close_event(iothr, io_output, 1); - - if (io->stop) { - reset_close_event(iothr, io_output); - nmsg_message_destroy(&msg); - return (nmsg_res_stop); - } res = nmsg_output_write(io_output->output, msg); if (io_output->output->type != nmsg_output_type_callback) nmsg_message_destroy(&msg); - /* - * Reset only after the write, in case another thread invokes - * check_close_event and makes changes to io_output in the meantime. - */ - reset_close_event(iothr, io_output); - if (res != nmsg_res_success) return (res); - + atomic_fetch_add_explicit(&io->io_count_nmsg_payload_out, 1, memory_order_relaxed); + + /* It's possible a set "count" has been reached. */ + check_close_event(iothr, io_output, 1); + reset_close_event(iothr, io_output); + + if (io->stop) { + // nmsg_message_destroy(&msg); + return (nmsg_res_stop); + } return (res); } @@ -797,6 +792,8 @@ check_close_event(struct nmsg_io_thr *iothr, struct nmsg_io_output *io_output, u if (io->count > 0 && io_output->count_next_close == 0) io_output->count_next_close = io->count; + io_output->count_nmsg_payload_out += count; + if (io->count > 0 && io_output->count_nmsg_payload_out == io_output->count_next_close) { @@ -856,13 +853,6 @@ check_close_event(struct nmsg_io_thr *iothr, struct nmsg_io_output *io_output, u } out: - /* - * This incr is implicitly locked IF it's used, and this counter is - * only used IF io->count > 0; that condition results in an acquired - * lock at the beginning of this function. - */ - io_output->count_nmsg_payload_out += count; - if (io->close_fp != NULL || io->count > 0) pthread_mutex_unlock(&io_output->lock); }