dispatcher: sync on load management operations
[sip-router] / src / modules / dispatcher / dispatch.c
1 /*
2  * dispatcher module
3  *
4  * Copyright (C) 2004-2006 FhG Fokus
5  * Copyright (C) 2005 Voice-System.ro
6  * Copyright (C) 2015 Daniel-Constantin Mierla (asipto.com)
7  *
8  * This file is part of Kamailio, a free SIP server.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23  */
24
25 /*! \file
26  * \ingroup dispatcher
27  * \brief Dispatcher :: Dispatch
28  */
29
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <time.h>
35 #include <math.h>
36
37 #include "../../core/ut.h"
38 #include "../../core/trim.h"
39 #include "../../core/dprint.h"
40 #include "../../core/action.h"
41 #include "../../core/route.h"
42 #include "../../core/dset.h"
43 #include "../../core/mem/shm_mem.h"
44 #include "../../core/parser/parse_uri.h"
45 #include "../../core/parser/parse_from.h"
46 #include "../../core/parser/parse_param.h"
47 #include "../../core/xavp.h"
48 #include "../../core/parser/digest/digest.h"
49 #include "../../core/resolve.h"
50 #include "../../core/lvalue.h"
51 #include "../../modules/tm/tm_load.h"
52 #include "../../lib/srdb1/db.h"
53 #include "../../lib/srdb1/db_res.h"
54 #include "../../core/str.h"
55 #include "../../core/script_cb.h"
56 #include "../../core/kemi.h"
57 #include "../../core/fmsg.h"
58
59 #include "ds_ht.h"
60 #include "api.h"
61 #include "dispatch.h"
62
63 #define DS_TABLE_VERSION 1
64 #define DS_TABLE_VERSION2 2
65 #define DS_TABLE_VERSION3 3
66 #define DS_TABLE_VERSION4 4
67
68 #define DS_ALG_HASHCALLID 0
69 #define DS_ALG_HASHFROMURI 1
70 #define DS_ALG_HASHTOURI 2
71 #define DS_ALG_HASHRURI 3
72 #define DS_ALG_ROUNDROBIN 4
73 #define DS_ALG_HASHAUTHUSER 5
74 #define DS_ALG_RANDOM 6
75 #define DS_ALG_HASHPV 7
76 #define DS_ALG_SERIAL 8
77 #define DS_ALG_WEIGHT 9
78 #define DS_ALG_CALLLOAD 10
79 #define DS_ALG_RELWEIGHT 11
80 #define DS_ALG_PARALLEL 12
81
/* add one to the dload counter of destination didx in group dgrp;
 * the update is done while holding the group's lock */
#define DS_LOAD_INC(dgrp, didx) \
	do { \
		lock_get(&(dgrp)->lock); \
		(dgrp)->dlist[didx].dload += 1; \
		lock_release(&(dgrp)->lock); \
	} while(0)
88
/* subtract one from the dload counter of destination didx in group dgrp,
 * never going below zero; the update is done while holding the group's lock */
#define DS_LOAD_DEC(dgrp, didx) \
	do { \
		lock_get(&(dgrp)->lock); \
		if(likely((dgrp)->dlist[didx].dload > 0)) \
			(dgrp)->dlist[didx].dload -= 1; \
		lock_release(&(dgrp)->lock); \
	} while(0)
97
/* version of the dispatcher table; ds_init_db() overwrites it with the
 * value reported by db_table_version() */
static int _ds_table_version = DS_TABLE_VERSION;

/* hash table created by ds_hash_load_init() and released by
 * ds_hash_load_destroy(); its slots are cleared after a list (re)load */
static ds_ht_t *_dsht_load = NULL;

/* flag kept in shared memory, managed by ds_ping_active_init/get/set() */
static int *_ds_ping_active = NULL;

/* settings defined outside this file */
extern int ds_force_dst;
extern str ds_event_callback;
extern int ds_ping_latency_stats;
extern float ds_latency_estimator_alpha;
extern int ds_attrs_none;
extern param_t *ds_db_extra_attrs_list;
extern int ds_load_mode;

/* database API bound by ds_init_db() and the connection opened by
 * ds_connect_db() / closed by ds_disconnect_db() */
static db_func_t ds_dbf;
static db1_con_t *ds_db_handle = NULL;

/* two-slot array of destination set roots, allocated by ds_init_data() */
ds_set_t **ds_lists = NULL;

/* these three point into one 3-int shm block allocated by ds_init_data():
 * number of sets, index of the active slot, index of the slot being loaded */
int *ds_list_nr = NULL;
int *crt_idx = NULL;
int *next_idx = NULL;

/* shorthands for the root of the active slot and the set counter */
#define _ds_list (ds_lists[*crt_idx])
#define _ds_list_nr (*ds_list_nr)
123
124 static void ds_run_route(struct sip_msg *msg, str *uri, char *route,
125                 ds_rctx_t *rctx);
126
127 void shuffle_uint100array(unsigned int *arr);
128 int ds_reinit_rweight_on_state_change(
129                 int old_state, int new_state, ds_set_t *dset);
130
131 /**
132  *
133  */
134 int ds_ping_active_init(void)
135 {
136         if(_ds_ping_active != NULL)
137                 return 0;
138         _ds_ping_active = (int *)shm_malloc(sizeof(int));
139         if(_ds_ping_active == NULL) {
140                 LM_ERR("no more shared memory\n");
141                 return -1;
142         }
143         *_ds_ping_active = 1;
144         return 0;
145 }
146
147 /**
148  *
149  */
150 int ds_ping_active_get(void)
151 {
152         if(_ds_ping_active == NULL)
153                 return -1;
154         return *_ds_ping_active;
155 }
156
157 /**
158  *
159  */
160 int ds_ping_active_set(int v)
161 {
162         if(_ds_ping_active == NULL)
163                 return -1;
164         *_ds_ping_active = v;
165         return 0;
166 }
167
168 /**
169  *
170  */
171 int ds_hash_load_init(unsigned int htsize, int expire, int initexpire)
172 {
173         if(_dsht_load != NULL)
174                 return 0;
175         _dsht_load = ds_ht_init(htsize, expire, initexpire);
176         if(_dsht_load == NULL)
177                 return -1;
178         return 0;
179 }
180
181 /**
182  *
183  */
184 int ds_hash_load_destroy(void)
185 {
186         if(_dsht_load == NULL)
187                 return -1;
188         ds_ht_destroy(_dsht_load);
189         _dsht_load = NULL;
190         return 0;
191 }
192
193 /**
194  * Recursivly iterate over ds_set and execute callback
195  */
196 void ds_iter_set(ds_set_t *node, void (*ds_action_cb)(ds_set_t *node, int i, void *arg),
197                 void *ds_action_arg)
198 {
199         if(!node)
200                 return;
201
202         int i;
203
204         for(i = 0; i < 2; ++i)
205                 ds_iter_set(node->next[i], ds_action_cb, ds_action_arg);
206
207         for(i = 0; i < node->nr; i++) {
208                 ds_action_cb(node, i, ds_action_arg);
209         }
210
211         return;
212 }
213
214 void ds_log_dst_cb(ds_set_t *node, int i, void *arg)
215 {
216         LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d,%d)\n", node->id,
217                 node->dlist[i].uri.len, node->dlist[i].uri.s,
218                 node->dlist[i].flags, node->dlist[i].priority,
219                 node->dlist[i].attrs.duid.len, node->dlist[i].attrs.duid.s,
220                 node->dlist[i].attrs.maxload, node->dlist[i].attrs.weight,
221                 node->dlist[i].attrs.rweight);
222 }
223
224 /**
225  * Recursivly print ds_set
226  */
227 void ds_log_set(ds_set_t *node)
228 {
229         ds_iter_set(node, &ds_log_dst_cb, NULL);
230
231         return;
232 }
233
234 /**
235  *
236  */
237 int ds_log_sets(void)
238 {
239         if(_ds_list == NULL)
240                 return -1;
241
242         ds_log_set(_ds_list);
243
244         return 0;
245 }
246
247 /**
248  *
249  */
250 int ds_init_data(void)
251 {
252         int *p;
253
254         ds_lists = (ds_set_t **)shm_malloc(2 * sizeof(ds_set_t *));
255         if(!ds_lists) {
256                 LM_ERR("Out of memory\n");
257                 return -1;
258         }
259         memset(ds_lists, 0, 2 * sizeof(ds_set_t *));
260
261
262         p = (int *)shm_malloc(3 * sizeof(int));
263         if(!p) {
264                 LM_ERR("Out of memory\n");
265                 return -1;
266         }
267         memset(p, 0, 3 * sizeof(int));
268
269         crt_idx = p;
270         next_idx = p + 1;
271         ds_list_nr = p + 2;
272         *crt_idx = *next_idx = 0;
273
274         return 0;
275 }
276
277 /**
278  *
279  */
280 int ds_set_attrs(ds_dest_t *dest, str *vattrs)
281 {
282         param_t *params_list = NULL;
283         param_hooks_t phooks;
284         param_t *pit = NULL;
285         str param;
286         int tmp_rweight = 0;
287         str sattrs;
288
289         if(vattrs == NULL || vattrs->len <= 0) {
290                 if(ds_attrs_none==0) {
291                         return 0;
292                 }
293                 sattrs.s = "none=yes";
294                 sattrs.len = 8;
295         } else {
296                 sattrs = *vattrs;
297         }
298         if(sattrs.s[sattrs.len - 1] == ';')
299                 sattrs.len--;
300         /* clone in shm */
301         dest->attrs.body.s = (char *)shm_malloc(sattrs.len + 1);
302         if(dest->attrs.body.s == NULL) {
303                 LM_ERR("no more shm\n");
304                 return -1;
305         }
306         memcpy(dest->attrs.body.s, sattrs.s, sattrs.len);
307         dest->attrs.body.s[sattrs.len] = '\0';
308         dest->attrs.body.len = sattrs.len;
309
310         param = dest->attrs.body;
311         if(parse_params(&param, CLASS_ANY, &phooks, &params_list) < 0)
312                 return -1;
313         for(pit = params_list; pit; pit = pit->next) {
314                 if(pit->name.len == 4 && strncasecmp(pit->name.s, "duid", 4) == 0) {
315                         dest->attrs.duid = pit->body;
316                 } else if(pit->name.len == 2
317                                   && strncasecmp(pit->name.s, "cc", 2) == 0) {
318                         str2sint(&pit->body, &dest->attrs.congestion_control);
319                 } else if(pit->name.len == 6
320                                   && strncasecmp(pit->name.s, "weight", 6) == 0) {
321                         str2sint(&pit->body, &dest->attrs.weight);
322                 } else if(pit->name.len == 7
323                                   && strncasecmp(pit->name.s, "maxload", 7) == 0) {
324                         str2sint(&pit->body, &dest->attrs.maxload);
325                 } else if(pit->name.len == 6
326                                   && strncasecmp(pit->name.s, "socket", 6) == 0) {
327                         dest->attrs.socket = pit->body;
328                 } else if(pit->name.len == 8
329                                   && strncasecmp(pit->name.s, "sockname", 8) == 0) {
330                         dest->attrs.sockname = pit->body;
331                 } else if(pit->name.len == 7
332                                   && strncasecmp(pit->name.s, "rweight", 7) == 0) {
333                         tmp_rweight = 0;
334                         str2sint(&pit->body, &tmp_rweight);
335                         if(tmp_rweight >= 1 && tmp_rweight <= 100) {
336                                 dest->attrs.rweight = tmp_rweight;
337                         } else {
338                                 LM_ERR("rweight %d not in 1-100 range; skipped", tmp_rweight);
339                         }
340                 } else if(pit->name.len == 9
341                                 && strncasecmp(pit->name.s, "ping_from", 9) == 0) {
342                         dest->attrs.ping_from = pit->body;
343                 } else if(pit->name.len == 7 
344                                   && strncasecmp(pit->name.s, "obproxy", 7) == 0) {
345                         dest->attrs.obproxy = pit->body;
346                 }
347         }
348         if(params_list)
349                 free_params(params_list);
350         return 0;
351 }
352
353 /**
354  *
355  */
356 ds_dest_t *pack_dest(str iuri, int flags, int priority, str *attrs)
357 {
358         ds_dest_t *dp = NULL;
359         /* For DNS-Lookups */
360         static char hn[256];
361         char ub[512];
362         struct hostent *he;
363         struct sip_uri puri;
364         str host;
365         int port, proto;
366         char c = 0;
367         str uri;
368
369         uri = iuri;
370         /* check uri */
371         if(parse_uri(uri.s, uri.len, &puri) != 0) {
372                 if(iuri.len>4 && strncmp(iuri.s, "sip:", 4)!=0 && iuri.len<500) {
373                         memcpy(ub, "sip:", 4);
374                         memcpy(ub+4, iuri.s, iuri.len);
375                         ub[iuri.len+4] = '\0';
376                         uri.s = ub;
377                         uri.len = iuri.len+4;
378                         if(parse_uri(uri.s, uri.len, &puri) != 0) {
379                                 LM_ERR("bad uri [%.*s]\n", iuri.len, iuri.s);
380                                 goto err;
381                         } else {
382                                 LM_INFO("uri without sip scheme - fixing it: %.*s\n",
383                                                 iuri.len, iuri.s);
384                         }
385                 } else {
386                         LM_ERR("bad uri [%.*s]\n", iuri.len, iuri.s);
387                         goto err;
388                 }
389         }
390
391         if(puri.host.len > 254) {
392                 LM_ERR("hostname in uri is too long [%.*s]\n", uri.len, uri.s);
393                 goto err;
394         }
395
396         /* skip IPv6 references if IPv6 lookups are disabled */
397         if(default_core_cfg.dns_try_ipv6 == 0 && puri.host.s[0] == '['
398                         && puri.host.s[puri.host.len - 1] == ']') {
399                 LM_DBG("skipping IPv6 record %.*s\n", puri.host.len, puri.host.s);
400                 return NULL;
401         }
402
403         /* store uri */
404         dp = (ds_dest_t *)shm_malloc(sizeof(ds_dest_t));
405         if(dp == NULL) {
406                 LM_ERR("no more memory!\n");
407                 goto err;
408         }
409         memset(dp, 0, sizeof(ds_dest_t));
410
411         dp->uri.s = (char *)shm_malloc((uri.len + 1) * sizeof(char));
412         if(dp->uri.s == NULL) {
413                 LM_ERR("no more memory!\n");
414                 goto err;
415         }
416         strncpy(dp->uri.s, uri.s, uri.len);
417         dp->uri.s[uri.len] = '\0';
418         dp->uri.len = uri.len;
419
420         dp->flags = flags;
421         dp->priority = priority;
422
423         if(ds_set_attrs(dp, attrs) < 0) {
424                 LM_ERR("cannot set attributes!\n");
425                 goto err;
426         }
427
428         /* set send socket by name or address */
429         if(dp->attrs.sockname.s && dp->attrs.sockname.len > 0) {
430                 dp->sock = ksr_get_socket_by_name(&dp->attrs.sockname);
431                 if(dp->sock == 0) {
432                         LM_ERR("non-local socket name <%.*s>\n", dp->attrs.sockname.len,
433                                         dp->attrs.sockname.s);
434                         goto err;
435                 }
436         } else if(dp->attrs.socket.s && dp->attrs.socket.len > 0) {
437                 /* parse_phostport(...) expects 0-terminated string
438                  * - after socket parameter is either ';' or '\0' */
439                 STR_VTOZ(dp->attrs.socket.s[dp->attrs.socket.len], c);
440                 if(parse_phostport(
441                                    dp->attrs.socket.s, &host.s, &host.len, &port, &proto)
442                                 != 0) {
443                         LM_ERR("bad socket <%.*s>\n", dp->attrs.socket.len,
444                                         dp->attrs.socket.s);
445                         STR_ZTOV(dp->attrs.socket.s[dp->attrs.socket.len], c);
446                         goto err;
447                 }
448                 STR_ZTOV(dp->attrs.socket.s[dp->attrs.socket.len], c);
449                 dp->sock = grep_sock_info(&host, (unsigned short)port, proto);
450                 if(dp->sock == 0) {
451                         LM_ERR("non-local socket <%.*s>\n", dp->attrs.socket.len,
452                                         dp->attrs.socket.s);
453                         goto err;
454                 }
455         } else if(ds_default_sockinfo) {
456                 dp->sock = ds_default_sockinfo;
457         }
458
459         /* The Hostname needs to be \0 terminated for resolvehost, so we
460          * make a copy here. */
461         strncpy(hn, puri.host.s, puri.host.len);
462         hn[puri.host.len] = '\0';
463
464         /* Do a DNS-Lookup for the Host-Name: */
465         he = resolvehost(hn);
466         if(he == 0) {
467                 if(dp->flags & DS_NODNSARES_DST) {
468                         dp->irmode |= DS_IRMODE_NOIPADDR;
469                 } else {
470                         LM_ERR("could not resolve %.*s (missing no-probing flag?!?)\n",
471                                         puri.host.len, puri.host.s);
472                         goto err;
473                 }
474         } else {
475                 /* Store hostent in the dispatcher structure */
476                 hostent2ip_addr(&dp->ip_address, he, 0);
477         }
478
479         /* Copy the port out of the URI */
480         dp->port = puri.port_no;
481         /* Copy the proto out of the URI */
482         dp->proto = puri.proto;
483
484         return dp;
485 err:
486         if(dp != NULL) {
487                 if(dp->uri.s != NULL)
488                         shm_free(dp->uri.s);
489                 if(dp->attrs.body.s != NULL)
490                         shm_free(dp->attrs.body.s);
491                 shm_free(dp);
492         }
493
494         return NULL;
495 }
496
497 /**
498  *
499  */
500 int add_dest2list(int id, str uri, int flags, int priority, str *attrs,
501                 int list_idx, int *setn)
502 {
503         ds_dest_t *dp = NULL;
504         ds_set_t *sp = NULL;
505         ds_dest_t *dp0 = NULL;
506         ds_dest_t *dp1 = NULL;
507
508         dp = pack_dest(uri, flags, priority, attrs);
509         if(!dp)
510                 goto err;
511
512         sp = ds_avl_insert(&ds_lists[list_idx], id, setn);
513         if(!sp) {
514                 LM_ERR("no more memory.\n");
515                 goto err;
516         }
517         sp->nr++;
518
519         if(sp->dlist == NULL) {
520                 sp->dlist = dp;
521         } else {
522                 dp1 = NULL;
523                 dp0 = sp->dlist;
524                 /* highest priority last -> reindex will copy backwards */
525                 while(dp0) {
526                         if(dp0->priority > dp->priority)
527                                 break;
528                         dp1 = dp0;
529                         dp0 = dp0->next;
530                 }
531                 if(dp1 == NULL) {
532                         dp->next = sp->dlist;
533                         sp->dlist = dp;
534                 } else {
535                         dp->next = dp1->next;
536                         dp1->next = dp;
537                 }
538         }
539
540         LM_DBG("dest [%d/%d] <%.*s>\n", sp->id, sp->nr, dp->uri.len, dp->uri.s);
541
542         return 0;
543 err:
544         if(dp != NULL) {
545                 if(dp->uri.s != NULL)
546                         shm_free(dp->uri.s);
547                 if(dp->attrs.body.s != NULL)
548                         shm_free(dp->attrs.body.s);
549                 shm_free(dp);
550         }
551
552         return -1;
553 }
554
555
/**
 * Shuffle an array of exactly 100 unsigned ints in place (internal use).
 *
 * Each position, from first to last, is swapped with a position picked
 * with kam_rand() among itself and the ones after it. A NULL array is
 * ignored.
 */
