all: updated FSF address in GPL text
[sip-router] / modules / dmq / worker.c
1 /*
2  * $Id$
3  *
4  * dmq module - distributed message queue
5  *
6  * Copyright (C) 2011 Bucur Marius - Ovidiu
7  *
8  * This file is part of Kamailio, a free SIP server.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License 
21  * along with this program; if not, write to the Free Software 
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23  *
24  */
25
26 #include "dmq.h"
27 #include "peer.h"
28 #include "worker.h"
29 #include "../../data_lump_rpl.h"
30 #include "../../mod_fix.h"
31 #include "../../sip_msg_clone.h"
32
33 /**
34  * @brief set the body of a response
35  */
36 static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
37 {
38         char* buf;
39         int len;
40         int value_len;
41         str nb = *body;
42         str nc = *content_type;
43
44         /* add content-type */
45         value_len = nc.len;
46         len=sizeof("Content-Type: ") - 1 + value_len + CRLF_LEN;
47         buf=pkg_malloc(sizeof(char)*(len));
48
49         if (buf==0) {
50                 LM_ERR("out of pkg memory\n");
51                 return -1;
52         }
53         memcpy(buf, "Content-Type: ", sizeof("Content-Type: ") - 1);
54         memcpy(buf+sizeof("Content-Type: ") - 1, nc.s, value_len);
55         memcpy(buf+sizeof("Content-Type: ") - 1 + value_len, CRLF, CRLF_LEN);
56         if (add_lump_rpl(msg, buf, len, LUMP_RPL_HDR) == 0) {
57                 LM_ERR("failed to insert content-type lump\n");
58                 pkg_free(buf);
59                 return -1;
60         }
61         pkg_free(buf);
62
63         /* add body */
64         if (add_lump_rpl(msg, nb.s, nb.len, LUMP_RPL_BODY) == 0) {
65                 LM_ERR("cannot add body lump\n");
66                 return -1;
67         }
68                 
69         return 1;
70 }
71
72 /**
73  * @brief dmq worker loop
74  */
75 void worker_loop(int id)
76 {
77         dmq_worker_t* worker;
78         dmq_job_t* current_job;
79         peer_reponse_t peer_response;
80         int ret_value;
81
82         worker = &workers[id];
83         for(;;) {
84                 LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
85                 lock_get(&worker->lock);
86                 LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
87                 /* multiple lock_release calls might be performed, so remove
88                  * from queue until empty */
89                 do {
90                         /* fill the response with 0's */
91                         memset(&peer_response, 0, sizeof(peer_response));
92                         current_job = job_queue_pop(worker->queue);
93                         /* job_queue_pop might return NULL if queue is empty */
94                         if(current_job) {
95                                 ret_value = current_job->f(current_job->msg, &peer_response);
96                                 if(ret_value < 0) {
97                                         LM_ERR("running job failed\n");
98                                         continue;
99                                 }
100                                 /* add the body to the reply */
101                                 if(peer_response.body.s) {
102                                         if(set_reply_body(current_job->msg, &peer_response.body,
103                                                                 &peer_response.content_type) < 0) {
104                                                 LM_ERR("error adding lumps\n");
105                                                 continue;
106                                         }
107                                 }
108                                 /* send the reply */
109                                 if(slb.freply(current_job->msg, peer_response.resp_code,
110                                                         &peer_response.reason) < 0)
111                                 {
112                                         LM_ERR("error sending reply\n");
113                                 }
114                                 
115                                 /* if body given, free the lumps and free the body */
116                                 if(peer_response.body.s) {
117                                         del_nonshm_lump_rpl(&current_job->msg->reply_lump);
118                                         pkg_free(peer_response.body.s);
119                                 }
120                                 LM_DBG("sent reply\n");
121                                 shm_free(current_job->msg);
122                                 shm_free(current_job);
123                                 worker->jobs_processed++;
124                         }
125                 } while(job_queue_size(worker->queue) > 0);
126         }
127 }
128
129 /**
130  * @brief add a dmq job
131  */
132 int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer)
133 {
134         int i, found_available = 0;
135         dmq_job_t new_job = { 0 };
136         dmq_worker_t* worker;
137         struct sip_msg* cloned_msg = NULL;
138         int cloned_msg_len;
139
140         /* Pre-parse headers so they are included in our clone. Parsing later
141          * will result in linking pkg structures to shm msg, eventually leading 
142          * to memory errors. */
143         if (parse_headers(msg, HDR_EOH_F, 0) == -1) {
144                 LM_ERR("failed to parse headers\n");
145                 return -1;
146         }
147
148         cloned_msg = sip_msg_shm_clone(msg, &cloned_msg_len, 1);
149         if(!cloned_msg) {
150                 LM_ERR("error cloning sip message\n");
151                 return -1;
152         }
153
154         new_job.f = peer->callback;
155         new_job.msg = cloned_msg;
156         new_job.orig_peer = peer;
157         if(!num_workers) {
158                 LM_ERR("error in add_dmq_job: no workers spawned\n");
159                 goto error;
160         }
161         /* initialize the worker with the first one */
162         worker = workers;
163         /* search for an available worker, or, if not possible,
164          * for the least busy one */
165         for(i = 0; i < num_workers; i++) {
166                 if(job_queue_size(workers[i].queue) == 0) {
167                         worker = &workers[i];
168                         found_available = 1;
169                         break;
170                 } else if(job_queue_size(workers[i].queue)
171                                 < job_queue_size(worker->queue)) {
172                         worker = &workers[i];
173                 }
174         }
175         if(!found_available) {
176                 LM_DBG("no available worker found, passing job"
177                                 " to the least busy one [%d %d]\n",
178                                 worker->pid, job_queue_size(worker->queue));
179         }
180         if (job_queue_push(worker->queue, &new_job)<0) {
181                 goto error;
182         }
183         lock_release(&worker->lock);
184         return 0;
185 error:
186         if (cloned_msg!=NULL) {
187                 shm_free(cloned_msg);
188         }
189         return -1;
190 }
191
192 /**
193  * @brief init dmq worker
194  */
195 void init_worker(dmq_worker_t* worker)
196 {
197         memset(worker, 0, sizeof(*worker));
198         lock_init(&worker->lock);
199         // acquire the lock for the first time - so that dmq_worker_loop blocks
200         lock_get(&worker->lock);
201         worker->queue = alloc_job_queue();
202 }
203
/**
 * @brief alloc a dmq job queue
 */
