From 8037f0c6f123e5851cfd6da044616738914349fc Mon Sep 17 00:00:00 2001 From: Etienne Adam Date: Fri, 10 Jun 2016 10:00:03 +0200 Subject: [PATCH] Add [DELAY ] argument to NACK command --- src/queue.c | 59 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/src/queue.c b/src/queue.c index 81c9f4c..cb94eca 100644 --- a/src/queue.c +++ b/src/queue.c @@ -837,7 +837,6 @@ void getjobCommand(client *c) { } /* ENQUEUE job-id-1 job-id-2 ... job-id-N - * NACK job-id-1 job-id-2 ... job-id-N * * If the job is active, queue it if job retry != 0. * If the job is in any other state, do nothing. @@ -846,12 +845,8 @@ void getjobCommand(client *c) { * NOTE: Even jobs with retry set to 0 are enqueued! Be aware that * using this command may violate the at-most-once contract. * - * Return the number of jobs actually move from active to queued state. - * - * The difference between ENQUEUE and NACK is that the latter will propagate - * cluster messages in a way that makes the nacks counter in the receiver - * to increment. */ -void enqueueGenericCommand(client *c, int nack) { + * Return the number of jobs actually move from active to queued state. */ +void enqueueCommand(client *c) { int j, enqueued = 0; if (validateJobIDs(c,c->argv+1,c->argc-1) == C_ERR) return; @@ -861,20 +856,54 @@ void enqueueGenericCommand(client *c, int nack) { job *job = lookupJob(c->argv[j]->ptr); if (job == NULL) continue; - if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,nack) == C_OK) + if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,0) == C_OK) enqueued++; } addReplyLongLong(c,enqueued); } -/* See enqueueGenericCommand(). */ -void enqueueCommand(client *c) { - enqueueGenericCommand(c,0); -} - -/* See enqueueGenericCommand(). */ +/* NACK job-id-1 job-id-2 ... job-id-N [DELAY ] + * + * The difference between ENQUEUE and NACK is that the latter will propagate + * cluster messages in a way that makes the nacks counter in the receiver + * to increment. + * + * Return the number of jobs actually move from active to (delayed) queued state. + */ void nackCommand(client *c) { - enqueueGenericCommand(c,1); + long long delay = 0; + int j, enqueued = 0; + int arg_count = c->argc; + + if (arg_count > 2) { + char *opt = c->argv[arg_count-2]->ptr; + if (!strcasecmp(opt,"delay")) { + int retval = getLongLongFromObject(c->argv[arg_count-1],&delay); + if (retval != C_OK || delay <= 0) { + addReplyError(c,"DELAY must be a number greater than zero"); + return; + } + arg_count -= 2; + } + } + + if (validateJobIDs(c,c->argv+1,arg_count-1) == C_ERR) return; + + for (j = 1; j < arg_count; j++) { + char *opt = c->argv[j]->ptr; + job *job = lookupJob(opt); + if (job == NULL) continue; + + clusterBroadcastQueued(job, CLUSTERMSG_FLAG0_INCR_NACKS); + job->num_nacks++; + + if (job->state == JOB_STATE_ACTIVE) { + updateJobRequeueTime(job,server.mstime+delay*1000); + enqueued++; + } + } + + addReplyLongLong(c,enqueued); } /* DEQUEUE job-id-1 job-id-2 ... job-id-N