void shuffle_uint100array(unsigned int *arr)
{
	int pos;
	int pick;
	unsigned int held;

	if(!arr) {
		return;
	}

	for(pos = 0; pos < 100; pos++) {
		pick = pos + (kam_rand() % (100 - pos));
		held = arr[pick];
		arr[pick] = arr[pos];
		arr[pos] = held;
	}
}
571
572
573 /**
574  * Initialize the relative weight distribution for a destination set
575  * - fill the array of 0..99 elements where to keep the index of the
576  *   destination address to be used. The Nth call will use
577  *   the address with the index at possition N%100
578  */
579 int dp_init_relative_weights(ds_set_t *dset)
580 {
581         int j;
582         int k;
583         int t;
584         int *ds_dests_flags = NULL;
585         int *ds_dests_rweights = NULL;
586         int current_slice;
587         int rw_sum;
588         unsigned int last_insert;
589
590         if(dset == NULL || dset->dlist == NULL || dset->nr < 2)
591                 return -1;
592
593         /* local copy to avoid syncronization problems */
594         ds_dests_flags = pkg_malloc(sizeof(int) * dset->nr);
595         if(ds_dests_flags == NULL) {
596                 LM_ERR("no more pkg\n");
597                 return -1;
598         }
599         ds_dests_rweights = pkg_malloc(sizeof(int) * dset->nr);
600         if(ds_dests_rweights == NULL) {
601                 LM_ERR("no more pkg\n");
602                 pkg_free(ds_dests_flags);
603                 return -1;
604         }
605
606
607         /* needed to sync the rwlist access */
608         lock_get(&dset->lock);
609         rw_sum = 0;
610         /* find the sum of relative weights */
611         for(j = 0; j < dset->nr; j++) {
612                 ds_dests_flags[j] = dset->dlist[j].flags;
613                 ds_dests_rweights[j] = dset->dlist[j].attrs.rweight;
614                 if(ds_skip_dst(ds_dests_flags[j]))
615                         continue;
616                 rw_sum += ds_dests_rweights[j];
617         }
618
619         if(rw_sum == 0)
620                 goto ret;
621
622         /* fill the array based on the relative weight of each destination */
623         t = 0;
624         for(j = 0; j < dset->nr; j++) {
625                 if(ds_skip_dst(ds_dests_flags[j]))
626                         continue;
627
628                 current_slice =
629                                 ds_dests_rweights[j] * 100 / rw_sum; /* truncate here */
630                 LM_DBG("rw_sum[%d][%d][%d]\n",j, rw_sum, current_slice);
631                 for(k = 0; k < current_slice; k++) {
632                         dset->rwlist[t] = (unsigned int)j;
633                         t++;
634                 }
635         }
636
637         /* if the array was not completely filled (i.e., the sum of rweights is
638          * less than 100 due to truncated), then use last address to fill the rest */
639         last_insert = t > 0 ? dset->rwlist[t - 1] : (unsigned int)(dset->nr - 1);
640         for(j = t; j < 100; j++)
641                 dset->rwlist[j] = last_insert;
642
643         /* shuffle the content of the array in order to mix the selection
644          * of the addresses (e.g., if first address has weight=20, avoid
645          * sending first 20 calls to it, but ensure that within a 100 calls,
646          * 20 go to first address */
647         shuffle_uint100array(dset->rwlist);
648         goto ret;
649
650 ret:
651         lock_release(&dset->lock);
652         pkg_free(ds_dests_flags);
653         pkg_free(ds_dests_rweights);
654         return 0;
655 }
656
657
658 /**
659  * Initialize the weight distribution for a destination set
660  * - fill the array of 0..99 elements where to keep the index of the
661  *   destination address to be used. The Nth call will use
662  *   the address with the index at possition N%100
663  */
664 int dp_init_weights(ds_set_t *dset)
665 {
666         int j;
667         int k;
668         int t;
669
670         if(dset == NULL || dset->dlist == NULL)
671                 return -1;
672
673         /* is weight set for dst list? (first address must have weight!=0) */
674         if(dset->dlist[0].attrs.weight == 0)
675                 return 0;
676
677         /* first fill the array based on the weight of each destination
678          * - the weight is the percentage (e.g., if weight=20, the afferent
679          *   address gets its index 20 times in the array)
680          * - if the sum of weights is more than 100, the addresses over the
681          *   limit are ignored */
682         t = 0;
683         for(j = 0; j < dset->nr; j++) {
684                 for(k = 0; k < dset->dlist[j].attrs.weight; k++) {
685                         if(t >= 100)
686                                 goto randomize;
687                         dset->wlist[t] = (unsigned int)j;
688                         t++;
689                 }
690         }
691         /* if the array was not completely filled (i.e., the sum of weights is
692          * less than 100), then use last address to fill the rest */
693         for(; t < 100; t++)
694                 dset->wlist[t] = (unsigned int)(dset->nr - 1);
695 randomize:
696         /* shuffle the content of the array in order to mix the selection
697          * of the addresses (e.g., if first address has weight=20, avoid
698          * sending first 20 calls to it, but ensure that within a 100 calls,
699          * 20 go to first address */
700         shuffle_uint100array(dset->wlist);
701
702         return 0;
703 }
704
705 /*! \brief  compact destinations from sets for fast access */
706 int reindex_dests(ds_set_t *node)
707 {
708         int i = 0;
709         int j = 0;
710
711         if(!node)
712                 return 0;
713
714         for(; i < 2; ++i) {
715                 int rc = reindex_dests(node->next[i]);
716                 if(rc != 0)
717                         return rc;
718         }
719
720         ds_dest_t *dp = NULL, *dp0 = NULL;
721
722         dp0 = (ds_dest_t *)shm_malloc(node->nr * sizeof(ds_dest_t));
723         if(dp0 == NULL) {
724                 LM_ERR("no more memory!\n");
725                 goto err1;
726         }
727         memset(dp0, 0, node->nr * sizeof(ds_dest_t));
728
729         /* copy from the old pointer to destination, and then free it */
730         for(j = node->nr - 1; j >= 0 && node->dlist != NULL; j--) {
731                 memcpy(&dp0[j], node->dlist, sizeof(ds_dest_t));
732                 if(j == node->nr - 1)
733                         dp0[j].next = NULL;
734                 else
735                         dp0[j].next = &dp0[j + 1];
736
737
738                 dp = node->dlist;
739                 node->dlist = dp->next;
740
741                 shm_free(dp);
742                 dp = NULL;
743         }
744         node->dlist = dp0;
745         dp_init_weights(node);
746         dp_init_relative_weights(node);
747
748         return 0;
749
750 err1:
751         return -1;
752 }
753
754 /*! \brief load groups of destinations from file */
755 int ds_load_list(char *lfile)
756 {
757         char line[1024], *p;
758         FILE *f = NULL;
759         int id, setn, flags, priority;
760         str uri;
761         str attrs;
762
763         if((*crt_idx) != (*next_idx)) {
764                 LM_WARN("load command already generated, aborting reload...\n");
765                 return 0;
766         }
767
768         if(lfile == NULL || strlen(lfile) <= 0) {
769                 LM_ERR("bad list file\n");
770                 return -1;
771         }
772
773         f = fopen(lfile, "r");
774         if(f == NULL) {
775                 LM_ERR("can't open list file [%s]\n", lfile);
776                 return -1;
777         }
778
779         id = setn = flags = priority = 0;
780
781         *next_idx = (*crt_idx + 1) % 2;
782         ds_avl_destroy(&ds_lists[*next_idx]);
783
784         p = fgets(line, 1024, f);
785         while(p) {
786                 /* eat all white spaces */
787                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
788                         p++;
789                 if(*p == '\0' || *p == '#')
790                         goto next_line;
791
792                 /* get set id */
793                 id = 0;
794                 while(*p >= '0' && *p <= '9') {
795                         id = id * 10 + (*p - '0');
796                         p++;
797                 }
798
799                 /* eat all white spaces */
800                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
801                         p++;
802                 if(*p == '\0' || *p == '#') {
803                         LM_ERR("bad line [%s]\n", line);
804                         goto error;
805                 }
806
807                 /* get uri */
808                 uri.s = p;
809                 while(*p && *p != ' ' && *p != '\t' && *p != '\r' && *p != '\n'
810                                 && *p != '#')
811                         p++;
812                 uri.len = p - uri.s;
813
814                 /* eat all white spaces */
815                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
816                         p++;
817
818                 /* get flags */
819                 flags = 0;
820                 priority = 0;
821                 attrs.s = 0;
822                 attrs.len = 0;
823                 if(*p == '\0' || *p == '#')
824                         goto add_destination; /* no flags given */
825
826                 while(*p >= '0' && *p <= '9') {
827                         flags = flags * 10 + (*p - '0');
828                         p++;
829                 }
830
831                 /* eat all white spaces */
832                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
833                         p++;
834
835                 /* get priority */
836                 if(*p == '\0' || *p == '#')
837                         goto add_destination; /* no priority given */
838
839                 while(*p >= '0' && *p <= '9') {
840                         priority = priority * 10 + (*p - '0');
841                         p++;
842                 }
843
844                 /* eat all white spaces */
845                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
846                         p++;
847                 if(*p == '\0' || *p == '#')
848                         goto add_destination; /* no attrs given */
849
850                 /* get attributes */
851                 attrs.s = p;
852                 while(*p && *p != ' ' && *p != '\t' && *p != '\r' && *p != '\n')
853                         p++;
854                 attrs.len = p - attrs.s;
855
856 add_destination:
857                 if(add_dest2list(id, uri, flags, priority, &attrs, *next_idx, &setn)
858                                 != 0) {
859                         LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
860                                         uri.len, uri.s, id);
861                         if(ds_load_mode==1) {
862                                 goto error;
863                         }
864                 }
865 next_line:
866                 p = fgets(line, 1024, f);
867         }
868
869         if(reindex_dests(ds_lists[*next_idx]) != 0) {
870                 LM_ERR("error on reindex\n");
871                 goto error;
872         }
873
874         LM_DBG("found [%d] dest sets\n", _ds_list_nr);
875
876         fclose(f);
877         f = NULL;
878         /* Update list - should it be sync'ed? */
879         _ds_list_nr = setn;
880         *crt_idx = *next_idx;
881         ds_ht_clear_slots(_dsht_load);
882         ds_log_sets();
883         return 0;
884
885 error:
886         if(f != NULL)
887                 fclose(f);
888         ds_avl_destroy(&ds_lists[*next_idx]);
889         *next_idx = *crt_idx;
890         return -1;
891 }
892
893 /**
894  *
895  */
896 int ds_connect_db(void)
897 {
898         if(ds_db_url.s == NULL)
899                 return -1;
900
901         if((ds_db_handle = ds_dbf.init(&ds_db_url)) == 0) {
902                 LM_ERR("cannot initialize db connection\n");
903                 return -1;
904         }
905         return 0;
906 }
907
908 /**
909  *
910  */
911 void ds_disconnect_db(void)
912 {
913         if(ds_db_handle) {
914                 ds_dbf.close(ds_db_handle);
915                 ds_db_handle = 0;
916         }
917 }
918
919 /*! \brief Initialize and verify DB stuff*/
920 int ds_init_db(void)
921 {
922         int ret;
923
924         if(ds_table_name.s == 0) {
925                 LM_ERR("invalid database name\n");
926                 return -1;
927         }
928
929         /* Find a database module */
930         if(db_bind_mod(&ds_db_url, &ds_dbf) < 0) {
931                 LM_ERR("Unable to bind to a database driver\n");
932                 return -1;
933         }
934
935         if(ds_connect_db() != 0) {
936                 LM_ERR("unable to connect to the database\n");
937                 return -1;
938         }
939
940         _ds_table_version = db_table_version(&ds_dbf, ds_db_handle, &ds_table_name);
941         if(_ds_table_version < 0) {
942                 LM_ERR("failed to query table version\n");
943                 return -1;
944         } else if(_ds_table_version != DS_TABLE_VERSION
945                           && _ds_table_version != DS_TABLE_VERSION2
946                           && _ds_table_version != DS_TABLE_VERSION3
947                           && _ds_table_version != DS_TABLE_VERSION4) {
948                 LM_ERR("invalid table version (found %d , required %d, %d, %d or %d)\n"
949                            "(use kamdbctl reinit)\n",
950                                 _ds_table_version, DS_TABLE_VERSION, DS_TABLE_VERSION2,
951                                 DS_TABLE_VERSION3, DS_TABLE_VERSION4);
952                 return -1;
953         }
954
955         ret = ds_load_db();
956         if(ret == -2) {
957                 LM_WARN("failure while loading one or more dispatcher entries\n");
958                 ret = 0;
959         }
960
961         ds_disconnect_db();
962
963         return ret;
964 }
965
/*! \brief reload groups of destinations from DB
 *
 * Opens a DB connection, runs ds_load_db() and closes the connection again.
 *
 * \return the result of ds_load_db() (a warning is logged for -2),
 *         or -1 if the connection could not be opened
 */
