2 * dmq module - distributed message queue
4 * Copyright (C) 2011 Bucur Marius - Ovidiu
6 * This file is part of Kamailio, a free SIP server.
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27 #include "../../core/data_lump_rpl.h"
28 #include "../../core/mod_fix.h"
29 #include "../../core/sip_msg_clone.h"
30 #include "../../core/parser/parse_from.h"
31 #include "../../core/parser/parse_to.h"
34 * @brief set the body of a response
36 static int set_reply_body(struct sip_msg *msg, str *body, str *content_type)
41 /* add content-type */
42 len = sizeof("Content-Type: ") - 1 + content_type->len + CRLF_LEN;
43 buf = pkg_malloc(sizeof(char) * (len));
46 LM_ERR("out of pkg memory\n");
49 memcpy(buf, "Content-Type: ", sizeof("Content-Type: ") - 1);
50 memcpy(buf + sizeof("Content-Type: ") - 1, content_type->s,
52 memcpy(buf + sizeof("Content-Type: ") - 1 + content_type->len, CRLF,
54 if(add_lump_rpl(msg, buf, len, LUMP_RPL_HDR) == 0) {
55 LM_ERR("failed to insert content-type lump\n");
62 if(add_lump_rpl(msg, body->s, body->len, LUMP_RPL_BODY) == 0) {
63 LM_ERR("cannot add body lump\n");
71 * @brief dmq worker loop
73 void worker_loop(int id)
76 dmq_job_t *current_job;
77 peer_reponse_t peer_response;
80 dmq_node_t *dmq_node = NULL;
82 worker = &workers[id];
84 if(worker_usleep <= 0) {
85 LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
86 lock_get(&worker->lock);
87 LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
89 sleep_us(worker_usleep);
92 /* remove from queue until empty */
93 while(job_queue_size(worker->queue) > 0) {
94 /* fill the response with 0's */
95 memset(&peer_response, 0, sizeof(peer_response));
96 current_job = job_queue_pop(worker->queue);
97 /* job_queue_pop might return NULL if queue is empty */
99 /* extract the from uri */
100 if(current_job->msg->from->parsed) {
105 if(parse_from_header(current_job->msg) < 0) {
106 LM_ERR("bad sip message or missing From hdr\n");
108 dmq_node = find_dmq_node_uri(node_list,
109 &((struct to_body *)current_job->msg->from->parsed)
113 ret_value = current_job->f(
114 current_job->msg, &peer_response, dmq_node);
116 LM_ERR("running job failed\n");
119 /* add the body to the reply */
120 if(peer_response.body.s) {
121 if(set_reply_body(current_job->msg, &peer_response.body,
122 &peer_response.content_type)
124 LM_ERR("error adding lumps\n");
129 if(peer_response.resp_code>0 && peer_response.reason.s!=NULL
130 && peer_response.reason.len>0) {
131 if(slb.freply(current_job->msg, peer_response.resp_code,
132 &peer_response.reason)
134 LM_ERR("error sending reply\n");
136 LM_DBG("done sending reply\n");
139 LM_WARN("no reply sent\n");
141 worker->jobs_processed++;
144 /* if body given, free the lumps and free the body */
145 if(peer_response.body.s) {
146 del_nonshm_lump_rpl(¤t_job->msg->reply_lump);
147 pkg_free(peer_response.body.s);
149 if((current_job->msg->from->parsed) && (not_parsed)) {
150 free_to(current_job->msg->from->parsed);
153 shm_free(current_job->msg);
154 shm_free(current_job);
161 * @brief add a dmq job
163 int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer)
165 int i, found_available = 0;
166 dmq_job_t new_job = {0};
167 dmq_worker_t *worker;
168 struct sip_msg *cloned_msg = NULL;
171 /* Pre-parse headers so they are included in our clone. Parsing later
172 * will result in linking pkg structures to shm msg, eventually leading
173 * to memory errors. */
174 if(parse_headers(msg, HDR_EOH_F, 0) == -1) {
175 LM_ERR("failed to parse headers\n");
179 cloned_msg = sip_msg_shm_clone(msg, &cloned_msg_len, 1);
181 LM_ERR("error cloning sip message\n");
185 new_job.f = peer->callback;
186 new_job.msg = cloned_msg;
187 new_job.orig_peer = peer;
189 LM_ERR("error in add_dmq_job: no workers spawned\n");
192 if(!workers[0].queue) {
193 LM_ERR("workers not (yet) initialized\n");
196 /* initialize the worker with the first one */
198 /* search for an available worker, or, if not possible,
199 * for the least busy one */
200 for(i = 0; i < num_workers; i++) {
201 if(job_queue_size(workers[i].queue) == 0) {
202 worker = &workers[i];
205 } else if(job_queue_size(workers[i].queue)
206 < job_queue_size(worker->queue)) {
207 worker = &workers[i];
210 if(!found_available) {
211 LM_DBG("no available worker found, passing job"
212 " to the least busy one [%d %d]\n",
213 worker->pid, job_queue_size(worker->queue));
215 if(job_queue_push(worker->queue, &new_job) < 0) {
218 if(worker_usleep <= 0) {
219 lock_release(&worker->lock);
223 if(cloned_msg != NULL) {
224 shm_free(cloned_msg);
230 * @brief init dmq worker
232 int init_worker(dmq_worker_t *worker)
234 memset(worker, 0, sizeof(*worker));
235 if(worker_usleep <= 0) {
236 lock_init(&worker->lock);
237 // acquire the lock for the first time - so that dmq_worker_loop blocks
238 lock_get(&worker->lock);
240 worker->queue = alloc_job_queue();
241 if(worker->queue==NULL) {
242 LM_ERR("queue could not be initialized\n");
249 * @brief allog dmq job queue
251 job_queue_t *alloc_job_queue()
255 queue = shm_malloc(sizeof(job_queue_t));
257 LM_ERR("no more shm\n");
260 memset(queue, 0, sizeof(job_queue_t));
261 atomic_set(&queue->count, 0);
262 lock_init(&queue->lock);
267 * @ brief destroy job queue
269 void destroy_job_queue(job_queue_t *queue)
276 * @brief return job queue size
278 int job_queue_size(job_queue_t *queue)
280 return atomic_get(&queue->count);
284 * @brief push to job queue
286 int job_queue_push(job_queue_t *queue, dmq_job_t *job)
288 /* we need to copy the dmq_job into a newly created dmq_job in shm */
291 newjob = shm_malloc(sizeof(dmq_job_t));
293 LM_ERR("no more shm\n");
299 lock_get(&queue->lock);
301 newjob->next = queue->back;
303 queue->back->prev = newjob;
305 queue->back = newjob;
307 queue->front = newjob;
309 atomic_inc(&queue->count);
310 lock_release(&queue->lock);
315 * @brief pop from job queue
317 dmq_job_t *job_queue_pop(job_queue_t *queue)
320 lock_get(&queue->lock);
322 lock_release(&queue->lock);
325 front = queue->front;
327 queue->front = front->prev;
328 front->prev->next = NULL;
333 atomic_dec(&queue->count);
334 lock_release(&queue->lock);