207 job_queue_t* alloc_job_queue()
208 {
209         job_queue_t* queue;
210         
211         queue = shm_malloc(sizeof(job_queue_t));
212         if(queue==NULL) {
213                 LM_ERR("no more shm\n");
214                 return NULL;
215         }
216         memset(queue, 0, sizeof(job_queue_t));
217         atomic_set(&queue->count, 0);
218         lock_init(&queue->lock);
219         return queue;
220 }
221
/**
 * @brief destroy job queue
 */
225 void destroy_job_queue(job_queue_t* queue)
226 {
227         if(queue!=NULL)
228                 shm_free(queue);
229 }
230
231 /**
232  * @brief return job queue size
233  */
int job_queue_size(job_queue_t* queue)
{
	/* lock-free snapshot of the element counter maintained by
	 * job_queue_push()/job_queue_pop(); queue must be non-NULL */
	return atomic_get(&queue->count);
}
238
239 /**
240  * @brief push to job queue
241  */
242 int job_queue_push(job_queue_t* queue, dmq_job_t* job)
243 {
244         /* we need to copy the dmq_job into a newly created dmq_job in shm */
245         dmq_job_t* newjob;
246         
247         newjob = shm_malloc(sizeof(dmq_job_t));
248         if(newjob==NULL) {
249                 LM_ERR("no more shm\n");
250                 return -1;
251         }
252
253         *newjob = *job;
254         
255         lock_get(&queue->lock);
256         newjob->prev = NULL;
257         newjob->next = queue->back;
258         if(queue->back) {
259                 queue->back->prev = newjob;
260         }
261         queue->back = newjob;
262         if(!queue->front) {
263                 queue->front = newjob;
264         }
265         atomic_inc(&queue->count);
266         lock_release(&queue->lock);
267         return 0;
268 }
269
270 /**
271  * @brief pop from job queue
272  */
dmq_job_t* job_queue_pop(job_queue_t* queue)
{
	dmq_job_t* front;
	lock_get(&queue->lock);
	if(!queue->front) {
		/* queue is empty */
		lock_release(&queue->lock);
		return NULL;
	}
	front = queue->front;
	if(front->prev) {
		/* detach the head; prev points towards the back of the queue */
		queue->front = front->prev;
		front->prev->next = NULL;
	} else {
		/* popped the last element - queue becomes empty */
		queue->front = NULL;
		queue->back = NULL;
	}
	atomic_dec(&queue->count);
	lock_release(&queue->lock);
	/* ownership of the returned shm job transfers to the caller,
	 * which must shm_free() it (and its msg) when done */
	return front;
}
293