int ds_reload_db(void)
{
	int rc;

	if(ds_connect_db() != 0) {
		LM_ERR("unable to connect to the database\n");
		return -1;
	}

	rc = ds_load_db();
	if(rc == -2)
		LM_WARN("failure while loading one or more dispatcher entries\n");

	ds_disconnect_db();
	return rc;
}
983
984 /*! \brief load groups of destinations from DB*/
985 int ds_load_db(void)
986 {
987         int i, id, nr_rows, setn;
988         int flags;
989         int priority;
990         int nrcols;
991         int dest_errs = 0;
992         str uri;
993         str attrs = {0, 0};
994         db1_res_t *res;
995         db_val_t *values;
996         db_row_t *rows;
997 #define DS_DB_MAX_COLS  32
998         db_key_t query_cols[DS_DB_MAX_COLS];
999         param_t *pit=NULL;
1000         int nc;
1001         int plen;
1002 #define DS_ATTRS_MAXSIZE        1024
1003         char ds_attrs_buf[DS_ATTRS_MAXSIZE];
1004
1005         query_cols[0] = &ds_set_id_col;
1006         query_cols[1] = &ds_dest_uri_col;
1007         query_cols[2] = &ds_dest_flags_col;
1008         query_cols[3] = &ds_dest_priority_col;
1009         query_cols[4] = &ds_dest_attrs_col;
1010
1011         nrcols = 2;
1012         if(_ds_table_version == DS_TABLE_VERSION2) {
1013                 nrcols = 3;
1014         } else if(_ds_table_version == DS_TABLE_VERSION3) {
1015                 nrcols = 4;
1016         } else if(_ds_table_version == DS_TABLE_VERSION4) {
1017                 nrcols = 5;
1018                 for(pit = ds_db_extra_attrs_list; pit!=NULL; pit=pit->next) {
1019                         if(nrcols>=DS_DB_MAX_COLS) {
1020                                 LM_ERR("too many db columns: %d\n", nrcols);
1021                                 return -1;
1022                         }
1023                         query_cols[nrcols++] = &pit->body; 
1024                 }
1025         }
1026
1027         if((*crt_idx) != (*next_idx)) {
1028                 LM_WARN("load command already generated, aborting reload...\n");
1029                 return 0;
1030         }
1031
1032         if(ds_db_handle == NULL) {
1033                 LM_ERR("invalid DB handler\n");
1034                 return -1;
1035         }
1036
1037         if(ds_dbf.use_table(ds_db_handle, &ds_table_name) < 0) {
1038                 LM_ERR("error in use_table\n");
1039                 return -1;
1040         }
1041
1042         LM_DBG("loading dispatcher db records - nrcols: %d\n", nrcols);
1043
1044         /*select the whole table and all the columns*/
1045         if(ds_dbf.query(ds_db_handle, 0, 0, 0, query_cols, 0, nrcols, 0, &res)
1046                         < 0) {
1047                 LM_ERR("error while querying database\n");
1048                 return -1;
1049         }
1050
1051         nr_rows = RES_ROW_N(res);
1052         rows = RES_ROWS(res);
1053         if(nr_rows == 0) {
1054                 LM_WARN("no dispatching data in the db -- empty destination set\n");
1055         }
1056
1057         setn = 0;
1058         *next_idx = (*crt_idx + 1) % 2;
1059         ds_avl_destroy(&ds_lists[*next_idx]);
1060
1061         for(i = 0; i < nr_rows; i++) {
1062                 values = ROW_VALUES(rows + i);
1063
1064                 id = VAL_INT(values);
1065                 uri.s = VAL_STR(values + 1).s;
1066                 uri.len = strlen(uri.s);
1067                 flags = 0;
1068                 if(nrcols >= 3)
1069                         flags = VAL_INT(values + 2);
1070                 priority = 0;
1071                 if(nrcols >= 4)
1072                         priority = VAL_INT(values + 3);
1073
1074                 attrs.s = 0;
1075                 attrs.len = 0;
1076                 if(nrcols >= 5) {
1077                         if(!VAL_NULL(values + 4)) {
1078                                 attrs.s = VAL_STR(values + 4).s;
1079                                 if(attrs.s) attrs.len = strlen(attrs.s);
1080                         }
1081                         if(ds_db_extra_attrs_list!=NULL && nrcols > 5) {
1082                                 if(attrs.len>0) {
1083                                         memcpy(ds_attrs_buf, attrs.s, attrs.len);
1084                                         if(ds_attrs_buf[attrs.len-1]!=';') {
1085                                                 ds_attrs_buf[attrs.len++] = ';';
1086                                         }
1087                                 }
1088                                 attrs.s = ds_attrs_buf;
1089                                 pit = ds_db_extra_attrs_list;
1090                                 for(nc = 5; nc<nrcols && pit!=NULL; nc++) {
1091                                         if(!VAL_NULL(values + nc) && strlen(VAL_STRING(values + nc))>0) {
1092                                                 plen = snprintf(attrs.s + attrs.len,
1093                                                                 DS_ATTRS_MAXSIZE - attrs.len - 1,
1094                                                                 "%.*s=%s;", pit->name.len, pit->name.s,
1095                                                                 VAL_STRING(values + nc));
1096                                                 if(plen<=0 || plen>=DS_ATTRS_MAXSIZE - attrs.len - 1) {
1097                                                         LM_ERR("cannot build attrs buffer\n");
1098                                                         goto err2;
1099                                                 }
1100                                                 attrs.len+=plen;
1101                                         }
1102                                         pit = pit->next;
1103                                 }
1104                         }
1105                 }
1106                 LM_DBG("attributes string: [%.*s]\n", attrs.len, (attrs.s)?attrs.s:"");
1107                 if(add_dest2list(id, uri, flags, priority, &attrs, *next_idx, &setn)
1108                                 != 0) {
1109                         dest_errs++;
1110                         LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
1111                                         uri.len, uri.s, id);
1112                         if(ds_load_mode==1) {
1113                                 goto err2;
1114                         }
1115                 }
1116         }
1117         if(reindex_dests(ds_lists[*next_idx]) != 0) {
1118                 LM_ERR("error on reindex\n");
1119                 goto err2;
1120         }
1121
1122         LM_DBG("found [%d] dest sets\n", _ds_list_nr);
1123
1124         ds_dbf.free_result(ds_db_handle, res);
1125
1126         /* update data - should it be sync'ed? */
1127         _ds_list_nr = setn;
1128         *crt_idx = *next_idx;
1129         ds_ht_clear_slots(_dsht_load);
1130
1131         ds_log_sets();
1132
1133         if(dest_errs > 0)
1134                 return -2;
1135         return 0;
1136
1137 err2:
1138         ds_avl_destroy(&ds_lists[*next_idx]);
1139         ds_dbf.free_result(ds_db_handle, res);
1140         *next_idx = *crt_idx;
1141
1142         return -1;
1143 }
1144
1145 /*! \brief called from dispatcher.c: free all*/
1146 int ds_destroy_list(void)
1147 {
1148         if(ds_lists) {
1149                 ds_avl_destroy(&ds_lists[0]);
1150                 ds_avl_destroy(&ds_lists[1]);
1151                 shm_free(ds_lists);
1152         }
1153
1154         if(crt_idx)
1155                 shm_free(crt_idx);
1156
1157         return 0;
1158 }
1159
1160 /**
1161  *
1162  */
1163 unsigned int ds_get_hash(str *x, str *y)
1164 {
1165         char *p;
1166         register unsigned v;
1167         register unsigned h;
1168
1169         if(!x && !y)
1170                 return 0;
1171         h = 0;
1172         if(x) {
1173                 p = x->s;
1174                 if(x->len >= 4) {
1175                         for(; p <= (x->s + x->len - 4); p += 4) {
1176                                 v = (*p << 24) + (p[1] << 16) + (p[2] << 8) + p[3];
1177                                 h += v ^ (v >> 3);
1178                         }
1179                 }
1180                 v = 0;
1181                 for(; p < (x->s + x->len); p++) {
1182                         v <<= 8;
1183                         v += *p;
1184                 }
1185                 h += v ^ (v >> 3);
1186         }
1187         if(y) {
1188                 p = y->s;
1189                 if(y->len >= 4) {
1190                         for(; p <= (y->s + y->len - 4); p += 4) {
1191                                 v = (*p << 24) + (p[1] << 16) + (p[2] << 8) + p[3];
1192                                 h += v ^ (v >> 3);
1193                         }
1194                 }
1195
1196                 v = 0;
1197                 for(; p < (y->s + y->len); p++) {
1198                         v <<= 8;
1199                         v += *p;
1200                 }
1201                 h += v ^ (v >> 3);
1202         }
1203         h = ((h) + (h >> 11)) + ((h >> 13) + (h >> 23));
1204
1205         return (h) ? h : 1;
1206 }
1207
1208
1209 /*! \brief
1210  * gets the part of the uri we will use as a key for hashing
1211  * \param  key1       - will be filled with first part of the key
1212  *                       (uri user or "" if no user)
1213  * \param  key2       - will be filled with the second part of the key
1214  *                       (uri host:port)
1215  * \param  uri        - str with the whole uri
1216  * \param  parsed_uri - struct sip_uri pointer with the parsed uri
1217  *                       (it must point inside uri). It can be null
1218  *                       (in this case the uri will be parsed internally).
1219  * \param  flags  -    if & DS_HASH_USER_ONLY, only the user part of the uri
1220  *                      will be used
1221  * \return: -1 on error, 0 on success
1222  */
1223 static inline int get_uri_hash_keys(
1224                 str *key1, str *key2, str *uri, struct sip_uri *parsed_uri, int flags)
1225 {
1226         struct sip_uri tmp_p_uri; /* used only if parsed_uri==0 */
1227
1228         if(parsed_uri == 0) {
1229                 if(parse_uri(uri->s, uri->len, &tmp_p_uri) < 0) {
1230                         LM_ERR("invalid uri %.*s\n", uri->len, uri->len ? uri->s : "");
1231                         goto error;
1232                 }
1233                 parsed_uri = &tmp_p_uri;
1234         }
1235         /* uri sanity checks */
1236         if(parsed_uri->host.s == 0) {
1237                 LM_ERR("invalid uri, no host present: %.*s\n", uri->len,
1238                                 uri->len ? uri->s : "");
1239                 goto error;
1240         }
1241
1242         /* we want: user@host:port if port !=5060
1243          *          user@host if port==5060
1244          *          user if the user flag is set*/
1245         *key1 = parsed_uri->user;
1246         key2->s = 0;
1247         key2->len = 0;
1248         if(!(flags & DS_HASH_USER_ONLY)) { /* key2=host */
1249                 *key2 = parsed_uri->host;
1250                 /* add port if needed */
1251                 if(parsed_uri->port.s != 0) { /* uri has a port */
1252                         /* skip port if == 5060 or sips and == 5061 */
1253                         if(parsed_uri->port_no != ((parsed_uri->type == SIPS_URI_T)
1254                                                                                                           ? SIPS_PORT
1255                                                                                                           : SIP_PORT))
1256                                 key2->len += parsed_uri->port.len + 1 /* ':' */;
1257                 }
1258         }
1259         if(key1->s == 0) {
1260                 LM_WARN("empty username in: %.*s\n", uri->len, uri->len ? uri->s : "");
1261         }
1262         return 0;
1263 error:
1264         return -1;
1265 }
1266
1267
1268 /**
1269  *
1270  */
1271 int ds_hash_fromuri(struct sip_msg *msg, unsigned int *hash)
1272 {
1273         str from;
1274         str key1;
1275         str key2;
1276
1277         if(msg == NULL || hash == NULL) {
1278                 LM_ERR("bad parameters\n");
1279                 return -1;
1280         }
1281
1282         if(parse_from_header(msg) < 0) {
1283                 LM_ERR("cannot parse From hdr\n");
1284                 return -1;
1285         }
1286
1287         if(msg->from == NULL || get_from(msg) == NULL) {
1288                 LM_ERR("cannot get From uri\n");
1289                 return -1;
1290         }
1291
1292         from = get_from(msg)->uri;
1293         trim(&from);
1294         if(get_uri_hash_keys(&key1, &key2, &from, 0, ds_flags) < 0)
1295                 return -1;
1296         *hash = ds_get_hash(&key1, &key2);
1297
1298         return 0;
1299 }
1300
1301
1302 /**
1303  *
1304  */
1305 int ds_hash_touri(struct sip_msg *msg, unsigned int *hash)
1306 {
1307         str to;
1308         str key1;
1309         str key2;
1310
1311         if(msg == NULL || hash == NULL) {
1312                 LM_ERR("bad parameters\n");
1313                 return -1;
1314         }
1315         if((msg->to == 0)
1316                         && ((parse_headers(msg, HDR_TO_F, 0) == -1) || (msg->to == 0))) {
1317                 LM_ERR("cannot parse To hdr\n");
1318                 return -1;
1319         }
1320
1321
1322         to = get_to(msg)->uri;
1323         trim(&to);
1324
1325         if(get_uri_hash_keys(&key1, &key2, &to, 0, ds_flags) < 0)
1326                 return -1;
1327         *hash = ds_get_hash(&key1, &key2);
1328
1329         return 0;
1330 }
1331
1332
1333 /**
1334  *
1335  */
1336 int ds_hash_callid(struct sip_msg *msg, unsigned int *hash)
1337 {
1338         str cid;
1339         if(msg == NULL || hash == NULL) {
1340                 LM_ERR("bad parameters\n");
1341                 return -1;
1342         }
1343
1344         if(msg->callid == NULL && ((parse_headers(msg, HDR_CALLID_F, 0) == -1)
1345                                                                           || (msg->callid == NULL))) {
1346                 LM_ERR("cannot parse Call-Id\n");
1347                 return -1;
1348         }
1349
1350         cid.s = msg->callid->body.s;
1351         cid.len = msg->callid->body.len;
1352         trim(&cid);
1353
1354         *hash = ds_get_hash(&cid, NULL);
1355
1356         return 0;
1357 }
1358
1359
1360 /**
1361  *
1362  */
1363 int ds_hash_ruri(struct sip_msg *msg, unsigned int *hash)
1364 {
1365         str *uri;
1366         str key1;
1367         str key2;
1368
1369
1370         if(msg == NULL || hash == NULL) {
1371                 LM_ERR("bad parameters\n");
1372                 return -1;
1373         }
1374         if(parse_sip_msg_uri(msg) < 0) {
1375                 LM_ERR("bad request uri\n");
1376                 return -1;
1377         }
1378
1379         uri = GET_RURI(msg);
1380         if(get_uri_hash_keys(&key1, &key2, uri, &msg->parsed_uri, ds_flags) < 0)
1381                 return -1;
1382
1383         *hash = ds_get_hash(&key1, &key2);
1384         return 0;
1385 }
1386
1387 /**
1388  *
1389  */
1390 int ds_hash_authusername(struct sip_msg *msg, unsigned int *hash)
1391 {
1392         /* Header, which contains the authorization */
1393         struct hdr_field *h = 0;
1394         /* The Username */
1395         str username = {0, 0};
1396         /* The Credentials from this request */
1397         auth_body_t *cred;
1398
1399         if(msg == NULL || hash == NULL) {
1400                 LM_ERR("bad parameters\n");
1401                 return -1;
1402         }
1403         *hash = 0;
1404         if(parse_headers(msg, HDR_PROXYAUTH_F, 0) == -1) {
1405                 LM_ERR("error parsing headers!\n");
1406                 return -1;
1407         }
1408         if(msg->proxy_auth && !msg->proxy_auth->parsed) {
1409                 if(parse_credentials(msg->proxy_auth)!=0) {
1410                         LM_DBG("no parsing for proxy-auth header\n");
1411                 }
1412         }
1413         if(msg->proxy_auth && msg->proxy_auth->parsed) {
1414                 h = msg->proxy_auth;
1415         }
1416         if(!h) {
1417                 if(parse_headers(msg, HDR_AUTHORIZATION_F, 0) == -1) {
1418                         LM_ERR("error parsing headers!\n");
1419                         return -1;
1420                 }
1421                 if(msg->authorization && !msg->authorization->parsed) {
1422                         if(parse_credentials(msg->authorization)!=0) {
1423                                 LM_DBG("no parsing for auth header\n");
1424                         }
1425                 }
1426                 if(msg->authorization && msg->authorization->parsed) {
1427                         h = msg->authorization;
1428                 }
1429         }
1430         if(!h) {
1431                 LM_DBG("No Authorization-Header!\n");
1432                 return 1;
1433         }
1434
1435         cred = (auth_body_t *)(h->parsed);
1436         if(!cred || !cred->digest.username.user.len) {
1437                 LM_ERR("No Authorization-Username or Credentials!\n");
1438                 return 1;
1439         }
1440
1441         username.s = cred->digest.username.user.s;
1442         username.len = cred->digest.username.user.len;
1443
1444         trim(&username);
1445
1446         *hash = ds_get_hash(&username, NULL);
1447
1448         return 0;
1449 }
1450
1451
1452 /**
1453  *
1454  */
1455 int ds_hash_pvar(struct sip_msg *msg, unsigned int *hash)
1456 {
1457         /* The String to create the hash */
1458         str hash_str = {0, 0};
1459
1460         if(msg == NULL || hash == NULL || hash_param_model == NULL) {
1461                 LM_ERR("bad parameters\n");
1462                 return -1;
1463         }
1464         if(pv_printf_s(msg, hash_param_model, &hash_str) < 0) {
1465                 LM_ERR("error - cannot print the format\n");
1466                 return -1;
1467         }
1468
1469         /* Remove empty spaces */
1470         trim(&hash_str);
1471         if(hash_str.len <= 0) {
1472                 LM_ERR("String is empty!\n");
1473                 return -1;
1474         }
1475
1476         *hash = ds_get_hash(&hash_str, NULL);
1477         LM_DBG("Hashing of '%.*s' resulted in %u !\n", hash_str.len, hash_str.s,
1478                         *hash);
1479
1480         return 0;
1481 }
1482
1483 /**
1484  *
1485  */
1486 static inline int ds_get_index(int group, int ds_list_idx, ds_set_t **index)
1487 {
1488         ds_set_t *si = NULL;
1489
1490         if(index == NULL || group < 0 || ds_lists[ds_list_idx] == NULL)
1491                 return -1;
1492
1493         /* get the index of the set */
1494         si = ds_avl_find(ds_lists[ds_list_idx], group);
1495
1496         if(si == NULL)
1497                 return -1;
1498
1499         *index = si;
1500         return 0;
1501 }
1502
1503 /*
1504  * Check if a destination set exists
1505  */
1506 int ds_list_exist(int set)
1507 {
1508         ds_set_t *si = NULL;
1509         LM_DBG("looking for destination set [%d]\n", set);
1510
1511         /* get the index of the set */
1512         si = ds_avl_find(_ds_list, set);
1513
1514         if(si == NULL) {
1515                 LM_DBG("destination set [%d] not found\n", set);
1516                 return -1; /* False */
1517         }
1518         LM_DBG("destination set [%d] found\n", set);
1519         return 1; /* True */
1520 }
1521
1522 /**
1523  *
1524  */
1525 int ds_get_leastloaded(ds_set_t *dset)
1526 {
1527         int j;
1528         int k;
1529         int t;
1530
1531         k = -1;
1532         t = 0x7fffffff; /* high load */
1533         lock_get(&dset->lock); \
1534         for(j = 0; j < dset->nr; j++) {
1535                 if(!ds_skip_dst(dset->dlist[j].flags)
1536                                 && (dset->dlist[j].attrs.maxload == 0
1537                                                    || dset->dlist[j].dload
1538                                                                           < dset->dlist[j].attrs.maxload)) {
1539                         if(dset->dlist[j].dload < t) {
1540                                 k = j;
1541                                 t = dset->dlist[k].dload;
1542                         }
1543                 }
1544         }
1545         lock_release(&dset->lock); \
1546         return k;
1547 }
1548
1549 /**
1550  *
1551  */
1552 int ds_load_add(struct sip_msg *msg, ds_set_t *dset, int setid, int dst)
1553 {
1554         if(dset->dlist[dst].attrs.duid.len == 0) {
1555                 LM_ERR("dst unique id not set for %d (%.*s)\n", setid,
1556                                 msg->callid->body.len, msg->callid->body.s);
1557                 return -1;
1558         }
1559
1560         if(ds_add_cell(_dsht_load, &msg->callid->body, &dset->dlist[dst].attrs.duid,
1561                            setid)
1562                         < 0) {
1563                 LM_ERR("cannot add load to %d (%.*s)\n", setid, msg->callid->body.len,
1564                                 msg->callid->body.s);
1565                 return -1;
1566         }
1567         DS_LOAD_INC(dset, dst);
1568         return 0;
1569 }
1570
1571 /**
1572  *
1573  */
1574 int ds_load_replace(struct sip_msg *msg, str *duid)
1575 {
1576         ds_cell_t *it;
1577         int set;
1578         int olddst;
1579         int newdst;
1580         ds_set_t *idx = NULL;
1581         int i;
1582
1583         if(duid->len <= 0) {
1584                 LM_ERR("invalid dst unique id not set for (%.*s)\n",
1585                                 msg->callid->body.len, msg->callid->body.s);
1586                 return -1;
1587         }
1588
1589         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1590                 LM_ERR("cannot find load for (%.*s)\n", msg->callid->body.len,
1591                                 msg->callid->body.s);
1592                 return -1;
1593         }
1594         set = it->dset;
1595         /* get the index of the set */
1596         if(ds_get_index(set, *crt_idx, &idx) != 0) {
1597                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1598                 LM_ERR("destination set [%d] not found\n", set);
1599                 return -1;
1600         }
1601         olddst = -1;
1602         newdst = -1;
1603         for(i = 0; i < idx->nr; i++) {
1604                 if(idx->dlist[i].attrs.duid.len == it->duid.len
1605                                 && strncasecmp(
1606                                                    idx->dlist[i].attrs.duid.s, it->duid.s, it->duid.len)
1607                                                    == 0) {
1608                         olddst = i;
1609                         if(newdst != -1)
1610                                 break;
1611                 }
1612                 if(idx->dlist[i].attrs.duid.len == duid->len
1613                                 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s, duid->len)
1614                                                    == 0) {
1615                         newdst = i;
1616                         if(olddst != -1)
1617                                 break;
1618                 }
1619         }
1620         if(olddst == -1) {
1621                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1622                 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
1623                                 it->duid.len, it->duid.s);
1624                 return -1;
1625         }
1626         if(newdst == -1) {
1627                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1628                 LM_ERR("new destination address not found for [%d, %.*s]\n", set,
1629                                 duid->len, duid->s);
1630                 return -1;
1631         }
1632
1633         ds_unlock_cell(_dsht_load, &msg->callid->body);
1634         ds_del_cell(_dsht_load, &msg->callid->body);
1635         DS_LOAD_DEC(idx, olddst);
1636
1637         if(ds_load_add(msg, idx, set, newdst) < 0) {
1638                 LM_ERR("unable to replace destination load [%.*s / %.*s]\n", duid->len,
1639                                 duid->s, msg->callid->body.len, msg->callid->body.s);
1640                 return -1;
1641         }
1642         return 0;
1643 }
1644
1645 /**
1646  *
1647  */
1648 int ds_load_remove_byid(int set, str *duid)
1649 {
1650         int olddst;
1651         ds_set_t *idx = NULL;
1652         int i;
1653
1654         /* get the index of the set */
1655         if(ds_get_index(set, *crt_idx, &idx) != 0) {
1656                 LM_ERR("destination set [%d] not found\n", set);
1657                 return -1;
1658         }
1659         olddst = -1;
1660         for(i = 0; i < idx->nr; i++) {
1661                 if(idx->dlist[i].attrs.duid.len == duid->len
1662                                 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s, duid->len)
1663                                                    == 0) {
1664                         olddst = i;
1665                         break;
1666                 }
1667         }
1668         if(olddst == -1) {
1669                 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
1670                                 duid->len, duid->s);
1671                 return -1;
1672         }
1673
1674         DS_LOAD_DEC(idx, olddst);
1675
1676         return 0;
1677 }
1678
1679 /**
1680  *
1681  */
1682 int ds_load_remove(struct sip_msg *msg)
1683 {
1684         ds_cell_t *it;
1685
1686         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1687                 LM_ERR("cannot find load for (%.*s)\n", msg->callid->body.len,
1688                                 msg->callid->body.s);
1689                 return -1;
1690         }
1691
1692         if (ds_load_remove_byid(it->dset, &it->duid) < 0) {
1693                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1694                 return -1;
1695         }
1696         ds_unlock_cell(_dsht_load, &msg->callid->body);
1697         ds_del_cell(_dsht_load, &msg->callid->body);
1698
1699         return 0;
1700 }
1701
1702 /**
1703  *
1704  */
1705 int ds_load_state(struct sip_msg *msg, int state)
1706 {
1707         ds_cell_t *it;
1708
1709         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1710                 LM_DBG("cannot find load for (%.*s)\n", msg->callid->body.len,
1711                                 msg->callid->body.s);
1712                 return -1;
1713         }
1714
1715         it->state = state;
1716         ds_unlock_cell(_dsht_load, &msg->callid->body);
1717
1718         return 0;
1719 }
1720
1721
1722 /**
1723  *
1724  */
1725 int ds_load_update(struct sip_msg *msg)
1726 {
1727         if(parse_headers(msg, HDR_CSEQ_F | HDR_CALLID_F, 0) != 0
1728                         || msg->cseq == NULL || msg->callid == NULL) {
1729                 LM_ERR("cannot parse cseq and callid headers\n");
1730                 return -1;
1731         }
1732         if(msg->first_line.type == SIP_REQUEST) {
1733                 if(msg->first_line.u.request.method_value == METHOD_BYE
1734                                 || msg->first_line.u.request.method_value == METHOD_CANCEL) {
1735                         /* off-load call */
1736                         ds_load_remove(msg);
1737                 }
1738                 return 0;
1739         }
1740
1741         if(get_cseq(msg)->method_id == METHOD_INVITE) {
1742                 /* if status is 2xx then set state to confirmed */
1743                 if(REPLY_CLASS(msg) == 2)
1744                         ds_load_state(msg, DS_LOAD_CONFIRMED);
1745         }
1746         return 0;
1747 }
1748
1749 /**
1750  *
1751  */
1752 int ds_load_unset(struct sip_msg *msg)
1753 {
1754         sr_xavp_t *rxavp = NULL;
1755
1756         if(ds_xavp_dst.len <= 0)
1757                 return 0;
1758
1759         /* for INVITE requests should be called after dst list is built */
1760         if(msg->first_line.type == SIP_REQUEST
1761                         && msg->first_line.u.request.method_value == METHOD_INVITE) {
1762                 rxavp = xavp_get_child_with_sval(&ds_xavp_dst, &ds_xavp_dst_dstid);
1763                 if(rxavp == NULL)
1764                         return 0;
1765         }
1766         return ds_load_remove(msg);
1767 }
1768
1769 /**
1770  *
1771  */
1772 static inline int ds_push_dst(sip_msg_t *msg, str *uri, socket_info_t *sock,
1773                 int mode)
1774 {
1775         struct action act;
1776         struct run_act_ctx ra_ctx;
1777         switch(mode) {
1778                 case DS_SETOP_RURI:
1779                         memset(&act, '\0', sizeof(act));
1780                         act.type = SET_HOSTALL_T;
1781                         act.val[0].type = STRING_ST;
1782                         if(uri->len > 4 && strncasecmp(uri->s, "sip:", 4) == 0) {
1783                                 act.val[0].u.string = uri->s + 4;
1784                         } else if(uri->len > 5 && strncasecmp(uri->s, "sips:", 5) == 0) {
1785                                 act.val[0].u.string = uri->s + 5;
1786                         } else {
1787                                 act.val[0].u.string = uri->s;
1788                         }
1789                         init_run_actions_ctx(&ra_ctx);
1790                         if(do_action(&ra_ctx, &act, msg) < 0) {
1791                                 LM_ERR("error while setting r-uri domain with: %.*s\n",
1792                                                 uri->len, uri->s);
1793                                 return -1;
1794                         }
1795                         break;
1796
1797                 case DS_SETOP_XAVP:
1798                         /* no update to d-uri/r-uri */
1799                         return 0;
1800
1801                 default:
1802                         if(set_dst_uri(msg, uri) < 0) {
1803                                 LM_ERR("error while setting dst uri with: %.*s\n",
1804                                                 uri->len, uri->s);
1805                                 return -1;
1806                         }
1807                         /* dst_uri changes, so it makes sense to re-use the current uri for
1808                          * forking */
1809                         ruri_mark_new(); /* re-use uri for serial forking */
1810                         break;
1811         }
1812         if(sock) {
1813                 msg->force_send_socket = sock;
1814         }
1815         return 0;
1816 }
1817
1818 /**
1819  *
1820  */
1821 int ds_add_branches(sip_msg_t *msg, ds_set_t *idx, unsigned int hash, int mode)
1822 {
1823         unsigned int i = 0;
1824         str ruri = STR_NULL;
1825         sip_uri_t *puri = NULL;
1826         char buri[MAX_URI_SIZE];
1827
1828         if(mode!=DS_SETOP_XAVP && hash+1>=idx->nr) {
1829                 /* nothing to add */
1830                 return 0;
1831         }
1832
1833         if(mode==DS_SETOP_RURI) {
1834                 /* ruri updates */
1835                 LM_DBG("adding branches with ruri\n");
1836                 if(parse_sip_msg_uri(msg)<0) {
1837                         LM_ERR("failed to parse sip msg uri\n");
1838                         return -1;
1839                 }
1840                 puri = &msg->parsed_uri;
1841         } else {
1842                 /* duri updates */
1843                 LM_DBG("adding branches with duri\n");
1844         }
1845         if(mode!=DS_SETOP_XAVP) {
1846                 i = hash + 1;
1847         } else {
1848                 i = hash;
1849         }
1850         for(; i<idx->nr; i++) {
1851                 if(mode==DS_SETOP_RURI) {
1852                         /* ruri updates */
1853                         if(puri->user.len<=0) {
1854                                 /* no username to preserve */
1855                                 if(append_branch(msg, &idx->dlist[i].uri, NULL, NULL,
1856                                                 Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1857                                                 NULL, NULL)<0) {
1858                                         LM_ERR("failed to add branch with ruri\n");
1859                                         return -1;
1860                                 }
1861                         } else {
1862                                 /* new uri from ruri username and dispatcher uri */
1863                                 if(idx->dlist[i].uri.len<6) {
1864                                         LM_WARN("invalid dispatcher uri - skipping (%u)\n", i);
1865                                         continue;
1866                                 }
1867                                 if(strncmp(idx->dlist[i].uri.s, "sips:", 5)==0) {
1868                                         ruri.len = snprintf(buri, MAX_URI_SIZE, "sips:%.*s@%.*s",
1869                                                         puri->user.len, puri->user.s,
1870                                                         idx->dlist[i].uri.len-5, idx->dlist[i].uri.s+5);
1871                                 } else {
1872                                         if(strncmp(idx->dlist[i].uri.s, "sip:", 4)==0) {
1873                                                 ruri.len = snprintf(buri, MAX_URI_SIZE, "sip:%.*s@%.*s",
1874                                                                 puri->user.len, puri->user.s,
1875                                                                 idx->dlist[i].uri.len-4, idx->dlist[i].uri.s+4);
1876                                         } else {
1877                                                 LM_WARN("unsupported protocol schema - ignoring\n");
1878                                                 continue;
1879                                         }
1880                                 }
1881                                 ruri.s = buri;
1882                                 if(append_branch(msg, &ruri, NULL, NULL,
1883                                                 Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1884                                                 NULL, NULL)<0) {
1885                                         LM_ERR("failed to add branch with user ruri\n");
1886                                         return -1;
1887                                 }
1888                         }
1889                 } else {
1890                         /* duri updates */
1891                         if(append_branch(msg, GET_RURI(msg), &idx->dlist[i].uri, NULL,
1892                                         Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1893                                         NULL, NULL)<0) {
1894                                 LM_ERR("failed to add branch with duri\n");
1895                                 return -1;
1896                         }
1897                 }
1898
1899         }
1900         return 0;
1901 }
1902
1903 /**
1904  *
1905  */
1906 int ds_add_xavp_record(ds_set_t *dsidx, int pos, int set, int alg,
1907                 sr_xavp_t **pxavp)
1908 {
1909         sr_xavp_t *nxavp=NULL;
1910         sr_xval_t nxval;
1911
1912         /* add destination uri field */
1913         memset(&nxval, 0, sizeof(sr_xval_t));
1914         nxval.type = SR_XTYPE_STR;
1915         nxval.v.s = dsidx->dlist[pos].uri;
1916         if(xavp_add_value(&ds_xavp_dst_addr, &nxval, &nxavp)==NULL) {
1917                 LM_ERR("failed to add destination uri xavp field\n");
1918                 return -1;
1919         }
1920
1921         /* add setid field */
1922         memset(&nxval, 0, sizeof(sr_xval_t));
1923         nxval.type = SR_XTYPE_INT;
1924         nxval.v.i = set;
1925         if(xavp_add_value(&ds_xavp_dst_grp, &nxval, &nxavp)==NULL) {
1926                 xavp_destroy_list(&nxavp);
1927                 LM_ERR("failed to add destination setid xavp field\n");
1928                 return -1;
1929         }
1930
1931         if(((ds_xavp_dst_mode & DS_XAVP_DST_SKIP_ATTRS) == 0)
1932                         && (dsidx->dlist[pos].attrs.body.len > 0)) {
1933                 memset(&nxval, 0, sizeof(sr_xval_t));
1934                 nxval.type = SR_XTYPE_STR;
1935                 nxval.v.s = dsidx->dlist[pos].attrs.body;
1936                 if(xavp_add_value(&ds_xavp_dst_attrs, &nxval, &nxavp)==NULL) {
1937                         xavp_destroy_list(&nxavp);
1938                         LM_ERR("failed to add destination attrs xavp field\n");
1939                         return -1;
1940                 }
1941         }
1942
1943         if(dsidx->dlist[pos].sock) {
1944                 memset(&nxval, 0, sizeof(sr_xval_t));
1945                 nxval.type = SR_XTYPE_VPTR;
1946                 nxval.v.vptr = dsidx->dlist[pos].sock;
1947                 if(xavp_add_value(&ds_xavp_dst_sock, &nxval, &nxavp)==NULL) {
1948                         xavp_destroy_list(&nxavp);
1949                         LM_ERR("failed to add destination sock xavp field\n");
1950                         return -1;
1951                 }
1952                 if((ds_xavp_dst_mode & DS_XAVP_DST_ADD_SOCKSTR)
1953                                 && (dsidx->dlist[pos].attrs.socket.len > 0)) {
1954                         memset(&nxval, 0, sizeof(sr_xval_t));
1955                         nxval.type = SR_XTYPE_STR;
1956                         nxval.v.s = dsidx->dlist[pos].attrs.socket;
1957                         if(xavp_add_value(&ds_xavp_dst_socket, &nxval, &nxavp)==NULL) {
1958                                 xavp_destroy_list(&nxavp);
1959                                 LM_ERR("failed to add socket address attrs xavp field\n");
1960                                 return -1;
1961                         }
1962                 }
1963                 if((ds_xavp_dst_mode & DS_XAVP_DST_ADD_SOCKNAME)
1964                                 && (dsidx->dlist[pos].attrs.sockname.len > 0)) {
1965                         memset(&nxval, 0, sizeof(sr_xval_t));
1966                         nxval.type = SR_XTYPE_STR;
1967                         nxval.v.s = dsidx->dlist[pos].attrs.sockname;
1968                         if(xavp_add_value(&ds_xavp_dst_sockname, &nxval, &nxavp)==NULL) {
1969                                 xavp_destroy_list(&nxavp);
1970                                 LM_ERR("failed to add socket name attrs xavp field\n");
1971                                 return -1;
1972                         }
1973                 }
1974         }
1975
1976         if(alg == DS_ALG_CALLLOAD) {
1977                 if(dsidx->dlist[pos].attrs.duid.len <= 0) {
1978                         LM_ERR("no uid for destination: %d %.*s\n", set,
1979                                         dsidx->dlist[pos].uri.len,
1980                                         dsidx->dlist[pos].uri.s);
1981                         xavp_destroy_list(&nxavp);
1982                         return -1;
1983                 }
1984                 memset(&nxval, 0, sizeof(sr_xval_t));
1985                 nxval.type = SR_XTYPE_STR;
1986                 nxval.v.s = dsidx->dlist[pos].attrs.duid;
1987                 if(xavp_add_value(&ds_xavp_dst_dstid, &nxval, &nxavp)==NULL) {
1988                         xavp_destroy_list(&nxavp);
1989                         LM_ERR("failed to add destination dst uid xavp field\n");
1990                         return -1;
1991                 }
1992         }
1993
1994         /* add xavp in root list */
1995         memset(&nxval, 0, sizeof(sr_xval_t));
1996         nxval.type = SR_XTYPE_XAVP;
1997         nxval.v.xavp = nxavp;
1998         if((*pxavp = xavp_add_value_after(&ds_xavp_dst, &nxval, *pxavp))==NULL) {
1999                 LM_ERR("cannot add dst xavp to root list\n");
2000                 xavp_destroy_list(&nxavp);
2001                 return -1;
2002         }
2003
2004         return 0;
2005 }
2006
/**
 * Select a destination from group 'set' with algorithm 'alg', without an
 * explicit limit - it calls ds_select_dst_limit() with limit 0.
 */
