Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,6 @@ void getjobCommand(client *c) {
}

/* ENQUEUE job-id-1 job-id-2 ... job-id-N
* NACK job-id-1 job-id-2 ... job-id-N
*
* If the job is active, queue it if job retry != 0.
* If the job is in any other state, do nothing.
Expand All @@ -846,12 +845,8 @@ void getjobCommand(client *c) {
* NOTE: Even jobs with retry set to 0 are enqueued! Be aware that
* using this command may violate the at-most-once contract.
*
* Return the number of jobs actually move from active to queued state.
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment. */
void enqueueGenericCommand(client *c, int nack) {
* Return the number of jobs actually move from active to queued state. */
void enqueueCommand(client *c) {
int j, enqueued = 0;

if (validateJobIDs(c,c->argv+1,c->argc-1) == C_ERR) return;
Expand All @@ -861,20 +856,54 @@ void enqueueGenericCommand(client *c, int nack) {
job *job = lookupJob(c->argv[j]->ptr);
if (job == NULL) continue;

if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,nack) == C_OK)
if (job->state == JOB_STATE_ACTIVE && enqueueJob(job,0) == C_OK)
enqueued++;
}
addReplyLongLong(c,enqueued);
}

/* See enqueueGenericCommand(). */
void enqueueCommand(client *c) {
enqueueGenericCommand(c,0);
}

/* See enqueueGenericCommand(). */
/* NACK job-id-1 job-id-2 ... job-id-N [DELAY <delay>]
*
* The difference between ENQUEUE and NACK is that the latter will propagate
* cluster messages in a way that makes the nacks counter in the receiver
* to increment.
*
* Return the number of jobs actually move from active to (delayed) queued state.
*/
void nackCommand(client *c) {
enqueueGenericCommand(c,1);
long long delay = 0;
int j, enqueued = 0;
int arg_count = c->argc;

if (arg_count > 2) {
char *opt = c->argv[arg_count-2]->ptr;
if (!strcasecmp(opt,"delay")) {
int retval = getLongLongFromObject(c->argv[arg_count-1],&delay);
if (retval != C_OK || delay <= 0) {
addReplyError(c,"DELAY must be a number greater than zero");
return;
}
arg_count -= 2;
}
}

if (validateJobIDs(c,c->argv+1,arg_count-1) == C_ERR) return;

for (j = 1; j < arg_count; j++) {
char *opt = c->argv[j]->ptr;
job *job = lookupJob(opt);
if (job == NULL) continue;

clusterBroadcastQueued(job, CLUSTERMSG_FLAG0_INCR_NACKS);
job->num_nacks++;

if (job->state == JOB_STATE_ACTIVE) {
updateJobRequeueTime(job,server.mstime+delay*1000);
enqueued++;
}
}

addReplyLongLong(c,enqueued);
}

/* DEQUEUE job-id-1 job-id-2 ... job-id-N
Expand Down