/**
*
*/
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight)
{
struct ev_loop *loop;
loop = ev_default_loop(0);
LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
// setup the reader
rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler);
+ rdr->max_in_flight = max_in_flight;
if (consumer_use_nsqd == 0) {
snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
int pid;
int i;
int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+ int max_in_flight = 1;
+
+ if (nsq_max_in_flight > 1) {
+ max_in_flight = nsq_max_in_flight;
+ }
fire_init_event(rank);
return -1; /* error */
if (pid==0){
close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight));
}
}
} else {
return -1; /* error */
if (pid==0){
close(nsq_worker_pipes_fds[i*2+1]);
- return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight));
}
}
tc = tc->next;