int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode)
{
	const uint32_t no_limit = 0;

	return ds_select_dst_limit(msg, set, alg, no_limit, mode);
}
2014
2015 /**
2016  * Set destination address from group 'set' selected with alogorithm 'alg'
2017  * - the rest of addresses in group are added as next destination in xavps,
2018  *   up to the 'limit'
2019  * - mode specify to set address in R-URI or outboud proxy
2020  *
2021  */
2022 int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
2023                 int mode)
2024 {
2025         int ret;
2026         sr_xval_t nxval;
2027         ds_select_state_t vstate;
2028
2029         memset(&vstate, 0, sizeof(ds_select_state_t));
2030         vstate.setid = set;
2031         vstate.alg = alg;
2032         vstate.umode = mode;
2033         vstate.limit = limit;
2034
2035         if(vstate.limit == 0) {
2036                 LM_DBG("Limit set to 0 - forcing to unlimited\n");
2037                 vstate.limit = 0xffffffff;
2038         }
2039
2040         ret = ds_manage_routes(msg, &vstate);
2041         if(ret<0) {
2042                 return ret;
2043         }
2044
2045         /* add cnt value to xavp */
2046         if(((ds_xavp_ctx_mode & DS_XAVP_CTX_SKIP_CNT)==0)
2047                         && (ds_xavp_ctx.len >= 0)) {
2048                 /* add to xavp the number of selected dst records */
2049                 memset(&nxval, 0, sizeof(sr_xval_t));
2050                 nxval.type = SR_XTYPE_INT;
2051                 nxval.v.i = vstate.cnt;
2052                 if(xavp_add_xavp_value(&ds_xavp_ctx, &ds_xavp_ctx_cnt, &nxval, NULL)==NULL) {
2053                         LM_ERR("failed to add cnt value to xavp\n");
2054                         return -1;
2055                 }
2056         }
2057
2058         LM_DBG("selected target destinations: %d\n", vstate.cnt);
2059
2060         return ret;
2061 }
2062
2063 /**
2064  *
2065  */
2066 int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2067 {
2068         int i;
2069         unsigned int hash;
2070         ds_set_t *idx = NULL;
2071         int ulast = 0;
2072
2073         if(msg == NULL) {
2074                 LM_ERR("bad parameters\n");
2075                 return -1;
2076         }
2077
2078         if(_ds_list == NULL || _ds_list_nr <= 0) {
2079                 LM_ERR("no destination sets\n");
2080                 return -1;
2081         }
2082
2083         if((rstate->umode == DS_SETOP_DSTURI) && (ds_force_dst == 0)
2084                         && (msg->dst_uri.s != NULL || msg->dst_uri.len > 0)) {
2085                 LM_ERR("destination already set [%.*s]\n", msg->dst_uri.len,
2086                                 msg->dst_uri.s);
2087                 return -1;
2088         }
2089
2090
2091         /* get the index of the set */
2092         if(ds_get_index(rstate->setid, *crt_idx, &idx) != 0) {
2093                 LM_ERR("destination set [%d] not found\n", rstate->setid);
2094                 return -1;
2095         }
2096
2097         LM_DBG("set [%d]\n", rstate->setid);
2098
2099         hash = 0;
2100         switch(rstate->alg) {
2101                 case DS_ALG_HASHCALLID: /* 0 - hash call-id */
2102                         if(ds_hash_callid(msg, &hash) != 0) {
2103                                 LM_ERR("can't get callid hash\n");
2104                                 return -1;
2105                         }
2106                         break;
2107                 case DS_ALG_HASHFROMURI: /* 1 - hash from-uri */
2108                         if(ds_hash_fromuri(msg, &hash) != 0) {
2109                                 LM_ERR("can't get From uri hash\n");
2110                                 return -1;
2111                         }
2112                         break;
2113                 case DS_ALG_HASHTOURI: /* 2 - hash to-uri */
2114                         if(ds_hash_touri(msg, &hash) != 0) {
2115                                 LM_ERR("can't get To uri hash\n");
2116                                 return -1;
2117                         }
2118                         break;
2119                 case DS_ALG_HASHRURI: /* 3 - hash r-uri */
2120                         if(ds_hash_ruri(msg, &hash) != 0) {
2121                                 LM_ERR("can't get ruri hash\n");
2122                                 return -1;
2123                         }
2124                         break;
2125                 case DS_ALG_ROUNDROBIN: /* 4 - round robin */
2126                         hash = idx->last;
2127                         idx->last = (idx->last + 1) % idx->nr;
2128                         ulast = 1;
2129                         break;
2130                 case DS_ALG_HASHAUTHUSER: /* 5 - hash auth username */
2131                         i = ds_hash_authusername(msg, &hash);
2132                         switch(i) {
2133                                 case 0:
2134                                         /* Authorization-Header found: Nothing to be done here */
2135                                         break;
2136                                 case 1:
2137                                         /* No Authorization found: Use round robin */
2138                                         hash = idx->last;
2139                                         idx->last = (idx->last + 1) % idx->nr;
2140                                         ulast = 1;
2141                                         break;
2142                                 default:
2143                                         LM_ERR("can't get authorization hash\n");
2144                                         return -1;
2145                         }
2146                         break;
2147                 case DS_ALG_RANDOM: /* 6 - random selection */
2148                         hash = kam_rand();
2149                         break;
2150                 case DS_ALG_HASHPV: /* 7 - hash on PV value */
2151                         if(ds_hash_pvar(msg, &hash) != 0) {
2152                                 LM_ERR("can't get PV hash\n");
2153                                 return -1;
2154                         }
2155                         break;
2156                 case DS_ALG_SERIAL: /* 8 - use always first entry */
2157                         hash = 0;
2158                         break;
2159                 case DS_ALG_WEIGHT: /* 9 - weight based distribution */
2160                         hash = idx->wlist[idx->wlast];
2161                         idx->wlast = (idx->wlast + 1) % 100;
2162                         break;
2163                 case DS_ALG_CALLLOAD: /* 10 - call load based distribution */
2164                         /* only INVITE can start a call */
2165                         if(msg->first_line.u.request.method_value != METHOD_INVITE) {
2166                                 /* use first entry */
2167                                 hash = 0;
2168                                 rstate->alg = 0;
2169                                 break;
2170                         }
2171                         if(ds_xavp_dst.len <= 0) {
2172                                 LM_ERR("no dst xavp for load distribution"
2173                                            " - using first entry...\n");
2174                                 hash = 0;
2175                                 rstate->alg = 0;
2176                         } else {
2177                                 i = ds_get_leastloaded(idx);
2178                                 if(i < 0) {
2179                                         /* no address selected */
2180                                         return -1;
2181                                 }
2182                                 hash = i;
2183                                 if(ds_load_add(msg, idx, rstate->setid, hash) < 0) {
2184                                         LM_ERR("unable to update destination load"
2185                                                    " - classic dispatching\n");
2186                                         rstate->alg = 0;
2187                                 }
2188                         }
2189                         break;
2190                 case DS_ALG_RELWEIGHT: /* 11 - relative weight based distribution */
2191                         hash = idx->rwlist[idx->rwlast];
2192                         idx->rwlast = (idx->rwlast + 1) % 100;
2193                         break;
2194                 case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
2195                         hash = 0;
2196                         break;
2197                 default:
2198                         LM_WARN("algo %d not implemented - using first entry...\n",
2199                                         rstate->alg);
2200                         hash = 0;
2201         }
2202
2203         LM_DBG("using alg [%d] hash [%u]\n", rstate->alg, hash);
2204
2205         if(ds_use_default != 0 && idx->nr != 1)
2206                 hash = hash % (idx->nr - 1);
2207         else
2208                 hash = hash % idx->nr;
2209         i = hash;
2210
2211         /* if selected address is inactive, find next active */
2212         while(ds_skip_dst(idx->dlist[i].flags)) {
2213                 if(ds_use_default != 0 && idx->nr != 1)
2214                         i = (i + 1) % (idx->nr - 1);
2215                 else
2216                         i = (i + 1) % idx->nr;
2217                 if(i == hash) {
2218                         /* back to start -- looks like no active dst */
2219                         if(ds_use_default != 0) {
2220                                 i = idx->nr - 1;
2221                                 if(ds_skip_dst(idx->dlist[i].flags))
2222                                         return -1;
2223                                 break;
2224                         } else {
2225                                 return -1;
2226                         }
2227                 }
2228         }
2229
2230         hash = i;
2231
2232         if(rstate->umode!=DS_SETOP_XAVP) {
2233                 if(ds_push_dst(msg, &idx->dlist[hash].uri, idx->dlist[hash].sock,
2234                                         rstate->umode) != 0) {
2235                         LM_ERR("cannot set next hop address with: %.*s\n",
2236                                         idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
2237                         return -1;
2238                 }
2239                 rstate->emode = 1;
2240         }
2241
2242         /* update last field for next select to point after the current active used */
2243         if(ulast) {
2244                 idx->last = (hash + 1) % idx->nr;
2245         }
2246
2247         LM_DBG("selected [%d-%d-%d/%d] <%.*s>\n", rstate->alg, rstate->setid,
2248                         rstate->umode, hash,
2249                         idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
2250
2251         if(rstate->alg == DS_ALG_PARALLEL) {
2252                 if(ds_add_branches(msg, idx, hash, rstate->umode)<0) {
2253                         LM_ERR("failed to add additional branches\n");
2254                         /* one destination was already set - return success anyhow */
2255                         return 2;
2256                 }
2257                 return 1;
2258         }
2259
2260         if(!(ds_flags & DS_FAILOVER_ON))
2261                 return 1;
2262
2263         if(ds_xavp_dst.len<=0) {
2264                 /* no xavp name to store the rest of the records */
2265                 return 1;
2266         }
2267
2268         LM_DBG("using first entry [%d/%d]\n", rstate->setid, hash);
2269         if(ds_add_xavp_record(idx, hash, rstate->setid, rstate->alg,
2270                                 &rstate->lxavp)<0) {
2271                 LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2272                                 hash, rstate->setid);
2273                 return -1;
2274         }
2275         rstate->cnt++;
2276
2277         /* add to xavp the destinations after the selected one */
2278         for(i = hash + 1; i < idx->nr && rstate->cnt < rstate->limit; i++) {
2279                 if(ds_skip_dst(idx->dlist[i].flags)
2280                                 || (ds_use_default != 0 && i == (idx->nr - 1))) {
2281                         continue;
2282                 }
2283                 /* max load exceeded per destination */
2284                 if(rstate->alg == DS_ALG_CALLLOAD
2285                                 && idx->dlist[i].attrs.maxload != 0
2286                                 && idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
2287                         continue;
2288                 }
2289                 LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
2290                 if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
2291                                         &rstate->lxavp)<0) {
2292                         LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2293                                         i, rstate->setid);
2294                         return -1;
2295                 }
2296                 rstate->cnt++;
2297         }
2298
2299         /* add to xavp the destinations before the selected one */
2300         for(i = 0; i < hash && rstate->cnt < rstate->limit; i++) {
2301                 if(ds_skip_dst(idx->dlist[i].flags)
2302                                 || (ds_use_default != 0 && i == (idx->nr - 1))) {
2303                         continue;
2304                 }
2305                 /* max load exceeded per destination */
2306                 if(rstate->alg == DS_ALG_CALLLOAD
2307                                 && idx->dlist[i].attrs.maxload != 0
2308                                 && idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
2309                         continue;
2310                 }
2311                 LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
2312                 if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
2313                                         &rstate->lxavp)<0) {
2314                         LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2315                                         i, rstate->setid);
2316                         return -1;
2317                 }
2318                 rstate->cnt++;
2319         }
2320
2321         /* add default dst to last position in XAVP list */
2322         if(ds_use_default != 0 && hash != idx->nr - 1
2323                                 && rstate->cnt < rstate->limit) {
2324                 LM_DBG("using default entry [%d/%d]\n", rstate->setid, idx->nr - 1);
2325                 if(ds_add_xavp_record(idx, idx->nr - 1, rstate->setid, rstate->alg,
2326                                         &rstate->lxavp)<0) {
2327                         LM_ERR("failed to add default destination in the xavp\n");
2328                         return -1;
2329                 }
2330                 rstate->cnt++;
2331         }
2332
2333         return 1;
2334 }
2335
2336 int ds_update_dst(struct sip_msg *msg, int upos, int mode)
2337 {
2338
2339         socket_info_t *sock = NULL;
2340         sr_xavp_t *rxavp = NULL;
2341         sr_xavp_t *lxavp = NULL;
2342
2343         LM_DBG("updating dst\n");
2344         if(upos == DS_USE_NEXT) {
2345                 if(!(ds_flags & DS_FAILOVER_ON) || ds_xavp_dst.len <= 0) {
2346                         LM_WARN("failover support disabled\n");
2347                         return -1;
2348                 }
2349         }
2350
2351         rxavp = xavp_get(&ds_xavp_dst, NULL);
2352         if(rxavp == NULL || rxavp->val.type != SR_XTYPE_XAVP) {
2353                 LM_DBG("no xavp with previous destination record\n");
2354                 return -1;
2355         }
2356
2357         if(upos == DS_USE_NEXT) {
2358                 LM_DBG("updating dst with next record\n");
2359                 /* use next destination - delete the current one and search the next */
2360                 xavp_rm(rxavp, NULL);
2361
2362                 rxavp = xavp_get(&ds_xavp_dst, NULL);
2363                 if(rxavp == NULL || rxavp->val.type != SR_XTYPE_XAVP) {
2364                         LM_DBG("no xavp with next destination record\n");
2365                         return -1;
2366                 }
2367         }
2368
2369         /* retrieve attributes from sub list */
2370         rxavp = rxavp->val.v.xavp;
2371         lxavp = xavp_get(&ds_xavp_dst_sock, rxavp);
2372         if(lxavp!=NULL && lxavp->val.type==SR_XTYPE_VPTR) {
2373                 LM_DBG("socket enforced in next destination record\n");
2374                 sock = lxavp->val.v.vptr;
2375         }
2376
2377         lxavp = xavp_get(&ds_xavp_dst_addr, rxavp);
2378         if(lxavp==NULL || lxavp->val.type!=SR_XTYPE_STR) {
2379                 LM_WARN("no xavp uri field in next destination record (%p)\n", lxavp);
2380                 return -1;
2381         }
2382
2383         if(ds_push_dst(msg, &lxavp->val.v.s, sock, mode) != 0) {
2384                 LM_ERR("cannot set dst addr: %.*s\n", lxavp->val.v.s.len,
2385                                 lxavp->val.v.s.s);
2386                 return -1;
2387         }
2388         LM_DBG("using next dst uri [%.*s]\n", lxavp->val.v.s.len,
2389                                 lxavp->val.v.s.s);
2390
2391         /* call load update if dstid field is set */
2392         lxavp = xavp_get(&ds_xavp_dst_dstid, rxavp);
2393         if(lxavp==NULL || lxavp->val.type!=SR_XTYPE_STR) {
2394                 /* no dstid field - done */
2395                 return 1;
2396         }
2397         if(upos == DS_USE_NEXT) {
2398                 if(ds_load_replace(msg, &lxavp->val.v.s) < 0) {
2399                         LM_ERR("cannot update load distribution\n");
2400                         return -1;
2401                 }
2402         }
2403
2404         return 1;
2405 }
2406
2407 /* callback for adding nodes based on index */
2408 void ds_add_dest_cb(ds_set_t *node, int i, void *arg)
2409 {
2410         int setn;
2411
2412         if(add_dest2list(node->id, node->dlist[i].uri, node->dlist[i].flags,
2413                         node->dlist[i].priority, &node->dlist[i].attrs.body, *next_idx,
2414                         &setn) != 0) {
2415                 LM_WARN("failed to add destination in group %d - %.*s\n",
2416                                 node->id, node->dlist[i].uri.len, node->dlist[i].uri.s);
2417         }
2418         return;
2419 }
2420
2421 /* add dispatcher entry to in-memory dispatcher list */
2422 int ds_add_dst(int group, str *address, int flags)
2423 {
2424         int setn, priority;
2425         str attrs;
2426
2427         setn = _ds_list_nr;
2428         priority = 0;
2429         attrs.s = 0;
2430         attrs.len = 0;
2431
2432         *next_idx = (*crt_idx + 1) % 2;
2433         ds_avl_destroy(&ds_lists[*next_idx]);
2434
2435         // add all existing destinations
2436         ds_iter_set(_ds_list, &ds_add_dest_cb, NULL);
2437
2438         // add new destination
2439         if(add_dest2list(group, *address, flags, priority, &attrs,
2440                         *next_idx, &setn) != 0) {
2441                 LM_WARN("unable to add destination %.*s to set %d", address->len, address->s, group);
2442                 if(ds_load_mode==1) {
2443                         goto error;
2444                 }
2445         }
2446
2447         if(reindex_dests(ds_lists[*next_idx]) != 0) {
2448                 LM_ERR("error on reindex\n");
2449                 goto error;
2450         }
2451
2452         _ds_list_nr = setn;
2453         *crt_idx = *next_idx;
2454         ds_ht_clear_slots(_dsht_load);
2455         ds_log_sets();
2456         return 0;
2457
2458 error:
2459         ds_avl_destroy(&ds_lists[*next_idx]);
2460         *next_idx = *crt_idx;
2461         return -1;
2462 }
2463
2464 /* callback for removing nodes based on setid & address */
2465 void ds_filter_dest_cb(ds_set_t *node, int i, void *arg)
2466 {
2467         struct ds_filter_dest_cb_arg *filter_arg = (typeof(filter_arg)) arg;
2468
2469         if(node->id == filter_arg->setid && node->dlist[i].uri.len == filter_arg->dest->uri.len &&
2470                 strncmp(node->dlist[i].uri.s, filter_arg->dest->uri.s, filter_arg->dest->uri.len) == 0)
2471                 return;
2472
2473         if(add_dest2list(node->id, node->dlist[i].uri, node->dlist[i].flags,
2474                         node->dlist[i].priority, &node->dlist[i].attrs.body, *next_idx,
2475                         filter_arg->setn) != 0) {
2476                 LM_WARN("failed to add destination in group %d - %.*s\n",
2477                                 node->id, node->dlist[i].uri.len, node->dlist[i].uri.s);
2478         }
2479         return;
2480 }
2481
2482 /* remove dispatcher entry from in-memory dispatcher list */
2483 int ds_remove_dst(int group, str *address)
2484 {
2485         int setn;
2486         struct ds_filter_dest_cb_arg filter_arg;
2487         ds_dest_t *dp = NULL;
2488
2489         setn = 0;
2490
2491         dp = pack_dest(*address, 0, 0, NULL);
2492         filter_arg.setid = group;
2493         filter_arg.dest = dp;
2494         filter_arg.setn = &setn;
2495
2496         *next_idx = (*crt_idx + 1) % 2;
2497         ds_avl_destroy(&ds_lists[*next_idx]);
2498
2499         // add existing destinations except destination that matches group & address
2500         ds_iter_set(_ds_list, &ds_filter_dest_cb, &filter_arg);
2501
2502         if(reindex_dests(ds_lists[*next_idx]) != 0) {
2503                 LM_ERR("error on reindex\n");
2504                 goto error;
2505         }
2506
2507         _ds_list_nr = setn;
2508         *crt_idx = *next_idx;
2509         ds_ht_clear_slots(_dsht_load);
2510         ds_log_sets();
2511         return 0;
2512
2513 error:
2514         ds_avl_destroy(&ds_lists[*next_idx]);
2515         *next_idx = *crt_idx;
2516         return -1;
2517 }
2518
2519 int ds_mark_dst(struct sip_msg *msg, int state)
2520 {
2521         sr_xavp_t *rxavp = NULL;
2522         int group;
2523         int ret;
2524         ds_rctx_t rctx;
2525
2526         if(!(ds_flags & DS_FAILOVER_ON)) {
2527                 LM_WARN("failover support disabled\n");
2528                 return -1;
2529         }
2530
2531         if(ds_xavp_dst.len<=0) {
2532                 LM_WARN("no xavp name to store dst records\n");
2533                 return -1;
2534         }
2535         rxavp = xavp_get_child_with_ival(&ds_xavp_dst, &ds_xavp_dst_grp);
2536
2537         if(rxavp == NULL)
2538                 return -1; /* grp xavp not available */
2539         group = rxavp->val.v.i;
2540
2541         rxavp = xavp_get_child_with_sval(&ds_xavp_dst, &ds_xavp_dst_addr);
2542
2543         if(rxavp == NULL )
2544                 return -1; /* dst addr uri not available */
2545
2546         memset(&rctx, 0, sizeof(ds_rctx_t));
2547         if(msg!=NULL) {
2548                 if(msg!=FAKED_REPLY) {
2549                         if(msg->first_line.type == SIP_REPLY) {
2550                                 rctx.flags |= 1;
2551                                 rctx.code = (int)msg->first_line.u.reply.statuscode;
2552                                 rctx.reason = msg->first_line.u.reply.reason;
2553                         } else {
2554                                 rctx.code = 820;
2555                         }
2556                 } else {
2557                         rctx.code = 810;
2558                 }
2559         } else {
2560                 rctx.code = 800;
2561         }
2562         ret = ds_update_state(msg, group, &rxavp->val.v.s, state, &rctx);
2563
2564         LM_DBG("state [%d] grp [%d] dst [%.*s]\n", state, group, rxavp->val.v.s.len,
2565                         rxavp->val.v.s.s);
2566
2567         return (ret == 0) ? 1 : -1;
2568 }
2569
2570 static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int latency) {
2571         /* after 2^21 ~24 days at 1s interval, the average becomes a weighted average */
2572         if (latency_stats->count < 2097152) {
2573                 latency_stats->count++;
2574         } else { /* We adjust the sum of squares used by the oneline algorithm proportionally */
2575                 latency_stats->m2 -= latency_stats->m2/latency_stats->count;
2576         }
2577         if (latency_stats->count == 1) {
2578                 latency_stats->stdev = 0.0f;
2579                 latency_stats->m2 = 0.0f;
2580                 latency_stats->max = latency;
2581                 latency_stats->min = latency;
2582                 latency_stats->average = latency;
2583                 latency_stats->estimate = latency;
2584         }
2585         /* train the average if stable after 10 samples */
2586         if (latency_stats->count > 10 && latency_stats->stdev < 0.5) latency_stats->count = 500000;
2587         if (latency_stats->min > latency)
2588                 latency_stats->min = latency;
2589         if (latency_stats->max < latency)
2590                 latency_stats->max = latency;
2591
2592         /* standard deviation using oneline algorithm */
2593         /* https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm */
2594         if (latency_stats->count > 1) {
2595                 float delta = latency - latency_stats->average;
2596                 latency_stats->average += delta/latency_stats->count;
2597                 float delta2 = latency - latency_stats->average;
2598                 latency_stats->m2 += delta*delta2;
2599                 latency_stats->stdev = sqrt(latency_stats->m2 / (latency_stats->count-1));
2600         }
2601         /* exponentialy weighted moving average */
2602         if (latency_stats->count < 10) {
2603                 latency_stats->estimate = latency_stats->average;
2604         } else {
2605                 latency_stats->estimate = latency_stats->estimate*ds_latency_estimator_alpha
2606                                           + latency*(1-ds_latency_estimator_alpha);
2607         }
2608 }
2609
2610 int ds_update_latency(int group, str *address, int code)
2611 {
2612         int i = 0;
2613         int state = 0;
2614         ds_set_t *idx = NULL;
2615
2616         if(_ds_list == NULL || _ds_list_nr <= 0) {
2617                 LM_ERR("the list is null\n");
2618                 return -1;
2619         }
2620
2621         /* get the index of the set */
2622         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2623                 LM_ERR("destination set [%d] not found\n", group);
2624                 return -1;
2625         }
2626         int apply_rweights = 0;
2627         int all_gw_congested = 1;
2628         int total_congestion_ms = 0;
2629         lock_get(&idx->lock);
2630         while (i < idx->nr) {
2631                 ds_dest_t *ds_dest = &idx->dlist[i];
2632                 ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2633                 if (ds_dest->uri.len == address->len
2634                                 && strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) {
2635                         /* Destination address found, this is the gateway that was pinged. */
2636                         state = ds_dest->flags;
2637                         if (code == 408 && latency_stats->timeout < UINT32_MAX)
2638                                 latency_stats->timeout++;
2639                         struct timeval now;
2640                         gettimeofday(&now, NULL);
2641                         int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2642                             + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2643                         latency_stats_update(latency_stats, latency_ms);
2644
2645                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2646                         if (congestion_ms < 0) congestion_ms = 0;
2647                         total_congestion_ms += congestion_ms;
2648
2649                         /* Adjusting weight using congestion detection based on latency estimator. */
2650                         if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) {
2651                                 int active_weight = ds_dest->attrs.weight - congestion_ms;
2652                                 if (active_weight <= 0) {
2653                                         active_weight = 0;
2654                                 } else {
2655                                         all_gw_congested = 0;
2656                                 }
2657                                 if (ds_dest->attrs.rweight != active_weight) {
2658                                         apply_rweights = 1;
2659                                         ds_dest->attrs.rweight = active_weight;
2660                                 }
2661                                 LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n",
2662                                         latency_stats->count, latency_ms,
2663                                         latency_stats->average, address->len, address->s,
2664                                         code, ds_dest->attrs.rweight, congestion_ms);
2665                         }
2666                 } else {
2667                         /* Another gateway in the set, we verify if it is congested. */
2668                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2669                         if (congestion_ms < 0) congestion_ms = 0;
2670                         total_congestion_ms += congestion_ms;
2671                         int active_weight = ds_dest->attrs.weight - congestion_ms;
2672                         if (active_weight > 0) all_gw_congested = 0;
2673                 }
2674                 if (!ds_dest->attrs.congestion_control) all_gw_congested = 0;
2675                 i++;
2676         }
2677         /* All the GWs are above their congestion threshold, load distribution will now be based on
2678          * the ratio of congestion_ms each GW is facing. */
2679         if (all_gw_congested) {
2680                 i = 0;
2681                 while (i < idx->nr) {
2682                         ds_dest_t *ds_dest = &idx->dlist[i];
2683                         ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2684                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2685                         /* We multiply by 2^4 to keep enough precision */
2686                         int active_weight = (total_congestion_ms << 4) / congestion_ms;
2687                         if (ds_dest->attrs.rweight != active_weight) {
2688                                 apply_rweights = 1;
2689                                 ds_dest->attrs.rweight = active_weight;
2690                         }
2691                         LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n",
2692                                         total_congestion_ms, latency_stats->count, latency_stats->average,
2693                                         address->len, address->s, code, total_congestion_ms, congestion_ms,
2694                                         ds_dest->attrs.rweight, congestion_ms);
2695                 i++;
2696                 }
2697         }
2698
2699         lock_release(&idx->lock);
2700         if (apply_rweights) dp_init_relative_weights(idx);
2701         return state;
2702 }
2703
2704
2705 /**
2706  * Get state for given destination
2707  */
2708 int ds_get_state(int group, str *address)
2709 {
2710         int i = 0;
2711         ds_set_t *idx = NULL;
2712
2713         if(_ds_list == NULL || _ds_list_nr <= 0) {
2714                 LM_ERR("the list is null\n");
2715                 return -1;
2716         }
2717
2718         /* get the index of the set */
2719         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2720                 LM_ERR("destination set [%d] not found\n", group);
2721                 return -1;
2722         }
2723
2724         while(i < idx->nr) {
2725                 if(idx->dlist[i].uri.len == address->len
2726                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2727                                                    == 0) {
2728                         /* destination address found */
2729                         return idx->dlist[i].flags;
2730                 }
2731                 i++;
2732         }
2733         return 0;
2734 }
2735
2736 /**
2737  * Update destionation's state
2738  */
2739 int ds_update_state(sip_msg_t *msg, int group, str *address, int state,
2740                 ds_rctx_t *rctx)
2741 {
2742         int i = 0;
2743         int old_state = 0;
2744         int init_state = 0;
2745         ds_set_t *idx = NULL;
2746
2747         if(_ds_list == NULL || _ds_list_nr <= 0) {
2748                 LM_ERR("the list is null\n");
2749                 return -1;
2750         }
2751
2752         /* get the index of the set */
2753         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2754                 LM_ERR("destination set [%d] not found\n", group);
2755                 return -1;
2756         }
2757         LM_DBG("update state for %.*s in group %d to %d\n", address->len, address->s, group, state);
2758
2759         while(i < idx->nr) {
2760                 if(idx->dlist[i].uri.len == address->len
2761                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2762                                                    == 0) {
2763                         /* destination address found */
2764                         old_state = idx->dlist[i].flags;
2765
2766                         /* reset the bits used for states */
2767                         idx->dlist[i].flags &= ~(DS_STATES_ALL);
2768
2769                         /* we need the initial state for inactive counter */
2770                         init_state = state;
2771
2772                         if((state & DS_TRYING_DST) && (old_state & DS_INACTIVE_DST)) {
2773                                 /* old state is inactive, new state is trying => keep it inactive
2774                                  * - it has to go first to active state and then to trying */
2775                                 state &= ~(DS_TRYING_DST);
2776                                 state |= DS_INACTIVE_DST;
2777                         }
2778
2779                         /* set the new states */
2780                         if(state & DS_DISABLED_DST) {
2781                                 idx->dlist[i].flags |= DS_DISABLED_DST;
2782                         } else {
2783                                 idx->dlist[i].flags |= state;
2784                         }
2785
2786                         if(state & DS_TRYING_DST) {
2787                                 idx->dlist[i].message_count++;
2788                                 LM_DBG("destination did not replied %d times, threshold %d\n",
2789                                                  idx->dlist[i].message_count, probing_threshold);
2790                                 /* Destination is not replying.. Increasing failure counter */
2791                                 if(idx->dlist[i].message_count >= probing_threshold) {
2792                                         /* Destination has too much lost messages.. Bringing it to inactive state */
2793                                         idx->dlist[i].flags &= ~DS_TRYING_DST;
2794                                         idx->dlist[i].flags |= DS_INACTIVE_DST;
2795                                         idx->dlist[i].message_count = 0;
2796                                         LM_DBG("deactivate destination, threshold %d reached\n", probing_threshold);
2797                                 }
2798                         } else {
2799                                 if(!(init_state & DS_TRYING_DST)
2800                                                 && (old_state & DS_INACTIVE_DST)) {
2801                                         idx->dlist[i].message_count++;
2802                                         /* Destination was inactive but it is just replying.. Increasing successful counter */
2803                                         if(idx->dlist[i].message_count < inactive_threshold) {
2804                                                 /* Destination has not enough successful replies.. Leaving it into inactive state */
2805                                                 idx->dlist[i].flags |= DS_INACTIVE_DST;
2806                                                 /* if destination was in probing state, we stay there for now */
2807                                                 if((old_state & DS_PROBING_DST) != 0) {
2808                                                         idx->dlist[i].flags |= DS_PROBING_DST;
2809                                                 }
2810                                                 LM_DBG("destination replied successful %d times, threshold %d\n",
2811                                                                  idx->dlist[i].message_count, inactive_threshold);
2812                                         } else {
2813                                                 /* Destination has enough replied messages.. Bringing it to active state */
2814                                                 idx->dlist[i].message_count = 0;
2815                                                 LM_DBG("activate destination, threshold %d reached\n", inactive_threshold);
2816                                         }
2817                                 } else {
2818                                         idx->dlist[i].message_count = 0;
2819                                 }
2820                         }
2821
2822                         if(!ds_skip_dst(old_state) && ds_skip_dst(idx->dlist[i].flags)) {
2823                                 ds_run_route(msg, address, "dispatcher:dst-down", rctx);
2824
2825                         } else {
2826                                 if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
2827                                         ds_run_route(msg, address, "dispatcher:dst-up", rctx);
2828                         }
2829                         if(idx->dlist[i].attrs.rweight > 0)
2830                                 ds_reinit_rweight_on_state_change(
2831                                                 old_state, idx->dlist[i].flags, idx);
2832
2833                         LM_DBG("old state was %d, set new state to %d\n", old_state, idx->dlist[i].flags);
2834                         return 0;
2835                 }
2836                 i++;
2837         }
2838
2839         return -1;
2840 }
2841
2842 /**
2843  *
2844  */
2845 static ds_rctx_t *_ds_rctx = NULL;
2846
2847 /**
2848  *
2849  */
2850 ds_rctx_t* ds_get_rctx(void)
2851 {
2852         return _ds_rctx;
2853 }
2854
2855 static void ds_run_route(sip_msg_t *msg, str *uri, char *route, ds_rctx_t *rctx)
2856 {
2857         int rt, backup_rt;
2858         struct run_act_ctx ctx;
2859         sip_msg_t *fmsg;
2860         sr_kemi_eng_t *keng = NULL;
2861         str evname;
2862
2863         if(route == NULL) {
2864                 LM_ERR("bad route\n");
2865                 return;
2866         }
2867
2868         LM_DBG("executing event_route[%s]\n", route);
2869
2870         rt = -1;
2871         if(ds_event_callback.s==NULL || ds_event_callback.len<=0) {
2872                 rt = route_lookup(&event_rt, route);
2873                 if(rt < 0 || event_rt.rlist[rt] == NULL) {
2874                         LM_DBG("route does not exist");
2875                         return;
2876                 }
2877         } else {
2878                 keng = sr_kemi_eng_get();
2879                 if(keng==NULL) {
2880                         LM_DBG("event callback (%s) set, but no cfg engine\n",
2881                                         ds_event_callback.s);
2882                         return;
2883                 }
2884         }
2885
2886         if(msg == NULL) {
2887                 if(faked_msg_init() < 0) {
2888                         LM_ERR("faked_msg_init() failed\n");
2889                         return;
2890                 }
2891                 fmsg = faked_msg_next();
2892                 fmsg->parsed_orig_ruri_ok = 0;
2893                 fmsg->new_uri = *uri;
2894         } else {
2895                 fmsg = msg;
2896         }
2897
2898         if(rt>=0 || ds_event_callback.len>0) {
2899                 _ds_rctx = rctx;
2900                 backup_rt = get_route_type();
2901                 set_route_type(REQUEST_ROUTE);
2902                 init_run_actions_ctx(&ctx);
2903                 if(rt>=0) {
2904                         run_top_route(event_rt.rlist[rt], fmsg, 0);
2905                 } else {
2906                         if(keng!=NULL) {
2907                                 evname.s = route;
2908                                 evname.len = strlen(evname.s);
2909                                 if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
2910                                                         &ds_event_callback, &evname)<0) {
2911                                         LM_ERR("error running event route kemi callback\n");
2912                                 }
2913                         }
2914                 }
2915                 set_route_type(backup_rt);
2916                 _ds_rctx = NULL;
2917         }
2918 }
2919
2920
2921 /**
2922  * recalculate relative states if some destination state was changed
2923  */
2924 int ds_reinit_rweight_on_state_change(
2925                 int old_state, int new_state, ds_set_t *dset)
2926 {
2927         if(dset == NULL) {
2928                 LM_ERR("destination set is null\n");
2929                 return -1;
2930         }
2931         if((!ds_skip_dst(old_state) && ds_skip_dst(new_state))
2932                         || (ds_skip_dst(old_state) && !ds_skip_dst(new_state))) {
2933                 dp_init_relative_weights(dset);
2934         }
2935
2936         return 0;
2937 }
2938
2939
2940 /**
2941  *
2942  */
2943 int ds_reinit_state(int group, str *address, int state)
2944 {
2945         int i = 0;
2946         ds_set_t *idx = NULL;
2947
2948         if(_ds_list == NULL || _ds_list_nr <= 0) {
2949                 LM_ERR("the list is null\n");
2950                 return -1;
2951         }
2952
2953         /* get the index of the set */
2954         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2955                 LM_ERR("destination set [%d] not found\n", group);
2956                 return -1;
2957         }
2958
2959         for(i = 0; i < idx->nr; i++) {
2960                 if(idx->dlist[i].uri.len == address->len
2961                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2962                                                    == 0) {
2963                         int old_state = idx->dlist[i].flags;
2964                         /* reset the bits used for states */
2965                         idx->dlist[i].flags &= ~(DS_STATES_ALL);
2966                         /* set the new states */
2967                         idx->dlist[i].flags |= state;
2968                         if(idx->dlist[i].attrs.rweight > 0) {
2969                                 ds_reinit_rweight_on_state_change(
2970                                                 old_state, idx->dlist[i].flags, idx);
2971                         }
2972
2973                         return 0;
2974                 }
2975         }
2976         LM_ERR("destination address [%d : %.*s] not found\n", group, address->len,
2977                         address->s);
2978         return -1;
2979 }
2980
2981 /**
2982  *
2983  */
2984 int ds_reinit_state_all(int group, int state)
2985 {
2986         int i = 0;
2987         ds_set_t *idx = NULL;
2988
2989         if(_ds_list == NULL || _ds_list_nr <= 0) {
2990                 LM_ERR("the list is null\n");
2991                 return -1;
2992         }
2993
2994         /* get the index of the set */
2995         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2996                 LM_ERR("destination set [%d] not found\n", group);
2997                 return -1;
2998         }
2999
3000         for(i = 0; i < idx->nr; i++) {
3001                 int old_state = idx->dlist[i].flags;
3002                 /* reset the bits used for states */
3003                 idx->dlist[i].flags &= ~(DS_STATES_ALL);
3004                 /* set the new states */
3005                 idx->dlist[i].flags |= state;
3006                 if(idx->dlist[i].attrs.rweight > 0) {
3007                         ds_reinit_rweight_on_state_change(
3008                                         old_state, idx->dlist[i].flags, idx);
3009                 }
3010         }
3011         return 0;
3012 }
3013
3014 /**
3015  *