mqueue: Added MI command to get current size of mqueue.
[sip-router] / modules / mqueue / mqueue_api.c
1 /**
2  * $Id$
3  *
4  * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com)
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * This file is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * This file is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <stdlib.h>
28 #include <string.h>
29
30 #include "../../dprint.h"
31 #include "../../mem/mem.h"
32 #include "../../mem/shm_mem.h"
33 #include "../../parser/parse_param.h"
34 #include "../../ut.h"
35 #include "../../shm_init.h"
36 #include "../../lib/kcore/faked_msg.h"
37
38 #include "mqueue_api.h"
39
40 /**
41  *
42  */
43 typedef struct _mq_item
44 {
45         str key;
46         str val;
47         struct _mq_item *prev;
48         struct _mq_item *next;
49 } mq_item_t;
50
51 /**
52  *
53  */
54 typedef struct _mq_head
55 {
56         str name;
57         int msize;
58         int csize;
59         gen_lock_t lock;
60         mq_item_t *ifirst;
61         mq_item_t *ilast;
62         struct _mq_head *next;
63 } mq_head_t;
64
65 /**
66  *
67  */
68 typedef struct _mq_pv
69 {
70         str *name;
71         mq_item_t *item;
72         struct _mq_pv *next;
73 } mq_pv_t;
74
75 /**
76  *
77  */
78 static mq_head_t *_mq_head_list = NULL;
79
80 /**
81  *
82  */
83 static mq_pv_t *_mq_pv_list = NULL;
84
85 /**
86  *
87  */
88 int mq_head_defined(void)
89 {
90         if(_mq_head_list!=NULL)
91                 return 1;
92         return 0;
93 }
94
95 /**
96  *
97  */
98 void mq_destroy(void)
99 {
100         mq_head_t *mh = NULL;
101         mq_pv_t *mp = NULL;
102         mq_item_t *mi = NULL;
103         mq_head_t *mh1 = NULL;
104         mq_pv_t *mp1 = NULL;
105         mq_item_t *mi1 = NULL;
106         
107         mh = _mq_head_list;
108         while(mh!=NULL)
109         {
110                 mi = mh->ifirst;
111                 while(mi!=NULL)
112                 {
113                         mi1 = mi;
114                         mi = mi->next;
115                         shm_free(mi1);
116                 }
117                 mh1 = mh;
118                 mh = mh->next;
119                 lock_destroy(&mh1->lock);
120                 shm_free(mh1);
121         }
122         _mq_head_list = 0;
123         mp = _mq_pv_list;
124         while(mp!=NULL)
125         {
126                 mp1 = mp;
127                 mp = mp->next;
128                 pkg_free(mp1);
129         }
130 }
131
132 /**
133  *
134  */
135 int mq_head_add(str *name, int msize)
136 {
137         mq_head_t *mh = NULL;
138         mq_pv_t *mp = NULL;
139         int len;
140
141         if(!shm_initialized())
142         {
143                 LM_ERR("shm not intialized - cannot define mqueue now\n");
144                 return 0;
145         }
146
147         mh = _mq_head_list;
148         while(mh!=NULL)
149         {
150                 if(name->len == mh->name.len
151                                 && strncmp(mh->name.s, name->s, name->len)==0)
152                 {
153                         LM_ERR("mqueue redefined: %.*s\n", name->len, name->s);
154                         return -1;
155                 }
156                 mh = mh->next;
157         }
158
159         mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t));
160         if(mp==NULL)
161         {
162                 LM_ERR("no more pkg for: %.*s\n", name->len, name->s);
163                 return -1;
164         }
165         memset(mp, 0, sizeof(mq_pv_t));
166
167         len = sizeof(mq_head_t) + name->len + 1;
168         mh = (mq_head_t*)shm_malloc(len);
169         if(mh==NULL)
170         {
171                 LM_ERR("no more shm for: %.*s\n", name->len, name->s);
172                 pkg_free(mp);
173                 return -1;
174         }
175         memset(mh, 0, len);
176         if (lock_init(&mh->lock)==0 )
177         {
178                 LM_CRIT("failed to init lock\n");
179                 pkg_free(mp);
180                 shm_free(mh);
181                 return -1;
182         }
183
184         mh->name.s = (char*)mh + sizeof(mq_head_t);
185         memcpy(mh->name.s, name->s, name->len);
186         mh->name.len = name->len;
187         mh->name.s[name->len] = '\0';
188         mh->msize = msize;
189         mh->next = _mq_head_list;
190         _mq_head_list = mh;
191
192         mp->name = &mh->name;
193         mp->next = _mq_pv_list;
194         _mq_pv_list = mp;
195
196         return 0;
197 }
198
199 /**
200  *
201  */
202 mq_head_t *mq_head_get(str *name)
203 {
204         mq_head_t *mh = NULL;
205
206         mh = _mq_head_list;
207         while(mh!=NULL)
208         {
209                 if(name->len == mh->name.len
210                                 && strncmp(mh->name.s, name->s, name->len)==0)
211                 {
212                         return mh;
213                 }
214                 mh = mh->next;
215         }
216         return NULL;
217 }
218
219 /**
220  *
221  */
222 mq_pv_t *mq_pv_get(str *name)
223 {
224         mq_pv_t *mp = NULL;
225
226         mp = _mq_pv_list;
227         while(mp!=NULL)
228         {
229                 if(mp->name->len==name->len
230                                 && strncmp(mp->name->s, name->s, name->len)==0)
231                         return mp;
232                 mp = mp->next;
233         }
234         return NULL;
235 }
236
237 /**
238  *
239  */
240 int mq_head_fetch(str *name)
241 {
242         mq_head_t *mh = NULL;
243         mq_pv_t *mp = NULL;
244
245         mp = mq_pv_get(name);
246         if(mp==NULL)
247                 return -1;
248         if(mp->item!=NULL)
249         {
250                 shm_free(mp->item);
251                 mp->item = NULL;
252         }
253         mh = mq_head_get(name);
254         if(mh==NULL)
255                 return -1;
256         lock_get(&mh->lock);
257
258         if(mh->ifirst==NULL)
259         {
260                 /* empty queue */
261                 lock_release(&mh->lock);
262                 return -2;
263         }
264
265         mp->item = mh->ifirst;
266         mh->ifirst = mh->ifirst->next;
267         if(mh->ifirst==NULL) {
268                 mh->ilast = NULL;
269         } else {
270                 mh->ifirst->prev = NULL;
271         }
272         mh->csize--;
273
274         lock_release(&mh->lock);
275         return 0;
276 }
277
278 /**
279  *
280  */
281 void mq_pv_free(str *name)
282 {
283         mq_pv_t *mp = NULL;
284
285         mp = mq_pv_get(name);
286         if(mp==NULL)
287                 return;
288         if(mp->item!=NULL)
289         {
290                 shm_free(mp->item);
291                 mp->item = NULL;
292         }
293 }
294
295 /**
296  *
297  */
298 int mq_item_add(str *qname, str *key, str *val)
299 {
300         mq_head_t *mh = NULL;
301         mq_item_t *mi = NULL;
302         int len;
303
304         mh = mq_head_get(qname);
305         if(mh==NULL)
306         {
307                 LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s);
308                 return -1;
309         }
310         len = sizeof(mq_item_t) + key->len + val->len + 2;
311         mi = (mq_item_t*)shm_malloc(len);
312         if(mi==NULL)
313         {
314                 LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s);
315                 return -1;
316         }
317         memset(mi, 0, len);
318         mi->key.s = (char*)mi + sizeof(mq_item_t);
319         memcpy(mi->key.s, key->s, key->len);
320         mi->key.len = key->len;
321         mi->key.s[key->len] = '\0';
322         
323         mi->val.s = mi->key.s + mi->key.len + 1;
324         memcpy(mi->val.s, val->s, val->len);
325         mi->val.len = val->len;
326         mi->val.s[val->len] = '\0';
327         
328         lock_get(&mh->lock);
329         if(mh->ifirst==NULL)
330         {
331                 mh->ifirst = mi;
332                 mh->ilast = mi;
333         } else {
334                 mh->ilast->next = mi;
335                 mi->prev = mh->ilast;
336                 mh->ilast = mi;
337         }
338         mh->csize++;
339         if(mh->msize>0 && mh->csize>mh->msize)
340         {
341                 mi = mh->ifirst;
342                 mh->ifirst = mh->ifirst->next;
343                 if(mh->ifirst==NULL)
344                         mh->ilast = NULL;
345                 else
346                         mh->ifirst->prev = NULL;
347                 mh->csize--;
348                 shm_free(mi);
349         }
350         lock_release(&mh->lock);
351         return 0;
352 }
353
354 /**
355  *
356  */
357 int pv_parse_mq_name(pv_spec_t *sp, str *in)
358 {
359         sp->pvp.pvn.u.isname.name.s = *in;
360         sp->pvp.pvn.type = PV_NAME_INTSTR;
361         sp->pvp.pvn.u.isname.type = 1;
362         return 0;
363 }
364
365 str *pv_get_mq_name(sip_msg_t *msg, str *in)
366 {
367         str *queue;
368
369         if (in->s[0] != '$')
370                 queue = in;
371         else
372         {
373                 pv_spec_t *pvs;
374                 pv_value_t pvv;
375
376                 if (pv_locate_name(in) != in->len)
377                 {
378                         LM_ERR("invalid pv [%.*s]\n", in->len, in->s);
379                         return NULL;
380                 }
381                 if ((pvs = pv_cache_get(in)) == NULL)
382                 {
383                         LM_ERR("failed to get pv spec for [%.*s]\n", in->len, in->s);
384                         return NULL;
385                 }
386
387                 memset(&pvv, 0, sizeof(pv_value_t));
388                 if (msg==NULL && faked_msg_init() < 0)
389                 {
390                         LM_ERR("faked_msg_init() failed\n");
391                         return NULL;
392                 }
393                 if (pv_get_spec_value((msg)?msg:faked_msg_next(), pvs, &pvv) != 0)
394                 {
395                         LM_ERR("failed to get pv value for [%.*s]\n", in->len, in->s);
396                         return NULL;
397                 }
398
399                 queue = &pvv.rs;
400         }
401
402         return queue;
403 }
404
405 /**
406  *
407  */
408 int pv_get_mqk(struct sip_msg *msg, pv_param_t *param,
409                 pv_value_t *res)
410 {
411         mq_pv_t *mp = NULL;
412         str *in = pv_get_mq_name(msg, &param->pvn.u.isname.name.s);
413
414         if (in == NULL)
415         {
416                 LM_ERR("failed to get mq name\n");
417                 return -1;
418         }
419
420         if (mq_head_get(in) == NULL)
421         {
422                 LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
423                 return -1;
424         }
425
426         mp = mq_pv_get(in);
427         if(mp==NULL || mp->item==NULL || mp->item->key.len<=0)
428                 return pv_get_null(msg, param, res);
429         return pv_get_strval(msg, param, res, &mp->item->key);
430 }
431
432 /**
433  *
434  */
435 int pv_get_mqv(struct sip_msg *msg, pv_param_t *param,
436                 pv_value_t *res)
437 {
438         mq_pv_t *mp = NULL;
439         str *in = pv_get_mq_name(msg, &param->pvn.u.isname.name.s);
440
441         if (in == NULL)
442         {
443                 LM_ERR("failed to get mq name\n");
444                 return -1;
445         }
446
447         if (mq_head_get(in) == NULL)
448         {
449                 LM_ERR("mqueue not found: %.*s\n", in->len, in->s);
450                 return -1;
451         }
452
453         mp = mq_pv_get(in);
454         if(mp==NULL || mp->item==NULL || mp->item->val.len<=0)
455                 return pv_get_null(msg, param, res);
456         return pv_get_strval(msg, param, res, &mp->item->val);
457 }
458
459 /* Return head->csize for a given queue */
460
461 int _mq_get_csize(str *name) 
462 {
463         mq_head_t *mh = mq_head_get(name);
464         int mqueue_size = 0;
465
466         if(mh == NULL)
467                 return -1;
468
469         lock_get(&mh->lock);
470         mqueue_size = mh->csize;
471         lock_release(&mh->lock);
472
473         return mqueue_size;
474 }