589b54f5b8dbd32f27d9656d792bce44278b60a4
[sip-router] / src / modules / dispatcher / dispatch.c
1 /*
2  * dispatcher module
3  *
4  * Copyright (C) 2004-2006 FhG Fokus
5  * Copyright (C) 2005 Voice-System.ro
6  * Copyright (C) 2015 Daniel-Constantin Mierla (asipto.com)
7  *
8  * This file is part of Kamailio, a free SIP server.
9  *
10  * Kamailio is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version
14  *
15  * Kamailio is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23  */
24
25 /*! \file
26  * \ingroup dispatcher
27  * \brief Dispatcher :: Dispatch
28  */
29
30 #include <stdio.h>
31 #include <string.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <time.h>
35 #include <math.h>
36
37 #include "../../core/ut.h"
38 #include "../../core/trim.h"
39 #include "../../core/dprint.h"
40 #include "../../core/action.h"
41 #include "../../core/route.h"
42 #include "../../core/dset.h"
43 #include "../../core/mem/shm_mem.h"
44 #include "../../core/parser/parse_uri.h"
45 #include "../../core/parser/parse_from.h"
46 #include "../../core/parser/parse_param.h"
47 #include "../../core/xavp.h"
48 #include "../../core/parser/digest/digest.h"
49 #include "../../core/resolve.h"
50 #include "../../core/lvalue.h"
51 #include "../../modules/tm/tm_load.h"
52 #include "../../lib/srdb1/db.h"
53 #include "../../lib/srdb1/db_res.h"
54 #include "../../core/str.h"
55 #include "../../core/script_cb.h"
56 #include "../../core/kemi.h"
57 #include "../../core/fmsg.h"
58
59 #include "ds_ht.h"
60 #include "api.h"
61 #include "dispatch.h"
62
63 #define DS_TABLE_VERSION 1
64 #define DS_TABLE_VERSION2 2
65 #define DS_TABLE_VERSION3 3
66 #define DS_TABLE_VERSION4 4
67
68 #define DS_ALG_HASHCALLID 0
69 #define DS_ALG_HASHFROMURI 1
70 #define DS_ALG_HASHTOURI 2
71 #define DS_ALG_HASHRURI 3
72 #define DS_ALG_ROUNDROBIN 4
73 #define DS_ALG_HASHAUTHUSER 5
74 #define DS_ALG_RANDOM 6
75 #define DS_ALG_HASHPV 7
76 #define DS_ALG_SERIAL 8
77 #define DS_ALG_WEIGHT 9
78 #define DS_ALG_CALLLOAD 10
79 #define DS_ALG_RELWEIGHT 11
80 #define DS_ALG_PARALLEL 12
81
82 static int _ds_table_version = DS_TABLE_VERSION;
83
84 static ds_ht_t *_dsht_load = NULL;
85
86 static int *_ds_ping_active = NULL;
87
88 extern int ds_force_dst;
89 extern str ds_event_callback;
90 extern int ds_ping_latency_stats;
91 extern float ds_latency_estimator_alpha;
92 extern int ds_attrs_none;
93 extern param_t *ds_db_extra_attrs_list;
94 extern int ds_load_mode;
95
96 static db_func_t ds_dbf;
97 static db1_con_t *ds_db_handle = NULL;
98
99 ds_set_t **ds_lists = NULL;
100
101 int *ds_list_nr = NULL;
102 int *crt_idx = NULL;
103 int *next_idx = NULL;
104
105 #define _ds_list (ds_lists[*crt_idx])
106 #define _ds_list_nr (*ds_list_nr)
107
108 static void ds_run_route(struct sip_msg *msg, str *uri, char *route,
109                 ds_rctx_t *rctx);
110
111 void shuffle_uint100array(unsigned int *arr);
112 int ds_reinit_rweight_on_state_change(
113                 int old_state, int new_state, ds_set_t *dset);
114
115 /**
116  *
117  */
118 int ds_ping_active_init(void)
119 {
120         if(_ds_ping_active != NULL)
121                 return 0;
122         _ds_ping_active = (int *)shm_malloc(sizeof(int));
123         if(_ds_ping_active == NULL) {
124                 LM_ERR("no more shared memory\n");
125                 return -1;
126         }
127         *_ds_ping_active = 1;
128         return 0;
129 }
130
131 /**
132  *
133  */
134 int ds_ping_active_get(void)
135 {
136         if(_ds_ping_active == NULL)
137                 return -1;
138         return *_ds_ping_active;
139 }
140
141 /**
142  *
143  */
144 int ds_ping_active_set(int v)
145 {
146         if(_ds_ping_active == NULL)
147                 return -1;
148         *_ds_ping_active = v;
149         return 0;
150 }
151
152 /**
153  *
154  */
155 int ds_hash_load_init(unsigned int htsize, int expire, int initexpire)
156 {
157         if(_dsht_load != NULL)
158                 return 0;
159         _dsht_load = ds_ht_init(htsize, expire, initexpire);
160         if(_dsht_load == NULL)
161                 return -1;
162         return 0;
163 }
164
165 /**
166  *
167  */
168 int ds_hash_load_destroy(void)
169 {
170         if(_dsht_load == NULL)
171                 return -1;
172         ds_ht_destroy(_dsht_load);
173         _dsht_load = NULL;
174         return 0;
175 }
176
177 /**
178  * Recursivly iterate over ds_set and execute callback
179  */
180 void ds_iter_set(ds_set_t *node, void (*ds_action_cb)(ds_set_t *node, int i, void *arg),
181                 void *ds_action_arg)
182 {
183         if(!node)
184                 return;
185
186         int i;
187
188         for(i = 0; i < 2; ++i)
189                 ds_iter_set(node->next[i], ds_action_cb, ds_action_arg);
190
191         for(i = 0; i < node->nr; i++) {
192                 ds_action_cb(node, i, ds_action_arg);
193         }
194
195         return;
196 }
197
198 void ds_log_dst_cb(ds_set_t *node, int i, void *arg)
199 {
200         LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d,%d)\n", node->id,
201                 node->dlist[i].uri.len, node->dlist[i].uri.s,
202                 node->dlist[i].flags, node->dlist[i].priority,
203                 node->dlist[i].attrs.duid.len, node->dlist[i].attrs.duid.s,
204                 node->dlist[i].attrs.maxload, node->dlist[i].attrs.weight,
205                 node->dlist[i].attrs.rweight);
206 }
207
208 /**
209  * Recursivly print ds_set
210  */
211 void ds_log_set(ds_set_t *node)
212 {
213         ds_iter_set(node, &ds_log_dst_cb, NULL);
214
215         return;
216 }
217
218 /**
219  *
220  */
221 int ds_log_sets(void)
222 {
223         if(_ds_list == NULL)
224                 return -1;
225
226         ds_log_set(_ds_list);
227
228         return 0;
229 }
230
231 /**
232  *
233  */
234 int ds_init_data(void)
235 {
236         int *p;
237
238         ds_lists = (ds_set_t **)shm_malloc(2 * sizeof(ds_set_t *));
239         if(!ds_lists) {
240                 LM_ERR("Out of memory\n");
241                 return -1;
242         }
243         memset(ds_lists, 0, 2 * sizeof(ds_set_t *));
244
245
246         p = (int *)shm_malloc(3 * sizeof(int));
247         if(!p) {
248                 LM_ERR("Out of memory\n");
249                 return -1;
250         }
251         memset(p, 0, 3 * sizeof(int));
252
253         crt_idx = p;
254         next_idx = p + 1;
255         ds_list_nr = p + 2;
256         *crt_idx = *next_idx = 0;
257
258         return 0;
259 }
260
261 /**
262  *
263  */
264 int ds_set_attrs(ds_dest_t *dest, str *vattrs)
265 {
266         param_t *params_list = NULL;
267         param_hooks_t phooks;
268         param_t *pit = NULL;
269         str param;
270         int tmp_rweight = 0;
271         str sattrs;
272
273         if(vattrs == NULL || vattrs->len <= 0) {
274                 if(ds_attrs_none==0) {
275                         return 0;
276                 }
277                 sattrs.s = "none=yes";
278                 sattrs.len = 8;
279         } else {
280                 sattrs = *vattrs;
281         }
282         if(sattrs.s[sattrs.len - 1] == ';')
283                 sattrs.len--;
284         /* clone in shm */
285         dest->attrs.body.s = (char *)shm_malloc(sattrs.len + 1);
286         if(dest->attrs.body.s == NULL) {
287                 LM_ERR("no more shm\n");
288                 return -1;
289         }
290         memcpy(dest->attrs.body.s, sattrs.s, sattrs.len);
291         dest->attrs.body.s[sattrs.len] = '\0';
292         dest->attrs.body.len = sattrs.len;
293
294         param = dest->attrs.body;
295         if(parse_params(&param, CLASS_ANY, &phooks, &params_list) < 0)
296                 return -1;
297         for(pit = params_list; pit; pit = pit->next) {
298                 if(pit->name.len == 4 && strncasecmp(pit->name.s, "duid", 4) == 0) {
299                         dest->attrs.duid = pit->body;
300                 } else if(pit->name.len == 2
301                                   && strncasecmp(pit->name.s, "cc", 2) == 0) {
302                         str2sint(&pit->body, &dest->attrs.congestion_control);
303                 } else if(pit->name.len == 6
304                                   && strncasecmp(pit->name.s, "weight", 6) == 0) {
305                         str2sint(&pit->body, &dest->attrs.weight);
306                 } else if(pit->name.len == 7
307                                   && strncasecmp(pit->name.s, "maxload", 7) == 0) {
308                         str2sint(&pit->body, &dest->attrs.maxload);
309                 } else if(pit->name.len == 6
310                                   && strncasecmp(pit->name.s, "socket", 6) == 0) {
311                         dest->attrs.socket = pit->body;
312                 } else if(pit->name.len == 8
313                                   && strncasecmp(pit->name.s, "sockname", 8) == 0) {
314                         dest->attrs.sockname = pit->body;
315                 } else if(pit->name.len == 7
316                                   && strncasecmp(pit->name.s, "rweight", 7) == 0) {
317                         tmp_rweight = 0;
318                         str2sint(&pit->body, &tmp_rweight);
319                         if(tmp_rweight >= 1 && tmp_rweight <= 100) {
320                                 dest->attrs.rweight = tmp_rweight;
321                         } else {
322                                 LM_ERR("rweight %d not in 1-100 range; skipped", tmp_rweight);
323                         }
324                 } else if(pit->name.len == 9
325                                 && strncasecmp(pit->name.s, "ping_from", 9) == 0) {
326                         dest->attrs.ping_from = pit->body;
327                 } else if(pit->name.len == 7 
328                                   && strncasecmp(pit->name.s, "obproxy", 7) == 0) {
329                         dest->attrs.obproxy = pit->body;
330                 }
331         }
332         if(params_list)
333                 free_params(params_list);
334         return 0;
335 }
336
337 /**
338  *
339  */
340 ds_dest_t *pack_dest(str iuri, int flags, int priority, str *attrs)
341 {
342         ds_dest_t *dp = NULL;
343         /* For DNS-Lookups */
344         static char hn[256];
345         char ub[512];
346         struct hostent *he;
347         struct sip_uri puri;
348         str host;
349         int port, proto;
350         char c = 0;
351         str uri;
352
353         uri = iuri;
354         /* check uri */
355         if(parse_uri(uri.s, uri.len, &puri) != 0) {
356                 if(iuri.len>4 && strncmp(iuri.s, "sip:", 4)!=0 && iuri.len<500) {
357                         memcpy(ub, "sip:", 4);
358                         memcpy(ub+4, iuri.s, iuri.len);
359                         ub[iuri.len+4] = '\0';
360                         uri.s = ub;
361                         uri.len = iuri.len+4;
362                         if(parse_uri(uri.s, uri.len, &puri) != 0) {
363                                 LM_ERR("bad uri [%.*s]\n", iuri.len, iuri.s);
364                                 goto err;
365                         } else {
366                                 LM_INFO("uri without sip scheme - fixing it: %.*s\n",
367                                                 iuri.len, iuri.s);
368                         }
369                 } else {
370                         LM_ERR("bad uri [%.*s]\n", iuri.len, iuri.s);
371                         goto err;
372                 }
373         }
374
375         if(puri.host.len > 254) {
376                 LM_ERR("hostname in uri is too long [%.*s]\n", uri.len, uri.s);
377                 goto err;
378         }
379
380         /* skip IPv6 references if IPv6 lookups are disabled */
381         if(default_core_cfg.dns_try_ipv6 == 0 && puri.host.s[0] == '['
382                         && puri.host.s[puri.host.len - 1] == ']') {
383                 LM_DBG("skipping IPv6 record %.*s\n", puri.host.len, puri.host.s);
384                 return NULL;
385         }
386
387         /* store uri */
388         dp = (ds_dest_t *)shm_malloc(sizeof(ds_dest_t));
389         if(dp == NULL) {
390                 LM_ERR("no more memory!\n");
391                 goto err;
392         }
393         memset(dp, 0, sizeof(ds_dest_t));
394
395         dp->uri.s = (char *)shm_malloc((uri.len + 1) * sizeof(char));
396         if(dp->uri.s == NULL) {
397                 LM_ERR("no more memory!\n");
398                 goto err;
399         }
400         strncpy(dp->uri.s, uri.s, uri.len);
401         dp->uri.s[uri.len] = '\0';
402         dp->uri.len = uri.len;
403
404         dp->flags = flags;
405         dp->priority = priority;
406
407         if(ds_set_attrs(dp, attrs) < 0) {
408                 LM_ERR("cannot set attributes!\n");
409                 goto err;
410         }
411
412         /* set send socket by name or address */
413         if(dp->attrs.sockname.s && dp->attrs.sockname.len > 0) {
414                 dp->sock = ksr_get_socket_by_name(&dp->attrs.sockname);
415                 if(dp->sock == 0) {
416                         LM_ERR("non-local socket name <%.*s>\n", dp->attrs.sockname.len,
417                                         dp->attrs.sockname.s);
418                         goto err;
419                 }
420         } else if(dp->attrs.socket.s && dp->attrs.socket.len > 0) {
421                 /* parse_phostport(...) expects 0-terminated string
422                  * - after socket parameter is either ';' or '\0' */
423                 STR_VTOZ(dp->attrs.socket.s[dp->attrs.socket.len], c);
424                 if(parse_phostport(
425                                    dp->attrs.socket.s, &host.s, &host.len, &port, &proto)
426                                 != 0) {
427                         LM_ERR("bad socket <%.*s>\n", dp->attrs.socket.len,
428                                         dp->attrs.socket.s);
429                         STR_ZTOV(dp->attrs.socket.s[dp->attrs.socket.len], c);
430                         goto err;
431                 }
432                 STR_ZTOV(dp->attrs.socket.s[dp->attrs.socket.len], c);
433                 dp->sock = grep_sock_info(&host, (unsigned short)port, proto);
434                 if(dp->sock == 0) {
435                         LM_ERR("non-local socket <%.*s>\n", dp->attrs.socket.len,
436                                         dp->attrs.socket.s);
437                         goto err;
438                 }
439         } else if(ds_default_sockinfo) {
440                 dp->sock = ds_default_sockinfo;
441         }
442
443         /* The Hostname needs to be \0 terminated for resolvehost, so we
444          * make a copy here. */
445         strncpy(hn, puri.host.s, puri.host.len);
446         hn[puri.host.len] = '\0';
447
448         /* Do a DNS-Lookup for the Host-Name: */
449         he = resolvehost(hn);
450         if(he == 0) {
451                 if(dp->flags & DS_NODNSARES_DST) {
452                         dp->irmode |= DS_IRMODE_NOIPADDR;
453                 } else {
454                         LM_ERR("could not resolve %.*s (missing no-probing flag?!?)\n",
455                                         puri.host.len, puri.host.s);
456                         goto err;
457                 }
458         } else {
459                 /* Store hostent in the dispatcher structure */
460                 hostent2ip_addr(&dp->ip_address, he, 0);
461         }
462
463         /* Copy the port out of the URI */
464         dp->port = puri.port_no;
465         /* Copy the proto out of the URI */
466         dp->proto = puri.proto;
467
468         return dp;
469 err:
470         if(dp != NULL) {
471                 if(dp->uri.s != NULL)
472                         shm_free(dp->uri.s);
473                 if(dp->attrs.body.s != NULL)
474                         shm_free(dp->attrs.body.s);
475                 shm_free(dp);
476         }
477
478         return NULL;
479 }
480
481 /**
482  *
483  */
484 int add_dest2list(int id, str uri, int flags, int priority, str *attrs,
485                 int list_idx, int *setn)
486 {
487         ds_dest_t *dp = NULL;
488         ds_set_t *sp = NULL;
489         ds_dest_t *dp0 = NULL;
490         ds_dest_t *dp1 = NULL;
491
492         dp = pack_dest(uri, flags, priority, attrs);
493         if(!dp)
494                 goto err;
495
496         sp = ds_avl_insert(&ds_lists[list_idx], id, setn);
497         if(!sp) {
498                 LM_ERR("no more memory.\n");
499                 goto err;
500         }
501         sp->nr++;
502
503         if(sp->dlist == NULL) {
504                 sp->dlist = dp;
505         } else {
506                 dp1 = NULL;
507                 dp0 = sp->dlist;
508                 /* highest priority last -> reindex will copy backwards */
509                 while(dp0) {
510                         if(dp0->priority > dp->priority)
511                                 break;
512                         dp1 = dp0;
513                         dp0 = dp0->next;
514                 }
515                 if(dp1 == NULL) {
516                         dp->next = sp->dlist;
517                         sp->dlist = dp;
518                 } else {
519                         dp->next = dp1->next;
520                         dp1->next = dp;
521                 }
522         }
523
524         LM_DBG("dest [%d/%d] <%.*s>\n", sp->id, sp->nr, dp->uri.len, dp->uri.s);
525
526         return 0;
527 err:
528         if(dp != NULL) {
529                 if(dp->uri.s != NULL)
530                         shm_free(dp->uri.s);
531                 if(dp->attrs.body.s != NULL)
532                         shm_free(dp->attrs.body.s);
533                 shm_free(dp);
534         }
535
536         return -1;
537 }
538
539
540 /* for internal usage; arr must be arr[100] */
541 void shuffle_uint100array(unsigned int *arr)
542 {
543         if(arr == NULL)
544                 return;
545         int k;
546         int j;
547         unsigned int t;
548         for(j = 0; j < 100; j++) {
549                 k = j + (kam_rand() % (100 - j));
550                 t = arr[j];
551                 arr[j] = arr[k];
552                 arr[k] = t;
553         }
554 }
555
556
557 /**
558  * Initialize the relative weight distribution for a destination set
559  * - fill the array of 0..99 elements where to keep the index of the
560  *   destination address to be used. The Nth call will use
561  *   the address with the index at possition N%100
562  */
563 int dp_init_relative_weights(ds_set_t *dset)
564 {
565         int j;
566         int k;
567         int t;
568         int *ds_dests_flags = NULL;
569         int *ds_dests_rweights = NULL;
570         int current_slice;
571         int rw_sum;
572         unsigned int last_insert;
573
574         if(dset == NULL || dset->dlist == NULL || dset->nr < 2)
575                 return -1;
576
577         /* local copy to avoid syncronization problems */
578         ds_dests_flags = pkg_malloc(sizeof(int) * dset->nr);
579         if(ds_dests_flags == NULL) {
580                 LM_ERR("no more pkg\n");
581                 return -1;
582         }
583         ds_dests_rweights = pkg_malloc(sizeof(int) * dset->nr);
584         if(ds_dests_rweights == NULL) {
585                 LM_ERR("no more pkg\n");
586                 pkg_free(ds_dests_flags);
587                 return -1;
588         }
589
590
591         /* needed to sync the rwlist access */
592         lock_get(&dset->lock);
593         rw_sum = 0;
594         /* find the sum of relative weights */
595         for(j = 0; j < dset->nr; j++) {
596                 ds_dests_flags[j] = dset->dlist[j].flags;
597                 ds_dests_rweights[j] = dset->dlist[j].attrs.rweight;
598                 if(ds_skip_dst(ds_dests_flags[j]))
599                         continue;
600                 rw_sum += ds_dests_rweights[j];
601         }
602
603         if(rw_sum == 0)
604                 goto ret;
605
606         /* fill the array based on the relative weight of each destination */
607         t = 0;
608         for(j = 0; j < dset->nr; j++) {
609                 if(ds_skip_dst(ds_dests_flags[j]))
610                         continue;
611
612                 current_slice =
613                                 ds_dests_rweights[j] * 100 / rw_sum; /* truncate here */
614                 LM_DBG("rw_sum[%d][%d][%d]\n",j, rw_sum, current_slice);
615                 for(k = 0; k < current_slice; k++) {
616                         dset->rwlist[t] = (unsigned int)j;
617                         t++;
618                 }
619         }
620
621         /* if the array was not completely filled (i.e., the sum of rweights is
622          * less than 100 due to truncated), then use last address to fill the rest */
623         last_insert = t > 0 ? dset->rwlist[t - 1] : (unsigned int)(dset->nr - 1);
624         for(j = t; j < 100; j++)
625                 dset->rwlist[j] = last_insert;
626
627         /* shuffle the content of the array in order to mix the selection
628          * of the addresses (e.g., if first address has weight=20, avoid
629          * sending first 20 calls to it, but ensure that within a 100 calls,
630          * 20 go to first address */
631         shuffle_uint100array(dset->rwlist);
632         goto ret;
633
634 ret:
635         lock_release(&dset->lock);
636         pkg_free(ds_dests_flags);
637         pkg_free(ds_dests_rweights);
638         return 0;
639 }
640
641
642 /**
643  * Initialize the weight distribution for a destination set
644  * - fill the array of 0..99 elements where to keep the index of the
645  *   destination address to be used. The Nth call will use
646  *   the address with the index at possition N%100
647  */
648 int dp_init_weights(ds_set_t *dset)
649 {
650         int j;
651         int k;
652         int t;
653
654         if(dset == NULL || dset->dlist == NULL)
655                 return -1;
656
657         /* is weight set for dst list? (first address must have weight!=0) */
658         if(dset->dlist[0].attrs.weight == 0)
659                 return 0;
660
661         /* first fill the array based on the weight of each destination
662          * - the weight is the percentage (e.g., if weight=20, the afferent
663          *   address gets its index 20 times in the array)
664          * - if the sum of weights is more than 100, the addresses over the
665          *   limit are ignored */
666         t = 0;
667         for(j = 0; j < dset->nr; j++) {
668                 for(k = 0; k < dset->dlist[j].attrs.weight; k++) {
669                         if(t >= 100)
670                                 goto randomize;
671                         dset->wlist[t] = (unsigned int)j;
672                         t++;
673                 }
674         }
675         /* if the array was not completely filled (i.e., the sum of weights is
676          * less than 100), then use last address to fill the rest */
677         for(; t < 100; t++)
678                 dset->wlist[t] = (unsigned int)(dset->nr - 1);
679 randomize:
680         /* shuffle the content of the array in order to mix the selection
681          * of the addresses (e.g., if first address has weight=20, avoid
682          * sending first 20 calls to it, but ensure that within a 100 calls,
683          * 20 go to first address */
684         shuffle_uint100array(dset->wlist);
685
686         return 0;
687 }
688
689 /*! \brief  compact destinations from sets for fast access */
690 int reindex_dests(ds_set_t *node)
691 {
692         int i = 0;
693         int j = 0;
694
695         if(!node)
696                 return 0;
697
698         for(; i < 2; ++i) {
699                 int rc = reindex_dests(node->next[i]);
700                 if(rc != 0)
701                         return rc;
702         }
703
704         ds_dest_t *dp = NULL, *dp0 = NULL;
705
706         dp0 = (ds_dest_t *)shm_malloc(node->nr * sizeof(ds_dest_t));
707         if(dp0 == NULL) {
708                 LM_ERR("no more memory!\n");
709                 goto err1;
710         }
711         memset(dp0, 0, node->nr * sizeof(ds_dest_t));
712
713         /* copy from the old pointer to destination, and then free it */
714         for(j = node->nr - 1; j >= 0 && node->dlist != NULL; j--) {
715                 memcpy(&dp0[j], node->dlist, sizeof(ds_dest_t));
716                 if(j == node->nr - 1)
717                         dp0[j].next = NULL;
718                 else
719                         dp0[j].next = &dp0[j + 1];
720
721
722                 dp = node->dlist;
723                 node->dlist = dp->next;
724
725                 shm_free(dp);
726                 dp = NULL;
727         }
728         node->dlist = dp0;
729         dp_init_weights(node);
730         dp_init_relative_weights(node);
731
732         return 0;
733
734 err1:
735         return -1;
736 }
737
738 /*! \brief load groups of destinations from file */
739 int ds_load_list(char *lfile)
740 {
741         char line[1024], *p;
742         FILE *f = NULL;
743         int id, setn, flags, priority;
744         str uri;
745         str attrs;
746
747         if((*crt_idx) != (*next_idx)) {
748                 LM_WARN("load command already generated, aborting reload...\n");
749                 return 0;
750         }
751
752         if(lfile == NULL || strlen(lfile) <= 0) {
753                 LM_ERR("bad list file\n");
754                 return -1;
755         }
756
757         f = fopen(lfile, "r");
758         if(f == NULL) {
759                 LM_ERR("can't open list file [%s]\n", lfile);
760                 return -1;
761         }
762
763         id = setn = flags = priority = 0;
764
765         *next_idx = (*crt_idx + 1) % 2;
766         ds_avl_destroy(&ds_lists[*next_idx]);
767
768         p = fgets(line, 1024, f);
769         while(p) {
770                 /* eat all white spaces */
771                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
772                         p++;
773                 if(*p == '\0' || *p == '#')
774                         goto next_line;
775
776                 /* get set id */
777                 id = 0;
778                 while(*p >= '0' && *p <= '9') {
779                         id = id * 10 + (*p - '0');
780                         p++;
781                 }
782
783                 /* eat all white spaces */
784                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
785                         p++;
786                 if(*p == '\0' || *p == '#') {
787                         LM_ERR("bad line [%s]\n", line);
788                         goto error;
789                 }
790
791                 /* get uri */
792                 uri.s = p;
793                 while(*p && *p != ' ' && *p != '\t' && *p != '\r' && *p != '\n'
794                                 && *p != '#')
795                         p++;
796                 uri.len = p - uri.s;
797
798                 /* eat all white spaces */
799                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
800                         p++;
801
802                 /* get flags */
803                 flags = 0;
804                 priority = 0;
805                 attrs.s = 0;
806                 attrs.len = 0;
807                 if(*p == '\0' || *p == '#')
808                         goto add_destination; /* no flags given */
809
810                 while(*p >= '0' && *p <= '9') {
811                         flags = flags * 10 + (*p - '0');
812                         p++;
813                 }
814
815                 /* eat all white spaces */
816                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
817                         p++;
818
819                 /* get priority */
820                 if(*p == '\0' || *p == '#')
821                         goto add_destination; /* no priority given */
822
823                 while(*p >= '0' && *p <= '9') {
824                         priority = priority * 10 + (*p - '0');
825                         p++;
826                 }
827
828                 /* eat all white spaces */
829                 while(*p && (*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n'))
830                         p++;
831                 if(*p == '\0' || *p == '#')
832                         goto add_destination; /* no attrs given */
833
834                 /* get attributes */
835                 attrs.s = p;
836                 while(*p && *p != ' ' && *p != '\t' && *p != '\r' && *p != '\n')
837                         p++;
838                 attrs.len = p - attrs.s;
839
840 add_destination:
841                 if(add_dest2list(id, uri, flags, priority, &attrs, *next_idx, &setn)
842                                 != 0) {
843                         LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
844                                         uri.len, uri.s, id);
845                         if(ds_load_mode==1) {
846                                 goto error;
847                         }
848                 }
849 next_line:
850                 p = fgets(line, 1024, f);
851         }
852
853         if(reindex_dests(ds_lists[*next_idx]) != 0) {
854                 LM_ERR("error on reindex\n");
855                 goto error;
856         }
857
858         LM_DBG("found [%d] dest sets\n", _ds_list_nr);
859
860         fclose(f);
861         f = NULL;
862         /* Update list - should it be sync'ed? */
863         _ds_list_nr = setn;
864         *crt_idx = *next_idx;
865         ds_ht_clear_slots(_dsht_load);
866         ds_log_sets();
867         return 0;
868
869 error:
870         if(f != NULL)
871                 fclose(f);
872         ds_avl_destroy(&ds_lists[*next_idx]);
873         *next_idx = *crt_idx;
874         return -1;
875 }
876
877 /**
878  *
879  */
880 int ds_connect_db(void)
881 {
882         if(ds_db_url.s == NULL)
883                 return -1;
884
885         if((ds_db_handle = ds_dbf.init(&ds_db_url)) == 0) {
886                 LM_ERR("cannot initialize db connection\n");
887                 return -1;
888         }
889         return 0;
890 }
891
892 /**
893  *
894  */
895 void ds_disconnect_db(void)
896 {
897         if(ds_db_handle) {
898                 ds_dbf.close(ds_db_handle);
899                 ds_db_handle = 0;
900         }
901 }
902
903 /*! \brief Initialize and verify DB stuff*/
904 int ds_init_db(void)
905 {
906         int ret;
907
908         if(ds_table_name.s == 0) {
909                 LM_ERR("invalid database name\n");
910                 return -1;
911         }
912
913         /* Find a database module */
914         if(db_bind_mod(&ds_db_url, &ds_dbf) < 0) {
915                 LM_ERR("Unable to bind to a database driver\n");
916                 return -1;
917         }
918
919         if(ds_connect_db() != 0) {
920                 LM_ERR("unable to connect to the database\n");
921                 return -1;
922         }
923
924         _ds_table_version = db_table_version(&ds_dbf, ds_db_handle, &ds_table_name);
925         if(_ds_table_version < 0) {
926                 LM_ERR("failed to query table version\n");
927                 return -1;
928         } else if(_ds_table_version != DS_TABLE_VERSION
929                           && _ds_table_version != DS_TABLE_VERSION2
930                           && _ds_table_version != DS_TABLE_VERSION3
931                           && _ds_table_version != DS_TABLE_VERSION4) {
932                 LM_ERR("invalid table version (found %d , required %d, %d, %d or %d)\n"
933                            "(use kamdbctl reinit)\n",
934                                 _ds_table_version, DS_TABLE_VERSION, DS_TABLE_VERSION2,
935                                 DS_TABLE_VERSION3, DS_TABLE_VERSION4);
936                 return -1;
937         }
938
939         ret = ds_load_db();
940         if(ret == -2) {
941                 LM_WARN("failure while loading one or more dispatcher entries\n");
942                 ret = 0;
943         }
944
945         ds_disconnect_db();
946
947         return ret;
948 }
949
950 /*! \brief reload groups of destinations from DB*/
951 int ds_reload_db(void)
952 {
953         int ret;
954
955         if(ds_connect_db() != 0) {
956                 LM_ERR("unable to connect to the database\n");
957                 return -1;
958         }
959         ret = ds_load_db();
960         if(ret == -2) {
961                 LM_WARN("failure while loading one or more dispatcher entries\n");
962         }
963         ds_disconnect_db();
964
965         return ret;
966 }
967
968 /*! \brief load groups of destinations from DB*/
969 int ds_load_db(void)
970 {
971         int i, id, nr_rows, setn;
972         int flags;
973         int priority;
974         int nrcols;
975         int dest_errs = 0;
976         str uri;
977         str attrs = {0, 0};
978         db1_res_t *res;
979         db_val_t *values;
980         db_row_t *rows;
981 #define DS_DB_MAX_COLS  32
982         db_key_t query_cols[DS_DB_MAX_COLS];
983         param_t *pit=NULL;
984         int nc;
985         int plen;
986 #define DS_ATTRS_MAXSIZE        1024
987         char ds_attrs_buf[DS_ATTRS_MAXSIZE];
988
989         query_cols[0] = &ds_set_id_col;
990         query_cols[1] = &ds_dest_uri_col;
991         query_cols[2] = &ds_dest_flags_col;
992         query_cols[3] = &ds_dest_priority_col;
993         query_cols[4] = &ds_dest_attrs_col;
994
995         nrcols = 2;
996         if(_ds_table_version == DS_TABLE_VERSION2) {
997                 nrcols = 3;
998         } else if(_ds_table_version == DS_TABLE_VERSION3) {
999                 nrcols = 4;
1000         } else if(_ds_table_version == DS_TABLE_VERSION4) {
1001                 nrcols = 5;
1002                 for(pit = ds_db_extra_attrs_list; pit!=NULL; pit=pit->next) {
1003                         if(nrcols>=DS_DB_MAX_COLS) {
1004                                 LM_ERR("too many db columns: %d\n", nrcols);
1005                                 return -1;
1006                         }
1007                         query_cols[nrcols++] = &pit->body; 
1008                 }
1009         }
1010
1011         if((*crt_idx) != (*next_idx)) {
1012                 LM_WARN("load command already generated, aborting reload...\n");
1013                 return 0;
1014         }
1015
1016         if(ds_db_handle == NULL) {
1017                 LM_ERR("invalid DB handler\n");
1018                 return -1;
1019         }
1020
1021         if(ds_dbf.use_table(ds_db_handle, &ds_table_name) < 0) {
1022                 LM_ERR("error in use_table\n");
1023                 return -1;
1024         }
1025
1026         LM_DBG("loading dispatcher db records - nrcols: %d\n", nrcols);
1027
1028         /*select the whole table and all the columns*/
1029         if(ds_dbf.query(ds_db_handle, 0, 0, 0, query_cols, 0, nrcols, 0, &res)
1030                         < 0) {
1031                 LM_ERR("error while querying database\n");
1032                 return -1;
1033         }
1034
1035         nr_rows = RES_ROW_N(res);
1036         rows = RES_ROWS(res);
1037         if(nr_rows == 0) {
1038                 LM_WARN("no dispatching data in the db -- empty destination set\n");
1039         }
1040
1041         setn = 0;
1042         *next_idx = (*crt_idx + 1) % 2;
1043         ds_avl_destroy(&ds_lists[*next_idx]);
1044
1045         for(i = 0; i < nr_rows; i++) {
1046                 values = ROW_VALUES(rows + i);
1047
1048                 id = VAL_INT(values);
1049                 uri.s = VAL_STR(values + 1).s;
1050                 uri.len = strlen(uri.s);
1051                 flags = 0;
1052                 if(nrcols >= 3)
1053                         flags = VAL_INT(values + 2);
1054                 priority = 0;
1055                 if(nrcols >= 4)
1056                         priority = VAL_INT(values + 3);
1057
1058                 attrs.s = 0;
1059                 attrs.len = 0;
1060                 if(nrcols >= 5) {
1061                         if(!VAL_NULL(values + 4)) {
1062                                 attrs.s = VAL_STR(values + 4).s;
1063                                 if(attrs.s) attrs.len = strlen(attrs.s);
1064                         }
1065                         if(ds_db_extra_attrs_list!=NULL && nrcols > 5) {
1066                                 if(attrs.len>0) {
1067                                         memcpy(ds_attrs_buf, attrs.s, attrs.len);
1068                                         if(ds_attrs_buf[attrs.len-1]!=';') {
1069                                                 ds_attrs_buf[attrs.len++] = ';';
1070                                         }
1071                                 }
1072                                 attrs.s = ds_attrs_buf;
1073                                 pit = ds_db_extra_attrs_list;
1074                                 for(nc = 5; nc<nrcols && pit!=NULL; nc++) {
1075                                         if(!VAL_NULL(values + nc) && strlen(VAL_STRING(values + nc))>0) {
1076                                                 plen = snprintf(attrs.s + attrs.len,
1077                                                                 DS_ATTRS_MAXSIZE - attrs.len - 1,
1078                                                                 "%.*s=%s;", pit->name.len, pit->name.s,
1079                                                                 VAL_STRING(values + nc));
1080                                                 if(plen<=0 || plen>=DS_ATTRS_MAXSIZE - attrs.len - 1) {
1081                                                         LM_ERR("cannot build attrs buffer\n");
1082                                                         goto err2;
1083                                                 }
1084                                                 attrs.len+=plen;
1085                                         }
1086                                         pit = pit->next;
1087                                 }
1088                         }
1089                 }
1090                 LM_DBG("attributes string: [%.*s]\n", attrs.len, (attrs.s)?attrs.s:"");
1091                 if(add_dest2list(id, uri, flags, priority, &attrs, *next_idx, &setn)
1092                                 != 0) {
1093                         dest_errs++;
1094                         LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
1095                                         uri.len, uri.s, id);
1096                         if(ds_load_mode==1) {
1097                                 goto err2;
1098                         }
1099                 }
1100         }
1101         if(reindex_dests(ds_lists[*next_idx]) != 0) {
1102                 LM_ERR("error on reindex\n");
1103                 goto err2;
1104         }
1105
1106         LM_DBG("found [%d] dest sets\n", _ds_list_nr);
1107
1108         ds_dbf.free_result(ds_db_handle, res);
1109
1110         /* update data - should it be sync'ed? */
1111         _ds_list_nr = setn;
1112         *crt_idx = *next_idx;
1113         ds_ht_clear_slots(_dsht_load);
1114
1115         ds_log_sets();
1116
1117         if(dest_errs > 0)
1118                 return -2;
1119         return 0;
1120
1121 err2:
1122         ds_avl_destroy(&ds_lists[*next_idx]);
1123         ds_dbf.free_result(ds_db_handle, res);
1124         *next_idx = *crt_idx;
1125
1126         return -1;
1127 }
1128
1129 /*! \brief called from dispatcher.c: free all*/
1130 int ds_destroy_list(void)
1131 {
1132         if(ds_lists) {
1133                 ds_avl_destroy(&ds_lists[0]);
1134                 ds_avl_destroy(&ds_lists[1]);
1135                 shm_free(ds_lists);
1136         }
1137
1138         if(crt_idx)
1139                 shm_free(crt_idx);
1140
1141         return 0;
1142 }
1143
1144 /**
1145  *
1146  */
1147 unsigned int ds_get_hash(str *x, str *y)
1148 {
1149         char *p;
1150         register unsigned v;
1151         register unsigned h;
1152
1153         if(!x && !y)
1154                 return 0;
1155         h = 0;
1156         if(x) {
1157                 p = x->s;
1158                 if(x->len >= 4) {
1159                         for(; p <= (x->s + x->len - 4); p += 4) {
1160                                 v = (*p << 24) + (p[1] << 16) + (p[2] << 8) + p[3];
1161                                 h += v ^ (v >> 3);
1162                         }
1163                 }
1164                 v = 0;
1165                 for(; p < (x->s + x->len); p++) {
1166                         v <<= 8;
1167                         v += *p;
1168                 }
1169                 h += v ^ (v >> 3);
1170         }
1171         if(y) {
1172                 p = y->s;
1173                 if(y->len >= 4) {
1174                         for(; p <= (y->s + y->len - 4); p += 4) {
1175                                 v = (*p << 24) + (p[1] << 16) + (p[2] << 8) + p[3];
1176                                 h += v ^ (v >> 3);
1177                         }
1178                 }
1179
1180                 v = 0;
1181                 for(; p < (y->s + y->len); p++) {
1182                         v <<= 8;
1183                         v += *p;
1184                 }
1185                 h += v ^ (v >> 3);
1186         }
1187         h = ((h) + (h >> 11)) + ((h >> 13) + (h >> 23));
1188
1189         return (h) ? h : 1;
1190 }
1191
1192
1193 /*! \brief
1194  * gets the part of the uri we will use as a key for hashing
1195  * \param  key1       - will be filled with first part of the key
1196  *                       (uri user or "" if no user)
1197  * \param  key2       - will be filled with the second part of the key
1198  *                       (uri host:port)
1199  * \param  uri        - str with the whole uri
1200  * \param  parsed_uri - struct sip_uri pointer with the parsed uri
1201  *                       (it must point inside uri). It can be null
1202  *                       (in this case the uri will be parsed internally).
1203  * \param  flags  -    if & DS_HASH_USER_ONLY, only the user part of the uri
1204  *                      will be used
1205  * \return: -1 on error, 0 on success
1206  */
1207 static inline int get_uri_hash_keys(
1208                 str *key1, str *key2, str *uri, struct sip_uri *parsed_uri, int flags)
1209 {
1210         struct sip_uri tmp_p_uri; /* used only if parsed_uri==0 */
1211
1212         if(parsed_uri == 0) {
1213                 if(parse_uri(uri->s, uri->len, &tmp_p_uri) < 0) {
1214                         LM_ERR("invalid uri %.*s\n", uri->len, uri->len ? uri->s : "");
1215                         goto error;
1216                 }
1217                 parsed_uri = &tmp_p_uri;
1218         }
1219         /* uri sanity checks */
1220         if(parsed_uri->host.s == 0) {
1221                 LM_ERR("invalid uri, no host present: %.*s\n", uri->len,
1222                                 uri->len ? uri->s : "");
1223                 goto error;
1224         }
1225
1226         /* we want: user@host:port if port !=5060
1227          *          user@host if port==5060
1228          *          user if the user flag is set*/
1229         *key1 = parsed_uri->user;
1230         key2->s = 0;
1231         key2->len = 0;
1232         if(!(flags & DS_HASH_USER_ONLY)) { /* key2=host */
1233                 *key2 = parsed_uri->host;
1234                 /* add port if needed */
1235                 if(parsed_uri->port.s != 0) { /* uri has a port */
1236                         /* skip port if == 5060 or sips and == 5061 */
1237                         if(parsed_uri->port_no != ((parsed_uri->type == SIPS_URI_T)
1238                                                                                                           ? SIPS_PORT
1239                                                                                                           : SIP_PORT))
1240                                 key2->len += parsed_uri->port.len + 1 /* ':' */;
1241                 }
1242         }
1243         if(key1->s == 0) {
1244                 LM_WARN("empty username in: %.*s\n", uri->len, uri->len ? uri->s : "");
1245         }
1246         return 0;
1247 error:
1248         return -1;
1249 }
1250
1251
1252 /**
1253  *
1254  */
1255 int ds_hash_fromuri(struct sip_msg *msg, unsigned int *hash)
1256 {
1257         str from;
1258         str key1;
1259         str key2;
1260
1261         if(msg == NULL || hash == NULL) {
1262                 LM_ERR("bad parameters\n");
1263                 return -1;
1264         }
1265
1266         if(parse_from_header(msg) < 0) {
1267                 LM_ERR("cannot parse From hdr\n");
1268                 return -1;
1269         }
1270
1271         if(msg->from == NULL || get_from(msg) == NULL) {
1272                 LM_ERR("cannot get From uri\n");
1273                 return -1;
1274         }
1275
1276         from = get_from(msg)->uri;
1277         trim(&from);
1278         if(get_uri_hash_keys(&key1, &key2, &from, 0, ds_flags) < 0)
1279                 return -1;
1280         *hash = ds_get_hash(&key1, &key2);
1281
1282         return 0;
1283 }
1284
1285
1286 /**
1287  *
1288  */
1289 int ds_hash_touri(struct sip_msg *msg, unsigned int *hash)
1290 {
1291         str to;
1292         str key1;
1293         str key2;
1294
1295         if(msg == NULL || hash == NULL) {
1296                 LM_ERR("bad parameters\n");
1297                 return -1;
1298         }
1299         if((msg->to == 0)
1300                         && ((parse_headers(msg, HDR_TO_F, 0) == -1) || (msg->to == 0))) {
1301                 LM_ERR("cannot parse To hdr\n");
1302                 return -1;
1303         }
1304
1305
1306         to = get_to(msg)->uri;
1307         trim(&to);
1308
1309         if(get_uri_hash_keys(&key1, &key2, &to, 0, ds_flags) < 0)
1310                 return -1;
1311         *hash = ds_get_hash(&key1, &key2);
1312
1313         return 0;
1314 }
1315
1316
1317 /**
1318  *
1319  */
1320 int ds_hash_callid(struct sip_msg *msg, unsigned int *hash)
1321 {
1322         str cid;
1323         if(msg == NULL || hash == NULL) {
1324                 LM_ERR("bad parameters\n");
1325                 return -1;
1326         }
1327
1328         if(msg->callid == NULL && ((parse_headers(msg, HDR_CALLID_F, 0) == -1)
1329                                                                           || (msg->callid == NULL))) {
1330                 LM_ERR("cannot parse Call-Id\n");
1331                 return -1;
1332         }
1333
1334         cid.s = msg->callid->body.s;
1335         cid.len = msg->callid->body.len;
1336         trim(&cid);
1337
1338         *hash = ds_get_hash(&cid, NULL);
1339
1340         return 0;
1341 }
1342
1343
1344 /**
1345  *
1346  */
1347 int ds_hash_ruri(struct sip_msg *msg, unsigned int *hash)
1348 {
1349         str *uri;
1350         str key1;
1351         str key2;
1352
1353
1354         if(msg == NULL || hash == NULL) {
1355                 LM_ERR("bad parameters\n");
1356                 return -1;
1357         }
1358         if(parse_sip_msg_uri(msg) < 0) {
1359                 LM_ERR("bad request uri\n");
1360                 return -1;
1361         }
1362
1363         uri = GET_RURI(msg);
1364         if(get_uri_hash_keys(&key1, &key2, uri, &msg->parsed_uri, ds_flags) < 0)
1365                 return -1;
1366
1367         *hash = ds_get_hash(&key1, &key2);
1368         return 0;
1369 }
1370
1371 /**
1372  *
1373  */
1374 int ds_hash_authusername(struct sip_msg *msg, unsigned int *hash)
1375 {
1376         /* Header, which contains the authorization */
1377         struct hdr_field *h = 0;
1378         /* The Username */
1379         str username = {0, 0};
1380         /* The Credentials from this request */
1381         auth_body_t *cred;
1382
1383         if(msg == NULL || hash == NULL) {
1384                 LM_ERR("bad parameters\n");
1385                 return -1;
1386         }
1387         *hash = 0;
1388         if(parse_headers(msg, HDR_PROXYAUTH_F, 0) == -1) {
1389                 LM_ERR("error parsing headers!\n");
1390                 return -1;
1391         }
1392         if(msg->proxy_auth && !msg->proxy_auth->parsed) {
1393                 if(parse_credentials(msg->proxy_auth)!=0) {
1394                         LM_DBG("no parsing for proxy-auth header\n");
1395                 }
1396         }
1397         if(msg->proxy_auth && msg->proxy_auth->parsed) {
1398                 h = msg->proxy_auth;
1399         }
1400         if(!h) {
1401                 if(parse_headers(msg, HDR_AUTHORIZATION_F, 0) == -1) {
1402                         LM_ERR("error parsing headers!\n");
1403                         return -1;
1404                 }
1405                 if(msg->authorization && !msg->authorization->parsed) {
1406                         if(parse_credentials(msg->authorization)!=0) {
1407                                 LM_DBG("no parsing for auth header\n");
1408                         }
1409                 }
1410                 if(msg->authorization && msg->authorization->parsed) {
1411                         h = msg->authorization;
1412                 }
1413         }
1414         if(!h) {
1415                 LM_DBG("No Authorization-Header!\n");
1416                 return 1;
1417         }
1418
1419         cred = (auth_body_t *)(h->parsed);
1420         if(!cred || !cred->digest.username.user.len) {
1421                 LM_ERR("No Authorization-Username or Credentials!\n");
1422                 return 1;
1423         }
1424
1425         username.s = cred->digest.username.user.s;
1426         username.len = cred->digest.username.user.len;
1427
1428         trim(&username);
1429
1430         *hash = ds_get_hash(&username, NULL);
1431
1432         return 0;
1433 }
1434
1435
1436 /**
1437  *
1438  */
1439 int ds_hash_pvar(struct sip_msg *msg, unsigned int *hash)
1440 {
1441         /* The String to create the hash */
1442         str hash_str = {0, 0};
1443
1444         if(msg == NULL || hash == NULL || hash_param_model == NULL) {
1445                 LM_ERR("bad parameters\n");
1446                 return -1;
1447         }
1448         if(pv_printf_s(msg, hash_param_model, &hash_str) < 0) {
1449                 LM_ERR("error - cannot print the format\n");
1450                 return -1;
1451         }
1452
1453         /* Remove empty spaces */
1454         trim(&hash_str);
1455         if(hash_str.len <= 0) {
1456                 LM_ERR("String is empty!\n");
1457                 return -1;
1458         }
1459
1460         *hash = ds_get_hash(&hash_str, NULL);
1461         LM_DBG("Hashing of '%.*s' resulted in %u !\n", hash_str.len, hash_str.s,
1462                         *hash);
1463
1464         return 0;
1465 }
1466
1467 /**
1468  *
1469  */
1470 static inline int ds_get_index(int group, int ds_list_idx, ds_set_t **index)
1471 {
1472         ds_set_t *si = NULL;
1473
1474         if(index == NULL || group < 0 || ds_lists[ds_list_idx] == NULL)
1475                 return -1;
1476
1477         /* get the index of the set */
1478         si = ds_avl_find(ds_lists[ds_list_idx], group);
1479
1480         if(si == NULL)
1481                 return -1;
1482
1483         *index = si;
1484         return 0;
1485 }
1486
1487 /*
1488  * Check if a destination set exists
1489  */
1490 int ds_list_exist(int set)
1491 {
1492         ds_set_t *si = NULL;
1493         LM_DBG("looking for destination set [%d]\n", set);
1494
1495         /* get the index of the set */
1496         si = ds_avl_find(_ds_list, set);
1497
1498         if(si == NULL) {
1499                 LM_DBG("destination set [%d] not found\n", set);
1500                 return -1; /* False */
1501         }
1502         LM_DBG("destination set [%d] found\n", set);
1503         return 1; /* True */
1504 }
1505
1506 /**
1507  *
1508  */
1509 int ds_get_leastloaded(ds_set_t *dset)
1510 {
1511         int j;
1512         int k;
1513         int t;
1514
1515         k = -1;
1516         t = 0x7fffffff; /* high load */
1517         for(j = 0; j < dset->nr; j++) {
1518                 if(!ds_skip_dst(dset->dlist[j].flags)
1519                                 && (dset->dlist[j].attrs.maxload == 0
1520                                                    || dset->dlist[j].dload
1521                                                                           < dset->dlist[j].attrs.maxload)) {
1522                         if(dset->dlist[j].dload < t) {
1523                                 k = j;
1524                                 t = dset->dlist[k].dload;
1525                         }
1526                 }
1527         }
1528         return k;
1529 }
1530
1531 /**
1532  *
1533  */
1534 int ds_load_add(struct sip_msg *msg, ds_set_t *dset, int setid, int dst)
1535 {
1536         if(dset->dlist[dst].attrs.duid.len == 0) {
1537                 LM_ERR("dst unique id not set for %d (%.*s)\n", setid,
1538                                 msg->callid->body.len, msg->callid->body.s);
1539                 return -1;
1540         }
1541
1542         if(ds_add_cell(_dsht_load, &msg->callid->body, &dset->dlist[dst].attrs.duid,
1543                            setid)
1544                         < 0) {
1545                 LM_ERR("cannot add load to %d (%.*s)\n", setid, msg->callid->body.len,
1546                                 msg->callid->body.s);
1547                 return -1;
1548         }
1549         dset->dlist[dst].dload++;
1550         return 0;
1551 }
1552
1553 /**
1554  *
1555  */
1556 int ds_load_replace(struct sip_msg *msg, str *duid)
1557 {
1558         ds_cell_t *it;
1559         int set;
1560         int olddst;
1561         int newdst;
1562         ds_set_t *idx = NULL;
1563         int i;
1564
1565         if(duid->len <= 0) {
1566                 LM_ERR("invalid dst unique id not set for (%.*s)\n",
1567                                 msg->callid->body.len, msg->callid->body.s);
1568                 return -1;
1569         }
1570
1571         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1572                 LM_ERR("cannot find load for (%.*s)\n", msg->callid->body.len,
1573                                 msg->callid->body.s);
1574                 return -1;
1575         }
1576         set = it->dset;
1577         /* get the index of the set */
1578         if(ds_get_index(set, *crt_idx, &idx) != 0) {
1579                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1580                 LM_ERR("destination set [%d] not found\n", set);
1581                 return -1;
1582         }
1583         olddst = -1;
1584         newdst = -1;
1585         for(i = 0; i < idx->nr; i++) {
1586                 if(idx->dlist[i].attrs.duid.len == it->duid.len
1587                                 && strncasecmp(
1588                                                    idx->dlist[i].attrs.duid.s, it->duid.s, it->duid.len)
1589                                                    == 0) {
1590                         olddst = i;
1591                         if(newdst != -1)
1592                                 break;
1593                 }
1594                 if(idx->dlist[i].attrs.duid.len == duid->len
1595                                 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s, duid->len)
1596                                                    == 0) {
1597                         newdst = i;
1598                         if(olddst != -1)
1599                                 break;
1600                 }
1601         }
1602         if(olddst == -1) {
1603                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1604                 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
1605                                 it->duid.len, it->duid.s);
1606                 return -1;
1607         }
1608         if(newdst == -1) {
1609                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1610                 LM_ERR("new destination address not found for [%d, %.*s]\n", set,
1611                                 duid->len, duid->s);
1612                 return -1;
1613         }
1614
1615         ds_unlock_cell(_dsht_load, &msg->callid->body);
1616         ds_del_cell(_dsht_load, &msg->callid->body);
1617         if(idx->dlist[olddst].dload > 0)
1618                 idx->dlist[olddst].dload--;
1619
1620         if(ds_load_add(msg, idx, set, newdst) < 0) {
1621                 LM_ERR("unable to replace destination load [%.*s / %.*s]\n", duid->len,
1622                                 duid->s, msg->callid->body.len, msg->callid->body.s);
1623                 return -1;
1624         }
1625         return 0;
1626 }
1627
1628 /**
1629  *
1630  */
1631 int ds_load_remove_byid(int set, str *duid)
1632 {
1633         int olddst;
1634         ds_set_t *idx = NULL;
1635         int i;
1636
1637         /* get the index of the set */
1638         if(ds_get_index(set, *crt_idx, &idx) != 0) {
1639                 LM_ERR("destination set [%d] not found\n", set);
1640                 return -1;
1641         }
1642         olddst = -1;
1643         for(i = 0; i < idx->nr; i++) {
1644                 if(idx->dlist[i].attrs.duid.len == duid->len
1645                                 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s, duid->len)
1646                                                    == 0) {
1647                         olddst = i;
1648                         break;
1649                 }
1650         }
1651         if(olddst == -1) {
1652                 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
1653                                 duid->len, duid->s);
1654                 return -1;
1655         }
1656
1657         if(idx->dlist[olddst].dload > 0)
1658                 idx->dlist[olddst].dload--;
1659
1660         return 0;
1661 }
1662
1663 /**
1664  *
1665  */
1666 int ds_load_remove(struct sip_msg *msg)
1667 {
1668         ds_cell_t *it;
1669
1670         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1671                 LM_ERR("cannot find load for (%.*s)\n", msg->callid->body.len,
1672                                 msg->callid->body.s);
1673                 return -1;
1674         }
1675
1676         if (ds_load_remove_byid(it->dset, &it->duid) < 0) {
1677                 ds_unlock_cell(_dsht_load, &msg->callid->body);
1678                 return -1;
1679         }
1680         ds_unlock_cell(_dsht_load, &msg->callid->body);
1681         ds_del_cell(_dsht_load, &msg->callid->body);
1682
1683         return 0;
1684 }
1685
1686 /**
1687  *
1688  */
1689 int ds_load_state(struct sip_msg *msg, int state)
1690 {
1691         ds_cell_t *it;
1692
1693         if((it = ds_get_cell(_dsht_load, &msg->callid->body)) == NULL) {
1694                 LM_DBG("cannot find load for (%.*s)\n", msg->callid->body.len,
1695                                 msg->callid->body.s);
1696                 return -1;
1697         }
1698
1699         it->state = state;
1700         ds_unlock_cell(_dsht_load, &msg->callid->body);
1701
1702         return 0;
1703 }
1704
1705
1706 /**
1707  *
1708  */
1709 int ds_load_update(struct sip_msg *msg)
1710 {
1711         if(parse_headers(msg, HDR_CSEQ_F | HDR_CALLID_F, 0) != 0
1712                         || msg->cseq == NULL || msg->callid == NULL) {
1713                 LM_ERR("cannot parse cseq and callid headers\n");
1714                 return -1;
1715         }
1716         if(msg->first_line.type == SIP_REQUEST) {
1717                 if(msg->first_line.u.request.method_value == METHOD_BYE
1718                                 || msg->first_line.u.request.method_value == METHOD_CANCEL) {
1719                         /* off-load call */
1720                         ds_load_remove(msg);
1721                 }
1722                 return 0;
1723         }
1724
1725         if(get_cseq(msg)->method_id == METHOD_INVITE) {
1726                 /* if status is 2xx then set state to confirmed */
1727                 if(REPLY_CLASS(msg) == 2)
1728                         ds_load_state(msg, DS_LOAD_CONFIRMED);
1729         }
1730         return 0;
1731 }
1732
1733 /**
1734  *
1735  */
1736 int ds_load_unset(struct sip_msg *msg)
1737 {
1738         sr_xavp_t *rxavp = NULL;
1739
1740         if(ds_xavp_dst.len <= 0)
1741                 return 0;
1742
1743         /* for INVITE requests should be called after dst list is built */
1744         if(msg->first_line.type == SIP_REQUEST
1745                         && msg->first_line.u.request.method_value == METHOD_INVITE) {
1746                 rxavp = xavp_get_child_with_sval(&ds_xavp_dst, &ds_xavp_dst_dstid);
1747                 if(rxavp == NULL)
1748                         return 0;
1749         }
1750         return ds_load_remove(msg);
1751 }
1752
1753 /**
1754  *
1755  */
1756 static inline int ds_push_dst(sip_msg_t *msg, str *uri, socket_info_t *sock,
1757                 int mode)
1758 {
1759         struct action act;
1760         struct run_act_ctx ra_ctx;
1761         switch(mode) {
1762                 case DS_SETOP_RURI:
1763                         memset(&act, '\0', sizeof(act));
1764                         act.type = SET_HOSTALL_T;
1765                         act.val[0].type = STRING_ST;
1766                         if(uri->len > 4 && strncasecmp(uri->s, "sip:", 4) == 0) {
1767                                 act.val[0].u.string = uri->s + 4;
1768                         } else if(uri->len > 5 && strncasecmp(uri->s, "sips:", 5) == 0) {
1769                                 act.val[0].u.string = uri->s + 5;
1770                         } else {
1771                                 act.val[0].u.string = uri->s;
1772                         }
1773                         init_run_actions_ctx(&ra_ctx);
1774                         if(do_action(&ra_ctx, &act, msg) < 0) {
1775                                 LM_ERR("error while setting r-uri domain with: %.*s\n",
1776                                                 uri->len, uri->s);
1777                                 return -1;
1778                         }
1779                         break;
1780
1781                 case DS_SETOP_XAVP:
1782                         /* no update to d-uri/r-uri */
1783                         return 0;
1784
1785                 default:
1786                         if(set_dst_uri(msg, uri) < 0) {
1787                                 LM_ERR("error while setting dst uri with: %.*s\n",
1788                                                 uri->len, uri->s);
1789                                 return -1;
1790                         }
1791                         /* dst_uri changes, so it makes sense to re-use the current uri for
1792                          * forking */
1793                         ruri_mark_new(); /* re-use uri for serial forking */
1794                         break;
1795         }
1796         if(sock) {
1797                 msg->force_send_socket = sock;
1798         }
1799         return 0;
1800 }
1801
1802 /**
1803  *
1804  */
1805 int ds_add_branches(sip_msg_t *msg, ds_set_t *idx, unsigned int hash, int mode)
1806 {
1807         unsigned int i = 0;
1808         str ruri = STR_NULL;
1809         sip_uri_t *puri = NULL;
1810         char buri[MAX_URI_SIZE];
1811
1812         if(mode!=DS_SETOP_XAVP && hash+1>=idx->nr) {
1813                 /* nothing to add */
1814                 return 0;
1815         }
1816
1817         if(mode==DS_SETOP_RURI) {
1818                 /* ruri updates */
1819                 LM_DBG("adding branches with ruri\n");
1820                 if(parse_sip_msg_uri(msg)<0) {
1821                         LM_ERR("failed to parse sip msg uri\n");
1822                         return -1;
1823                 }
1824                 puri = &msg->parsed_uri;
1825         } else {
1826                 /* duri updates */
1827                 LM_DBG("adding branches with duri\n");
1828         }
1829         if(mode!=DS_SETOP_XAVP) {
1830                 i = hash + 1;
1831         } else {
1832                 i = hash;
1833         }
1834         for(; i<idx->nr; i++) {
1835                 if(mode==DS_SETOP_RURI) {
1836                         /* ruri updates */
1837                         if(puri->user.len<=0) {
1838                                 /* no username to preserve */
1839                                 if(append_branch(msg, &idx->dlist[i].uri, NULL, NULL,
1840                                                 Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1841                                                 NULL, NULL)<0) {
1842                                         LM_ERR("failed to add branch with ruri\n");
1843                                         return -1;
1844                                 }
1845                         } else {
1846                                 /* new uri from ruri username and dispatcher uri */
1847                                 if(idx->dlist[i].uri.len<6) {
1848                                         LM_WARN("invalid dispatcher uri - skipping (%u)\n", i);
1849                                         continue;
1850                                 }
1851                                 if(strncmp(idx->dlist[i].uri.s, "sips:", 5)==0) {
1852                                         ruri.len = snprintf(buri, MAX_URI_SIZE, "sips:%.*s@%.*s",
1853                                                         puri->user.len, puri->user.s,
1854                                                         idx->dlist[i].uri.len-5, idx->dlist[i].uri.s+5);
1855                                 } else {
1856                                         if(strncmp(idx->dlist[i].uri.s, "sip:", 4)==0) {
1857                                                 ruri.len = snprintf(buri, MAX_URI_SIZE, "sip:%.*s@%.*s",
1858                                                                 puri->user.len, puri->user.s,
1859                                                                 idx->dlist[i].uri.len-4, idx->dlist[i].uri.s+4);
1860                                         } else {
1861                                                 LM_WARN("unsupported protocol schema - ignoring\n");
1862                                                 continue;
1863                                         }
1864                                 }
1865                                 ruri.s = buri;
1866                                 if(append_branch(msg, &ruri, NULL, NULL,
1867                                                 Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1868                                                 NULL, NULL)<0) {
1869                                         LM_ERR("failed to add branch with user ruri\n");
1870                                         return -1;
1871                                 }
1872                         }
1873                 } else {
1874                         /* duri updates */
1875                         if(append_branch(msg, GET_RURI(msg), &idx->dlist[i].uri, NULL,
1876                                         Q_UNSPECIFIED, 0, idx->dlist[i].sock, NULL, 0,
1877                                         NULL, NULL)<0) {
1878                                 LM_ERR("failed to add branch with duri\n");
1879                                 return -1;
1880                         }
1881                 }
1882
1883         }
1884         return 0;
1885 }
1886
1887 /**
1888  *
1889  */
1890 int ds_add_xavp_record(ds_set_t *dsidx, int pos, int set, int alg,
1891                 sr_xavp_t **pxavp)
1892 {
1893         sr_xavp_t *nxavp=NULL;
1894         sr_xval_t nxval;
1895
1896         /* add destination uri field */
1897         memset(&nxval, 0, sizeof(sr_xval_t));
1898         nxval.type = SR_XTYPE_STR;
1899         nxval.v.s = dsidx->dlist[pos].uri;
1900         if(xavp_add_value(&ds_xavp_dst_addr, &nxval, &nxavp)==NULL) {
1901                 LM_ERR("failed to add destination uri xavp field\n");
1902                 return -1;
1903         }
1904
1905         /* add setid field */
1906         memset(&nxval, 0, sizeof(sr_xval_t));
1907         nxval.type = SR_XTYPE_INT;
1908         nxval.v.i = set;
1909         if(xavp_add_value(&ds_xavp_dst_grp, &nxval, &nxavp)==NULL) {
1910                 xavp_destroy_list(&nxavp);
1911                 LM_ERR("failed to add destination setid xavp field\n");
1912                 return -1;
1913         }
1914
1915         if(((ds_xavp_dst_mode & DS_XAVP_DST_SKIP_ATTRS) == 0)
1916                         && (dsidx->dlist[pos].attrs.body.len > 0)) {
1917                 memset(&nxval, 0, sizeof(sr_xval_t));
1918                 nxval.type = SR_XTYPE_STR;
1919                 nxval.v.s = dsidx->dlist[pos].attrs.body;
1920                 if(xavp_add_value(&ds_xavp_dst_attrs, &nxval, &nxavp)==NULL) {
1921                         xavp_destroy_list(&nxavp);
1922                         LM_ERR("failed to add destination attrs xavp field\n");
1923                         return -1;
1924                 }
1925         }
1926
1927         if(dsidx->dlist[pos].sock) {
1928                 memset(&nxval, 0, sizeof(sr_xval_t));
1929                 nxval.type = SR_XTYPE_VPTR;
1930                 nxval.v.vptr = dsidx->dlist[pos].sock;
1931                 if(xavp_add_value(&ds_xavp_dst_sock, &nxval, &nxavp)==NULL) {
1932                         xavp_destroy_list(&nxavp);
1933                         LM_ERR("failed to add destination sock xavp field\n");
1934                         return -1;
1935                 }
1936                 if((ds_xavp_dst_mode & DS_XAVP_DST_ADD_SOCKSTR)
1937                                 && (dsidx->dlist[pos].attrs.socket.len > 0)) {
1938                         memset(&nxval, 0, sizeof(sr_xval_t));
1939                         nxval.type = SR_XTYPE_STR;
1940                         nxval.v.s = dsidx->dlist[pos].attrs.socket;
1941                         if(xavp_add_value(&ds_xavp_dst_socket, &nxval, &nxavp)==NULL) {
1942                                 xavp_destroy_list(&nxavp);
1943                                 LM_ERR("failed to add socket address attrs xavp field\n");
1944                                 return -1;
1945                         }
1946                 }
1947                 if((ds_xavp_dst_mode & DS_XAVP_DST_ADD_SOCKNAME)
1948                                 && (dsidx->dlist[pos].attrs.sockname.len > 0)) {
1949                         memset(&nxval, 0, sizeof(sr_xval_t));
1950                         nxval.type = SR_XTYPE_STR;
1951                         nxval.v.s = dsidx->dlist[pos].attrs.sockname;
1952                         if(xavp_add_value(&ds_xavp_dst_sockname, &nxval, &nxavp)==NULL) {
1953                                 xavp_destroy_list(&nxavp);
1954                                 LM_ERR("failed to add socket name attrs xavp field\n");
1955                                 return -1;
1956                         }
1957                 }
1958         }
1959
1960         if(alg == DS_ALG_CALLLOAD) {
1961                 if(dsidx->dlist[pos].attrs.duid.len <= 0) {
1962                         LM_ERR("no uid for destination: %d %.*s\n", set,
1963                                         dsidx->dlist[pos].uri.len,
1964                                         dsidx->dlist[pos].uri.s);
1965                         xavp_destroy_list(&nxavp);
1966                         return -1;
1967                 }
1968                 memset(&nxval, 0, sizeof(sr_xval_t));
1969                 nxval.type = SR_XTYPE_STR;
1970                 nxval.v.s = dsidx->dlist[pos].attrs.duid;
1971                 if(xavp_add_value(&ds_xavp_dst_dstid, &nxval, &nxavp)==NULL) {
1972                         xavp_destroy_list(&nxavp);
1973                         LM_ERR("failed to add destination dst uid xavp field\n");
1974                         return -1;
1975                 }
1976         }
1977
1978         /* add xavp in root list */
1979         memset(&nxval, 0, sizeof(sr_xval_t));
1980         nxval.type = SR_XTYPE_XAVP;
1981         nxval.v.xavp = nxavp;
1982         if((*pxavp = xavp_add_value_after(&ds_xavp_dst, &nxval, *pxavp))==NULL) {
1983                 LM_ERR("cannot add dst xavp to root list\n");
1984                 xavp_destroy_list(&nxavp);
1985                 return -1;
1986         }
1987
1988         return 0;
1989 }
1990
1991 /**
1992  *
1993  */
1994 int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode)
1995 {
1996         return ds_select_dst_limit(msg, set, alg, 0, mode);
1997 }
1998
1999 /**
2000  * Set destination address from group 'set' selected with alogorithm 'alg'
2001  * - the rest of addresses in group are added as next destination in xavps,
2002  *   up to the 'limit'
2003  * - mode specify to set address in R-URI or outboud proxy
2004  *
2005  */
2006 int ds_select_dst_limit(sip_msg_t *msg, int set, int alg, uint32_t limit,
2007                 int mode)
2008 {
2009         int ret;
2010         sr_xval_t nxval;
2011         ds_select_state_t vstate;
2012
2013         memset(&vstate, 0, sizeof(ds_select_state_t));
2014         vstate.setid = set;
2015         vstate.alg = alg;
2016         vstate.umode = mode;
2017         vstate.limit = limit;
2018
2019         if(vstate.limit == 0) {
2020                 LM_DBG("Limit set to 0 - forcing to unlimited\n");
2021                 vstate.limit = 0xffffffff;
2022         }
2023
2024         ret = ds_manage_routes(msg, &vstate);
2025         if(ret<0) {
2026                 return ret;
2027         }
2028
2029         /* add cnt value to xavp */
2030         if(((ds_xavp_ctx_mode & DS_XAVP_CTX_SKIP_CNT)==0)
2031                         && (ds_xavp_ctx.len >= 0)) {
2032                 /* add to xavp the number of selected dst records */
2033                 memset(&nxval, 0, sizeof(sr_xval_t));
2034                 nxval.type = SR_XTYPE_INT;
2035                 nxval.v.i = vstate.cnt;
2036                 if(xavp_add_xavp_value(&ds_xavp_ctx, &ds_xavp_ctx_cnt, &nxval, NULL)==NULL) {
2037                         LM_ERR("failed to add cnt value to xavp\n");
2038                         return -1;
2039                 }
2040         }
2041
2042         LM_DBG("selected target destinations: %d\n", vstate.cnt);
2043
2044         return ret;
2045 }
2046
2047 /**
2048  *
2049  */
2050 int ds_manage_routes(sip_msg_t *msg, ds_select_state_t *rstate)
2051 {
2052         int i;
2053         unsigned int hash;
2054         ds_set_t *idx = NULL;
2055         int ulast = 0;
2056
2057         if(msg == NULL) {
2058                 LM_ERR("bad parameters\n");
2059                 return -1;
2060         }
2061
2062         if(_ds_list == NULL || _ds_list_nr <= 0) {
2063                 LM_ERR("no destination sets\n");
2064                 return -1;
2065         }
2066
2067         if((rstate->umode == DS_SETOP_DSTURI) && (ds_force_dst == 0)
2068                         && (msg->dst_uri.s != NULL || msg->dst_uri.len > 0)) {
2069                 LM_ERR("destination already set [%.*s]\n", msg->dst_uri.len,
2070                                 msg->dst_uri.s);
2071                 return -1;
2072         }
2073
2074
2075         /* get the index of the set */
2076         if(ds_get_index(rstate->setid, *crt_idx, &idx) != 0) {
2077                 LM_ERR("destination set [%d] not found\n", rstate->setid);
2078                 return -1;
2079         }
2080
2081         LM_DBG("set [%d]\n", rstate->setid);
2082
2083         hash = 0;
2084         switch(rstate->alg) {
2085                 case DS_ALG_HASHCALLID: /* 0 - hash call-id */
2086                         if(ds_hash_callid(msg, &hash) != 0) {
2087                                 LM_ERR("can't get callid hash\n");
2088                                 return -1;
2089                         }
2090                         break;
2091                 case DS_ALG_HASHFROMURI: /* 1 - hash from-uri */
2092                         if(ds_hash_fromuri(msg, &hash) != 0) {
2093                                 LM_ERR("can't get From uri hash\n");
2094                                 return -1;
2095                         }
2096                         break;
2097                 case DS_ALG_HASHTOURI: /* 2 - hash to-uri */
2098                         if(ds_hash_touri(msg, &hash) != 0) {
2099                                 LM_ERR("can't get To uri hash\n");
2100                                 return -1;
2101                         }
2102                         break;
2103                 case DS_ALG_HASHRURI: /* 3 - hash r-uri */
2104                         if(ds_hash_ruri(msg, &hash) != 0) {
2105                                 LM_ERR("can't get ruri hash\n");
2106                                 return -1;
2107                         }
2108                         break;
2109                 case DS_ALG_ROUNDROBIN: /* 4 - round robin */
2110                         hash = idx->last;
2111                         idx->last = (idx->last + 1) % idx->nr;
2112                         ulast = 1;
2113                         break;
2114                 case DS_ALG_HASHAUTHUSER: /* 5 - hash auth username */
2115                         i = ds_hash_authusername(msg, &hash);
2116                         switch(i) {
2117                                 case 0:
2118                                         /* Authorization-Header found: Nothing to be done here */
2119                                         break;
2120                                 case 1:
2121                                         /* No Authorization found: Use round robin */
2122                                         hash = idx->last;
2123                                         idx->last = (idx->last + 1) % idx->nr;
2124                                         ulast = 1;
2125                                         break;
2126                                 default:
2127                                         LM_ERR("can't get authorization hash\n");
2128                                         return -1;
2129                         }
2130                         break;
2131                 case DS_ALG_RANDOM: /* 6 - random selection */
2132                         hash = kam_rand();
2133                         break;
2134                 case DS_ALG_HASHPV: /* 7 - hash on PV value */
2135                         if(ds_hash_pvar(msg, &hash) != 0) {
2136                                 LM_ERR("can't get PV hash\n");
2137                                 return -1;
2138                         }
2139                         break;
2140                 case DS_ALG_SERIAL: /* 8 - use always first entry */
2141                         hash = 0;
2142                         break;
2143                 case DS_ALG_WEIGHT: /* 9 - weight based distribution */
2144                         hash = idx->wlist[idx->wlast];
2145                         idx->wlast = (idx->wlast + 1) % 100;
2146                         break;
2147                 case DS_ALG_CALLLOAD: /* 10 - call load based distribution */
2148                         /* only INVITE can start a call */
2149                         if(msg->first_line.u.request.method_value != METHOD_INVITE) {
2150                                 /* use first entry */
2151                                 hash = 0;
2152                                 rstate->alg = 0;
2153                                 break;
2154                         }
2155                         if(ds_xavp_dst.len <= 0) {
2156                                 LM_ERR("no dst xavp for load distribution"
2157                                            " - using first entry...\n");
2158                                 hash = 0;
2159                                 rstate->alg = 0;
2160                         } else {
2161                                 i = ds_get_leastloaded(idx);
2162                                 if(i < 0) {
2163                                         /* no address selected */
2164                                         return -1;
2165                                 }
2166                                 hash = i;
2167                                 if(ds_load_add(msg, idx, rstate->setid, hash) < 0) {
2168                                         LM_ERR("unable to update destination load"
2169                                                    " - classic dispatching\n");
2170                                         rstate->alg = 0;
2171                                 }
2172                         }
2173                         break;
2174                 case DS_ALG_RELWEIGHT: /* 11 - relative weight based distribution */
2175                         hash = idx->rwlist[idx->rwlast];
2176                         idx->rwlast = (idx->rwlast + 1) % 100;
2177                         break;
2178                 case DS_ALG_PARALLEL: /* 12 - parallel dispatching */
2179                         hash = 0;
2180                         break;
2181                 default:
2182                         LM_WARN("algo %d not implemented - using first entry...\n",
2183                                         rstate->alg);
2184                         hash = 0;
2185         }
2186
2187         LM_DBG("using alg [%d] hash [%u]\n", rstate->alg, hash);
2188
2189         if(ds_use_default != 0 && idx->nr != 1)
2190                 hash = hash % (idx->nr - 1);
2191         else
2192                 hash = hash % idx->nr;
2193         i = hash;
2194
2195         /* if selected address is inactive, find next active */
2196         while(ds_skip_dst(idx->dlist[i].flags)) {
2197                 if(ds_use_default != 0 && idx->nr != 1)
2198                         i = (i + 1) % (idx->nr - 1);
2199                 else
2200                         i = (i + 1) % idx->nr;
2201                 if(i == hash) {
2202                         /* back to start -- looks like no active dst */
2203                         if(ds_use_default != 0) {
2204                                 i = idx->nr - 1;
2205                                 if(ds_skip_dst(idx->dlist[i].flags))
2206                                         return -1;
2207                                 break;
2208                         } else {
2209                                 return -1;
2210                         }
2211                 }
2212         }
2213
2214         hash = i;
2215
2216         if(rstate->umode!=DS_SETOP_XAVP) {
2217                 if(ds_push_dst(msg, &idx->dlist[hash].uri, idx->dlist[hash].sock,
2218                                         rstate->umode) != 0) {
2219                         LM_ERR("cannot set next hop address with: %.*s\n",
2220                                         idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
2221                         return -1;
2222                 }
2223                 rstate->emode = 1;
2224         }
2225
2226         /* update last field for next select to point after the current active used */
2227         if(ulast) {
2228                 idx->last = (hash + 1) % idx->nr;
2229         }
2230
2231         LM_DBG("selected [%d-%d-%d/%d] <%.*s>\n", rstate->alg, rstate->setid,
2232                         rstate->umode, hash,
2233                         idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
2234
2235         if(rstate->alg == DS_ALG_PARALLEL) {
2236                 if(ds_add_branches(msg, idx, hash, rstate->umode)<0) {
2237                         LM_ERR("failed to add additional branches\n");
2238                         /* one destination was already set - return success anyhow */
2239                         return 2;
2240                 }
2241                 return 1;
2242         }
2243
2244         if(!(ds_flags & DS_FAILOVER_ON))
2245                 return 1;
2246
2247         if(ds_xavp_dst.len<=0) {
2248                 /* no xavp name to store the rest of the records */
2249                 return 1;
2250         }
2251
2252         LM_DBG("using first entry [%d/%d]\n", rstate->setid, hash);
2253         if(ds_add_xavp_record(idx, hash, rstate->setid, rstate->alg,
2254                                 &rstate->lxavp)<0) {
2255                 LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2256                                 hash, rstate->setid);
2257                 return -1;
2258         }
2259         rstate->cnt++;
2260
2261         /* add to xavp the destinations after the selected one */
2262         for(i = hash + 1; i < idx->nr && rstate->cnt < rstate->limit; i++) {
2263                 if(ds_skip_dst(idx->dlist[i].flags)
2264                                 || (ds_use_default != 0 && i == (idx->nr - 1))) {
2265                         continue;
2266                 }
2267                 /* max load exceeded per destination */
2268                 if(rstate->alg == DS_ALG_CALLLOAD
2269                                 && idx->dlist[i].attrs.maxload != 0
2270                                 && idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
2271                         continue;
2272                 }
2273                 LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
2274                 if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
2275                                         &rstate->lxavp)<0) {
2276                         LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2277                                         i, rstate->setid);
2278                         return -1;
2279                 }
2280                 rstate->cnt++;
2281         }
2282
2283         /* add to xavp the destinations before the selected one */
2284         for(i = 0; i < hash && rstate->cnt < rstate->limit; i++) {
2285                 if(ds_skip_dst(idx->dlist[i].flags)
2286                                 || (ds_use_default != 0 && i == (idx->nr - 1))) {
2287                         continue;
2288                 }
2289                 /* max load exceeded per destination */
2290                 if(rstate->alg == DS_ALG_CALLLOAD
2291                                 && idx->dlist[i].attrs.maxload != 0
2292                                 && idx->dlist[i].dload >= idx->dlist[i].attrs.maxload) {
2293                         continue;
2294                 }
2295                 LM_DBG("using entry [%d/%d]\n", rstate->setid, i);
2296                 if(ds_add_xavp_record(idx, i, rstate->setid, rstate->alg,
2297                                         &rstate->lxavp)<0) {
2298                         LM_ERR("failed to add destination in the xavp (%d/%d)\n",
2299                                         i, rstate->setid);
2300                         return -1;
2301                 }
2302                 rstate->cnt++;
2303         }
2304
2305         /* add default dst to last position in XAVP list */
2306         if(ds_use_default != 0 && hash != idx->nr - 1
2307                                 && rstate->cnt < rstate->limit) {
2308                 LM_DBG("using default entry [%d/%d]\n", rstate->setid, idx->nr - 1);
2309                 if(ds_add_xavp_record(idx, idx->nr - 1, rstate->setid, rstate->alg,
2310                                         &rstate->lxavp)<0) {
2311                         LM_ERR("failed to add default destination in the xavp\n");
2312                         return -1;
2313                 }
2314                 rstate->cnt++;
2315         }
2316
2317         return 1;
2318 }
2319
2320 int ds_update_dst(struct sip_msg *msg, int upos, int mode)
2321 {
2322
2323         socket_info_t *sock = NULL;
2324         sr_xavp_t *rxavp = NULL;
2325         sr_xavp_t *lxavp = NULL;
2326
2327         LM_DBG("updating dst\n");
2328         if(upos == DS_USE_NEXT) {
2329                 if(!(ds_flags & DS_FAILOVER_ON) || ds_xavp_dst.len <= 0) {
2330                         LM_WARN("failover support disabled\n");
2331                         return -1;
2332                 }
2333         }
2334
2335         rxavp = xavp_get(&ds_xavp_dst, NULL);
2336         if(rxavp == NULL || rxavp->val.type != SR_XTYPE_XAVP) {
2337                 LM_DBG("no xavp with previous destination record\n");
2338                 return -1;
2339         }
2340
2341         if(upos == DS_USE_NEXT) {
2342                 LM_DBG("updating dst with next record\n");
2343                 /* use next destination - delete the current one and search the next */
2344                 xavp_rm(rxavp, NULL);
2345
2346                 rxavp = xavp_get(&ds_xavp_dst, NULL);
2347                 if(rxavp == NULL || rxavp->val.type != SR_XTYPE_XAVP) {
2348                         LM_DBG("no xavp with next destination record\n");
2349                         return -1;
2350                 }
2351         }
2352
2353         /* retrieve attributes from sub list */
2354         rxavp = rxavp->val.v.xavp;
2355         lxavp = xavp_get(&ds_xavp_dst_sock, rxavp);
2356         if(lxavp!=NULL && lxavp->val.type==SR_XTYPE_VPTR) {
2357                 LM_DBG("socket enforced in next destination record\n");
2358                 sock = lxavp->val.v.vptr;
2359         }
2360
2361         lxavp = xavp_get(&ds_xavp_dst_addr, rxavp);
2362         if(lxavp==NULL || lxavp->val.type!=SR_XTYPE_STR) {
2363                 LM_WARN("no xavp uri field in next destination record (%p)\n", lxavp);
2364                 return -1;
2365         }
2366
2367         if(ds_push_dst(msg, &lxavp->val.v.s, sock, mode) != 0) {
2368                 LM_ERR("cannot set dst addr: %.*s\n", lxavp->val.v.s.len,
2369                                 lxavp->val.v.s.s);
2370                 return -1;
2371         }
2372         LM_DBG("using next dst uri [%.*s]\n", lxavp->val.v.s.len,
2373                                 lxavp->val.v.s.s);
2374
2375         /* call load update if dstid field is set */
2376         lxavp = xavp_get(&ds_xavp_dst_dstid, rxavp);
2377         if(lxavp==NULL || lxavp->val.type!=SR_XTYPE_STR) {
2378                 /* no dstid field - done */
2379                 return 1;
2380         }
2381         if(upos == DS_USE_NEXT) {
2382                 if(ds_load_replace(msg, &lxavp->val.v.s) < 0) {
2383                         LM_ERR("cannot update load distribution\n");
2384                         return -1;
2385                 }
2386         }
2387
2388         return 1;
2389 }
2390
2391 /* callback for adding nodes based on index */
2392 void ds_add_dest_cb(ds_set_t *node, int i, void *arg)
2393 {
2394         int setn;
2395
2396         if(add_dest2list(node->id, node->dlist[i].uri, node->dlist[i].flags,
2397                         node->dlist[i].priority, &node->dlist[i].attrs.body, *next_idx,
2398                         &setn) != 0) {
2399                 LM_WARN("failed to add destination in group %d - %.*s\n",
2400                                 node->id, node->dlist[i].uri.len, node->dlist[i].uri.s);
2401         }
2402         return;
2403 }
2404
2405 /* add dispatcher entry to in-memory dispatcher list */
2406 int ds_add_dst(int group, str *address, int flags)
2407 {
2408         int setn, priority;
2409         str attrs;
2410
2411         setn = _ds_list_nr;
2412         priority = 0;
2413         attrs.s = 0;
2414         attrs.len = 0;
2415
2416         *next_idx = (*crt_idx + 1) % 2;
2417         ds_avl_destroy(&ds_lists[*next_idx]);
2418
2419         // add all existing destinations
2420         ds_iter_set(_ds_list, &ds_add_dest_cb, NULL);
2421
2422         // add new destination
2423         if(add_dest2list(group, *address, flags, priority, &attrs,
2424                         *next_idx, &setn) != 0) {
2425                 LM_WARN("unable to add destination %.*s to set %d", address->len, address->s, group);
2426                 if(ds_load_mode==1) {
2427                         goto error;
2428                 }
2429         }
2430
2431         if(reindex_dests(ds_lists[*next_idx]) != 0) {
2432                 LM_ERR("error on reindex\n");
2433                 goto error;
2434         }
2435
2436         _ds_list_nr = setn;
2437         *crt_idx = *next_idx;
2438         ds_ht_clear_slots(_dsht_load);
2439         ds_log_sets();
2440         return 0;
2441
2442 error:
2443         ds_avl_destroy(&ds_lists[*next_idx]);
2444         *next_idx = *crt_idx;
2445         return -1;
2446 }
2447
2448 /* callback for removing nodes based on setid & address */
2449 void ds_filter_dest_cb(ds_set_t *node, int i, void *arg)
2450 {
2451         struct ds_filter_dest_cb_arg *filter_arg = (typeof(filter_arg)) arg;
2452
2453         if(node->id == filter_arg->setid && node->dlist[i].uri.len == filter_arg->dest->uri.len &&
2454                 strncmp(node->dlist[i].uri.s, filter_arg->dest->uri.s, filter_arg->dest->uri.len) == 0)
2455                 return;
2456
2457         if(add_dest2list(node->id, node->dlist[i].uri, node->dlist[i].flags,
2458                         node->dlist[i].priority, &node->dlist[i].attrs.body, *next_idx,
2459                         filter_arg->setn) != 0) {
2460                 LM_WARN("failed to add destination in group %d - %.*s\n",
2461                                 node->id, node->dlist[i].uri.len, node->dlist[i].uri.s);
2462         }
2463         return;
2464 }
2465
2466 /* remove dispatcher entry from in-memory dispatcher list */
2467 int ds_remove_dst(int group, str *address)
2468 {
2469         int setn;
2470         struct ds_filter_dest_cb_arg filter_arg;
2471         ds_dest_t *dp = NULL;
2472
2473         setn = 0;
2474
2475         dp = pack_dest(*address, 0, 0, NULL);
2476         filter_arg.setid = group;
2477         filter_arg.dest = dp;
2478         filter_arg.setn = &setn;
2479
2480         *next_idx = (*crt_idx + 1) % 2;
2481         ds_avl_destroy(&ds_lists[*next_idx]);
2482
2483         // add existing destinations except destination that matches group & address
2484         ds_iter_set(_ds_list, &ds_filter_dest_cb, &filter_arg);
2485
2486         if(reindex_dests(ds_lists[*next_idx]) != 0) {
2487                 LM_ERR("error on reindex\n");
2488                 goto error;
2489         }
2490
2491         _ds_list_nr = setn;
2492         *crt_idx = *next_idx;
2493         ds_ht_clear_slots(_dsht_load);
2494         ds_log_sets();
2495         return 0;
2496
2497 error:
2498         ds_avl_destroy(&ds_lists[*next_idx]);
2499         *next_idx = *crt_idx;
2500         return -1;
2501 }
2502
2503 int ds_mark_dst(struct sip_msg *msg, int state)
2504 {
2505         sr_xavp_t *rxavp = NULL;
2506         int group;
2507         int ret;
2508         ds_rctx_t rctx;
2509
2510         if(!(ds_flags & DS_FAILOVER_ON)) {
2511                 LM_WARN("failover support disabled\n");
2512                 return -1;
2513         }
2514
2515         if(ds_xavp_dst.len<=0) {
2516                 LM_WARN("no xavp name to store dst records\n");
2517                 return -1;
2518         }
2519         rxavp = xavp_get_child_with_ival(&ds_xavp_dst, &ds_xavp_dst_grp);
2520
2521         if(rxavp == NULL)
2522                 return -1; /* grp xavp not available */
2523         group = rxavp->val.v.i;
2524
2525         rxavp = xavp_get_child_with_sval(&ds_xavp_dst, &ds_xavp_dst_addr);
2526
2527         if(rxavp == NULL )
2528                 return -1; /* dst addr uri not available */
2529
2530         memset(&rctx, 0, sizeof(ds_rctx_t));
2531         if(msg!=NULL) {
2532                 if(msg!=FAKED_REPLY) {
2533                         if(msg->first_line.type == SIP_REPLY) {
2534                                 rctx.flags |= 1;
2535                                 rctx.code = (int)msg->first_line.u.reply.statuscode;
2536                                 rctx.reason = msg->first_line.u.reply.reason;
2537                         } else {
2538                                 rctx.code = 820;
2539                         }
2540                 } else {
2541                         rctx.code = 810;
2542                 }
2543         } else {
2544                 rctx.code = 800;
2545         }
2546         ret = ds_update_state(msg, group, &rxavp->val.v.s, state, &rctx);
2547
2548         LM_DBG("state [%d] grp [%d] dst [%.*s]\n", state, group, rxavp->val.v.s.len,
2549                         rxavp->val.v.s.s);
2550
2551         return (ret == 0) ? 1 : -1;
2552 }
2553
2554 static inline void latency_stats_update(ds_latency_stats_t *latency_stats, int latency) {
2555         /* after 2^21 ~24 days at 1s interval, the average becomes a weighted average */
2556         if (latency_stats->count < 2097152) {
2557                 latency_stats->count++;
2558         } else { /* We adjust the sum of squares used by the oneline algorithm proportionally */
2559                 latency_stats->m2 -= latency_stats->m2/latency_stats->count;
2560         }
2561         if (latency_stats->count == 1) {
2562                 latency_stats->stdev = 0.0f;
2563                 latency_stats->m2 = 0.0f;
2564                 latency_stats->max = latency;
2565                 latency_stats->min = latency;
2566                 latency_stats->average = latency;
2567                 latency_stats->estimate = latency;
2568         }
2569         /* train the average if stable after 10 samples */
2570         if (latency_stats->count > 10 && latency_stats->stdev < 0.5) latency_stats->count = 500000;
2571         if (latency_stats->min > latency)
2572                 latency_stats->min = latency;
2573         if (latency_stats->max < latency)
2574                 latency_stats->max = latency;
2575
2576         /* standard deviation using oneline algorithm */
2577         /* https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm */
2578         if (latency_stats->count > 1) {
2579                 float delta = latency - latency_stats->average;
2580                 latency_stats->average += delta/latency_stats->count;
2581                 float delta2 = latency - latency_stats->average;
2582                 latency_stats->m2 += delta*delta2;
2583                 latency_stats->stdev = sqrt(latency_stats->m2 / (latency_stats->count-1));
2584         }
2585         /* exponentialy weighted moving average */
2586         if (latency_stats->count < 10) {
2587                 latency_stats->estimate = latency_stats->average;
2588         } else {
2589                 latency_stats->estimate = latency_stats->estimate*ds_latency_estimator_alpha
2590                                           + latency*(1-ds_latency_estimator_alpha);
2591         }
2592 }
2593
2594 int ds_update_latency(int group, str *address, int code)
2595 {
2596         int i = 0;
2597         int state = 0;
2598         ds_set_t *idx = NULL;
2599
2600         if(_ds_list == NULL || _ds_list_nr <= 0) {
2601                 LM_ERR("the list is null\n");
2602                 return -1;
2603         }
2604
2605         /* get the index of the set */
2606         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2607                 LM_ERR("destination set [%d] not found\n", group);
2608                 return -1;
2609         }
2610         int apply_rweights = 0;
2611         int all_gw_congested = 1;
2612         int total_congestion_ms = 0;
2613         lock_get(&idx->lock);
2614         while (i < idx->nr) {
2615                 ds_dest_t *ds_dest = &idx->dlist[i];
2616                 ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2617                 if (ds_dest->uri.len == address->len
2618                                 && strncasecmp(ds_dest->uri.s, address->s, address->len) == 0) {
2619                         /* Destination address found, this is the gateway that was pinged. */
2620                         state = ds_dest->flags;
2621                         if (code == 408 && latency_stats->timeout < UINT32_MAX)
2622                                 latency_stats->timeout++;
2623                         struct timeval now;
2624                         gettimeofday(&now, NULL);
2625                         int latency_ms = (now.tv_sec - latency_stats->start.tv_sec)*1000
2626                             + (now.tv_usec - latency_stats->start.tv_usec)/1000;
2627                         latency_stats_update(latency_stats, latency_ms);
2628
2629                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2630                         if (congestion_ms < 0) congestion_ms = 0;
2631                         total_congestion_ms += congestion_ms;
2632
2633                         /* Adjusting weight using congestion detection based on latency estimator. */
2634                         if (ds_dest->attrs.congestion_control && ds_dest->attrs.weight) {
2635                                 int active_weight = ds_dest->attrs.weight - congestion_ms;
2636                                 if (active_weight <= 0) {
2637                                         active_weight = 0;
2638                                 } else {
2639                                         all_gw_congested = 0;
2640                                 }
2641                                 if (ds_dest->attrs.rweight != active_weight) {
2642                                         apply_rweights = 1;
2643                                         ds_dest->attrs.rweight = active_weight;
2644                                 }
2645                                 LM_DBG("[%d]latency[%d]avg[%.2f][%.*s]code[%d]rweight[%d]cms[%d]\n",
2646                                         latency_stats->count, latency_ms,
2647                                         latency_stats->average, address->len, address->s,
2648                                         code, ds_dest->attrs.rweight, congestion_ms);
2649                         }
2650                 } else {
2651                         /* Another gateway in the set, we verify if it is congested. */
2652                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2653                         if (congestion_ms < 0) congestion_ms = 0;
2654                         total_congestion_ms += congestion_ms;
2655                         int active_weight = ds_dest->attrs.weight - congestion_ms;
2656                         if (active_weight > 0) all_gw_congested = 0;
2657                 }
2658                 if (!ds_dest->attrs.congestion_control) all_gw_congested = 0;
2659                 i++;
2660         }
2661         /* All the GWs are above their congestion threshold, load distribution will now be based on
2662          * the ratio of congestion_ms each GW is facing. */
2663         if (all_gw_congested) {
2664                 i = 0;
2665                 while (i < idx->nr) {
2666                         ds_dest_t *ds_dest = &idx->dlist[i];
2667                         ds_latency_stats_t *latency_stats = &ds_dest->latency_stats;
2668                         int congestion_ms = latency_stats->estimate - latency_stats->average;
2669                         /* We multiply by 2^4 to keep enough precision */
2670                         int active_weight = (total_congestion_ms << 4) / congestion_ms;
2671                         if (ds_dest->attrs.rweight != active_weight) {
2672                                 apply_rweights = 1;
2673                                 ds_dest->attrs.rweight = active_weight;
2674                         }
2675                         LM_DBG("all gw congested[%d][%d]latency_avg[%.2f][%.*s]code[%d]rweight[%d/%d:%d]cms[%d]\n",
2676                                         total_congestion_ms, latency_stats->count, latency_stats->average,
2677                                         address->len, address->s, code, total_congestion_ms, congestion_ms,
2678                                         ds_dest->attrs.rweight, congestion_ms);
2679                 i++;
2680                 }
2681         }
2682
2683         lock_release(&idx->lock);
2684         if (apply_rweights) dp_init_relative_weights(idx);
2685         return state;
2686 }
2687
2688
2689 /**
2690  * Get state for given destination
2691  */
2692 int ds_get_state(int group, str *address)
2693 {
2694         int i = 0;
2695         ds_set_t *idx = NULL;
2696
2697         if(_ds_list == NULL || _ds_list_nr <= 0) {
2698                 LM_ERR("the list is null\n");
2699                 return -1;
2700         }
2701
2702         /* get the index of the set */
2703         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2704                 LM_ERR("destination set [%d] not found\n", group);
2705                 return -1;
2706         }
2707
2708         while(i < idx->nr) {
2709                 if(idx->dlist[i].uri.len == address->len
2710                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2711                                                    == 0) {
2712                         /* destination address found */
2713                         return idx->dlist[i].flags;
2714                 }
2715                 i++;
2716         }
2717         return 0;
2718 }
2719
2720 /**
2721  * Update destionation's state
2722  */
2723 int ds_update_state(sip_msg_t *msg, int group, str *address, int state,
2724                 ds_rctx_t *rctx)
2725 {
2726         int i = 0;
2727         int old_state = 0;
2728         int init_state = 0;
2729         ds_set_t *idx = NULL;
2730
2731         if(_ds_list == NULL || _ds_list_nr <= 0) {
2732                 LM_ERR("the list is null\n");
2733                 return -1;
2734         }
2735
2736         /* get the index of the set */
2737         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2738                 LM_ERR("destination set [%d] not found\n", group);
2739                 return -1;
2740         }
2741         LM_DBG("update state for %.*s in group %d to %d\n", address->len, address->s, group, state);
2742
2743         while(i < idx->nr) {
2744                 if(idx->dlist[i].uri.len == address->len
2745                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2746                                                    == 0) {
2747                         /* destination address found */
2748                         old_state = idx->dlist[i].flags;
2749
2750                         /* reset the bits used for states */
2751                         idx->dlist[i].flags &= ~(DS_STATES_ALL);
2752
2753                         /* we need the initial state for inactive counter */
2754                         init_state = state;
2755
2756                         if((state & DS_TRYING_DST) && (old_state & DS_INACTIVE_DST)) {
2757                                 /* old state is inactive, new state is trying => keep it inactive
2758                                  * - it has to go first to active state and then to trying */
2759                                 state &= ~(DS_TRYING_DST);
2760                                 state |= DS_INACTIVE_DST;
2761                         }
2762
2763                         /* set the new states */
2764                         if(state & DS_DISABLED_DST) {
2765                                 idx->dlist[i].flags |= DS_DISABLED_DST;
2766                         } else {
2767                                 idx->dlist[i].flags |= state;
2768                         }
2769
2770                         if(state & DS_TRYING_DST) {
2771                                 idx->dlist[i].message_count++;
2772                                 LM_DBG("destination did not replied %d times, threshold %d\n",
2773                                                  idx->dlist[i].message_count, probing_threshold);
2774                                 /* Destination is not replying.. Increasing failure counter */
2775                                 if(idx->dlist[i].message_count >= probing_threshold) {
2776                                         /* Destination has too much lost messages.. Bringing it to inactive state */
2777                                         idx->dlist[i].flags &= ~DS_TRYING_DST;
2778                                         idx->dlist[i].flags |= DS_INACTIVE_DST;
2779                                         idx->dlist[i].message_count = 0;
2780                                         LM_DBG("deactivate destination, threshold %d reached\n", probing_threshold);
2781                                 }
2782                         } else {
2783                                 if(!(init_state & DS_TRYING_DST)
2784                                                 && (old_state & DS_INACTIVE_DST)) {
2785                                         idx->dlist[i].message_count++;
2786                                         /* Destination was inactive but it is just replying.. Increasing successful counter */
2787                                         if(idx->dlist[i].message_count < inactive_threshold) {
2788                                                 /* Destination has not enough successful replies.. Leaving it into inactive state */
2789                                                 idx->dlist[i].flags |= DS_INACTIVE_DST;
2790                                                 /* if destination was in probing state, we stay there for now */
2791                                                 if((old_state & DS_PROBING_DST) != 0) {
2792                                                         idx->dlist[i].flags |= DS_PROBING_DST;
2793                                                 }
2794                                                 LM_DBG("destination replied successful %d times, threshold %d\n",
2795                                                                  idx->dlist[i].message_count, inactive_threshold);
2796                                         } else {
2797                                                 /* Destination has enough replied messages.. Bringing it to active state */
2798                                                 idx->dlist[i].message_count = 0;
2799                                                 LM_DBG("activate destination, threshold %d reached\n", inactive_threshold);
2800                                         }
2801                                 } else {
2802                                         idx->dlist[i].message_count = 0;
2803                                 }
2804                         }
2805
2806                         if(!ds_skip_dst(old_state) && ds_skip_dst(idx->dlist[i].flags)) {
2807                                 ds_run_route(msg, address, "dispatcher:dst-down", rctx);
2808
2809                         } else {
2810                                 if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
2811                                         ds_run_route(msg, address, "dispatcher:dst-up", rctx);
2812                         }
2813                         if(idx->dlist[i].attrs.rweight > 0)
2814                                 ds_reinit_rweight_on_state_change(
2815                                                 old_state, idx->dlist[i].flags, idx);
2816
2817                         LM_DBG("old state was %d, set new state to %d\n", old_state, idx->dlist[i].flags);
2818                         return 0;
2819                 }
2820                 i++;
2821         }
2822
2823         return -1;
2824 }
2825
2826 /**
2827  *
2828  */
2829 static ds_rctx_t *_ds_rctx = NULL;
2830
2831 /**
2832  *
2833  */
2834 ds_rctx_t* ds_get_rctx(void)
2835 {
2836         return _ds_rctx;
2837 }
2838
2839 static void ds_run_route(sip_msg_t *msg, str *uri, char *route, ds_rctx_t *rctx)
2840 {
2841         int rt, backup_rt;
2842         struct run_act_ctx ctx;
2843         sip_msg_t *fmsg;
2844         sr_kemi_eng_t *keng = NULL;
2845         str evname;
2846
2847         if(route == NULL) {
2848                 LM_ERR("bad route\n");
2849                 return;
2850         }
2851
2852         LM_DBG("executing event_route[%s]\n", route);
2853
2854         rt = -1;
2855         if(ds_event_callback.s==NULL || ds_event_callback.len<=0) {
2856                 rt = route_lookup(&event_rt, route);
2857                 if(rt < 0 || event_rt.rlist[rt] == NULL) {
2858                         LM_DBG("route does not exist");
2859                         return;
2860                 }
2861         } else {
2862                 keng = sr_kemi_eng_get();
2863                 if(keng==NULL) {
2864                         LM_DBG("event callback (%s) set, but no cfg engine\n",
2865                                         ds_event_callback.s);
2866                         return;
2867                 }
2868         }
2869
2870         if(msg == NULL) {
2871                 if(faked_msg_init() < 0) {
2872                         LM_ERR("faked_msg_init() failed\n");
2873                         return;
2874                 }
2875                 fmsg = faked_msg_next();
2876                 fmsg->parsed_orig_ruri_ok = 0;
2877                 fmsg->new_uri = *uri;
2878         } else {
2879                 fmsg = msg;
2880         }
2881
2882         if(rt>=0 || ds_event_callback.len>0) {
2883                 _ds_rctx = rctx;
2884                 backup_rt = get_route_type();
2885                 set_route_type(REQUEST_ROUTE);
2886                 init_run_actions_ctx(&ctx);
2887                 if(rt>=0) {
2888                         run_top_route(event_rt.rlist[rt], fmsg, 0);
2889                 } else {
2890                         if(keng!=NULL) {
2891                                 evname.s = route;
2892                                 evname.len = strlen(evname.s);
2893                                 if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
2894                                                         &ds_event_callback, &evname)<0) {
2895                                         LM_ERR("error running event route kemi callback\n");
2896                                 }
2897                         }
2898                 }
2899                 set_route_type(backup_rt);
2900                 _ds_rctx = NULL;
2901         }
2902 }
2903
2904
2905 /**
2906  * recalculate relative states if some destination state was changed
2907  */
2908 int ds_reinit_rweight_on_state_change(
2909                 int old_state, int new_state, ds_set_t *dset)
2910 {
2911         if(dset == NULL) {
2912                 LM_ERR("destination set is null\n");
2913                 return -1;
2914         }
2915         if((!ds_skip_dst(old_state) && ds_skip_dst(new_state))
2916                         || (ds_skip_dst(old_state) && !ds_skip_dst(new_state))) {
2917                 dp_init_relative_weights(dset);
2918         }
2919
2920         return 0;
2921 }
2922
2923
2924 /**
2925  *
2926  */
2927 int ds_reinit_state(int group, str *address, int state)
2928 {
2929         int i = 0;
2930         ds_set_t *idx = NULL;
2931
2932         if(_ds_list == NULL || _ds_list_nr <= 0) {
2933                 LM_ERR("the list is null\n");
2934                 return -1;
2935         }
2936
2937         /* get the index of the set */
2938         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2939                 LM_ERR("destination set [%d] not found\n", group);
2940                 return -1;
2941         }
2942
2943         for(i = 0; i < idx->nr; i++) {
2944                 if(idx->dlist[i].uri.len == address->len
2945                                 && strncasecmp(idx->dlist[i].uri.s, address->s, address->len)
2946                                                    == 0) {
2947                         int old_state = idx->dlist[i].flags;
2948                         /* reset the bits used for states */
2949                         idx->dlist[i].flags &= ~(DS_STATES_ALL);
2950                         /* set the new states */
2951                         idx->dlist[i].flags |= state;
2952                         if(idx->dlist[i].attrs.rweight > 0) {
2953                                 ds_reinit_rweight_on_state_change(
2954                                                 old_state, idx->dlist[i].flags, idx);
2955                         }
2956
2957                         return 0;
2958                 }
2959         }
2960         LM_ERR("destination address [%d : %.*s] not found\n", group, address->len,
2961                         address->s);
2962         return -1;
2963 }
2964
2965 /**
2966  *
2967  */
2968 int ds_reinit_state_all(int group, int state)
2969 {
2970         int i = 0;
2971         ds_set_t *idx = NULL;
2972
2973         if(_ds_list == NULL || _ds_list_nr <= 0) {
2974                 LM_ERR("the list is null\n");
2975                 return -1;
2976         }
2977
2978         /* get the index of the set */
2979         if(ds_get_index(group, *crt_idx, &idx) != 0) {
2980                 LM_ERR("destination set [%d] not found\n", group);
2981                 return -1;
2982         }
2983
2984         for(i = 0; i < idx->nr; i++) {
2985                 int old_state = idx->dlist[i].flags;
2986                 /* reset the bits used for states */
2987                 idx->dlist[i].flags &= ~(DS_STATES_ALL);
2988                 /* set the new states */
2989                 idx->dlist[i].flags |= state;
2990                 if(idx->dlist[i].attrs.rweight > 0) {
2991                         ds_reinit_rweight_on_state_change(
2992                                         old_state, idx->dlist[i].flags, idx);
2993                 }
2994         }
2995         return 0;
2996 }
2997
2998 /**
2999  *
3000  */
3001 void ds_fprint_set(FILE *fout, ds_set_t *node)
3002 {
3003         int i, j;
3004
3005         if(!node)
3006                 return;
3007
3008         for(i = 0; i < 2; ++i)
3009                 ds_fprint_set(fout, node->next[i]);
3010
3011         for(j = 0; j < node->nr; j++) {
3012                 fprintf(fout, "\n set #%d\n", node->id);
3013
3014                 if(node->dlist[j].flags & DS_DISABLED_DST)
3015                         fprintf(fout, "    Disabled         ");
3016                 else if(node->dlist[j].flags & DS_INACTIVE_DST)
3017                         fprintf(fout, "    Inactive         ");
3018                 else if(node->dlist[j].flags & DS_TRYING_DST) {
3019                         fprintf(fout, "    Trying");
3020                         /* print the tries for this host. */
3021                         if(node->dlist[j].message_count > 0) {
3022                                 fprintf(fout, " (Fail %d/%d)", node->dlist[j].message_count,
3023                                                 probing_threshold);
3024                         } else {
3025                                 fprintf(fout, "           ");
3026                         }
3027
3028                 } else {
3029                         fprintf(fout, "    Active           ");
3030                 }
3031                 if(node->dlist[j].flags & DS_PROBING_DST)
3032                         fprintf(fout, "(P)");
3033                 else
3034                         fprintf(fout, "(*)");
3035
3036                 fprintf(fout, "   %.*s\n", node->dlist[j].uri.len,
3037                                 node->dlist[j].uri.s);
3038         }
3039 }
3040
3041 /**
3042  *
3043  */
3044 int ds_fprint_list(FILE *fout)
3045 {
3046         if(_ds_list == NULL || _ds_list_nr <= 0) {
3047                 LM_ERR("no destination sets\n");
3048                 return -1;
3049         }
3050
3051         fprintf(fout, "\nnumber of destination sets: %d\n", _ds_list_nr);
3052
3053         ds_fprint_set(fout, _ds_list);
3054
3055         return 0;
3056 }
3057
3058
3059 int ds_is_addr_from_set(sip_msg_t *_m, struct ip_addr *pipaddr,
3060                 unsigned short tport, unsigned short tproto, ds_set_t *node, int mode,
3061                 int export_set_pv)
3062 {
3063         pv_value_t val;
3064         int j;
3065         for(j = 0; j < node->nr; j++) {
3066                 if(ip_addr_cmp(pipaddr, &node->dlist[j].ip_address)
3067                                 && ((mode & DS_MATCH_NOPORT) || node->dlist[j].port == 0
3068                                                    || tport == node->dlist[j].port)
3069                                 && ((mode & DS_MATCH_NOPROTO)
3070                                                    || tproto == node->dlist[j].proto)
3071                                 && (((mode & DS_MATCH_ACTIVE) && !ds_skip_dst(node->dlist[j].flags))
3072                                                    || !(mode & DS_MATCH_ACTIVE))) {
3073                         if(export_set_pv && ds_setid_pvname.s != 0) {
3074                                 memset(&val, 0, sizeof(pv_value_t));
3075                                 val.flags = PV_VAL_INT | PV_TYPE_INT;
3076
3077                                 val.ri = node->id;
3078                                 if(ds_setid_pv.setf(_m, &ds_setid_pv.pvp, (int)EQ_T, &val)
3079                                                 < 0) {
3080                                         LM_ERR("setting PV failed\n");
3081                                         return -2;
3082                                 }
3083                         }
3084                         if(ds_attrs_pvname.s != 0 && node->dlist[j].attrs.body.len > 0) {
3085                                 memset(&val, 0, sizeof(pv_value_t));
3086                                 val.flags = PV_VAL_STR;
3087                                 val.rs = node->dlist[j].attrs.body;
3088                                 if(ds_attrs_pv.setf(_m, &ds_attrs_pv.pvp, (int)EQ_T, &val)
3089                                                 < 0) {
3090                                         LM_ERR("setting attrs pv failed\n");
3091                                         return -3;
3092                                 }
3093                         }
3094                         return 1;
3095                 }
3096         }
3097         return -1;
3098 }
3099
3100 /**
3101  *
3102  */
3103 int ds_is_addr_from_set_r(sip_msg_t *_m, struct ip_addr *pipaddr,
3104                 unsigned short tport, unsigned short tproto, ds_set_t *node, int mode,
3105                 int export_set_pv)
3106 {
3107         int i, rc;
3108
3109         if(!node)
3110                 return -1;
3111
3112         for(i = 0; i < 2; ++i) {
3113                 rc = ds_is_addr_from_set_r(
3114                                 _m, pipaddr, tport, tproto, node->next[i], mode, export_set_pv);
3115                 if(rc != -1)
3116                         return rc;
3117         }
3118
3119         return ds_is_addr_from_set(
3120                         _m, pipaddr, tport, tproto, node, mode, export_set_pv);
3121 }
3122
3123 /* Checks, if the request (sip_msg *_m) comes from a host in a group
3124  * (group-id or -1 for all groups)
3125  */
3126 int ds_is_addr_from_list(sip_msg_t *_m, int group, str *uri, int mode)
3127 {
3128         ds_set_t *list;
3129
3130         struct ip_addr *pipaddr;
3131         struct ip_addr aipaddr;
3132         unsigned short tport;
3133         unsigned short tproto;
3134         sip_uri_t puri;
3135         static char hn[256];
3136         struct hostent *he;
3137         int rc = -1;
3138
3139         if(uri == NULL || uri->len <= 0) {
3140                 pipaddr = &_m->rcv.src_ip;
3141                 tport = _m->rcv.src_port;
3142                 tproto = _m->rcv.proto;
3143         } else {
3144                 if(parse_uri(uri->s, uri->len, &puri) != 0 || puri.host.len > 255) {
3145                         LM_ERR("bad uri [%.*s]\n", uri->len, uri->s);
3146                         return -1;
3147                 }
3148                 strncpy(hn, puri.host.s, puri.host.len);
3149                 hn[puri.host.len] = '\0';
3150
3151                 he = resolvehost(hn);
3152                 if(he == 0) {
3153                         LM_ERR("could not resolve %.*s\n", puri.host.len, puri.host.s);
3154                         return -1;
3155                 }
3156                 hostent2ip_addr(&aipaddr, he, 0);
3157                 pipaddr = &aipaddr;
3158                 tport = puri.port_no;
3159                 tproto = puri.proto;
3160         }
3161
3162
3163         if(group == -1) {
3164                 rc = ds_is_addr_from_set_r(_m, pipaddr, tport, tproto, _ds_list,
3165                                 mode, 1);
3166         } else {
3167                 list = ds_avl_find(_ds_list, group);
3168                 if(list) {
3169                         rc = ds_is_addr_from_set(_m, pipaddr, tport, tproto, list, mode, 0);
3170                 }
3171         }
3172
3173         return rc;
3174 }
3175
3176 int ds_is_from_list(struct sip_msg *_m, int group)
3177 {
3178         return ds_is_addr_from_list(_m, group, NULL, DS_MATCH_NOPROTO);
3179 }
3180
3181 /*! \brief
3182  * Callback-Function for the OPTIONS-Request
3183  * This Function is called, as soon as the Transaction is finished
3184  * (e. g. a Response came in, the timeout was hit, ...)
3185  */
3186 static void ds_options_callback(
3187                 struct cell *t, int type, struct tmcb_params *ps)
3188 {
3189         int group = 0;
3190         str uri = {0, 0};
3191         sip_msg_t *fmsg;
3192         int state;
3193         ds_rctx_t rctx;
3194
3195         /* The param contains the group, in which the failed host
3196          * can be found.*/
3197         if(ps->param == NULL) {
3198                 LM_DBG("No parameter provided, OPTIONS-Request was finished"
3199                            " with code %d\n",
3200                                 ps->code);
3201                 return;
3202         }
3203
3204         fmsg = NULL;
3205
3206         /* The param is a (void*) Pointer, so we need to dereference it and
3207          *  cast it to an int. */
3208         group = (int)(long)(*ps->param);
3209         /* The SIP-URI is taken from the Transaction.
3210          * Remove the "To: <" (s+5) and the trailing >+new-line (s - 5 (To: <)
3211          * - 3 (>\r\n)). */
3212         uri.s = t->to.s + 5;
3213         uri.len = t->to.len - 8;
3214         LM_DBG("OPTIONS-Request was finished with code %d (to %.*s, group %d)\n",
3215                         ps->code, uri.len, uri.s, group);
3216         if (ds_ping_latency_stats) {
3217                 ds_update_latency(group, &uri, ps->code);
3218         }
3219
3220         memset(&rctx, 0, sizeof(ds_rctx_t));
3221         rctx.code = ps->code;
3222         if(ps->rpl!=NULL) {
3223                 if(ps->rpl!=FAKED_REPLY) {
3224                         rctx.flags |= 1;
3225                         rctx.reason = ps->rpl->first_line.u.reply.reason;
3226                 }
3227         }
3228
3229         /* ps->code contains the result-code of the request.
3230          *
3231          * We accept both a "200 OK" or the configured reply as a valid response */
3232         if((ps->code >= 200 && ps->code <= 299)
3233                         || ds_ping_check_rplcode(ps->code)) {
3234                 /* Set the according entry back to "Active" */
3235                 state = 0;
3236                 if(ds_probing_mode == DS_PROBE_ALL
3237                                 || ((ds_probing_mode == DS_PROBE_ONLYFLAGGED)
3238                                                    && (ds_get_state(group, &uri) & DS_PROBING_DST)))
3239                         state |= DS_PROBING_DST;
3240
3241                 /* Check if in the meantime someone disabled the target through RPC or MI */
3242                 if(!(ds_get_state(group, &uri) & DS_DISABLED_DST)
3243                                 && ds_update_state(fmsg, group, &uri, state, &rctx) != 0) {
3244                         LM_ERR("Setting the state failed (%.*s, group %d)\n", uri.len,
3245                                         uri.s, group);
3246                 }
3247         } else {
3248                 state = DS_TRYING_DST;
3249                 if(ds_probing_mode != DS_PROBE_NONE)
3250                         state |= DS_PROBING_DST;
3251                 /* Check if in the meantime someone disabled the target through RPC or MI */
3252                 if(!(ds_get_state(group, &uri) & DS_DISABLED_DST)
3253                                 && ds_update_state(fmsg, group, &uri, state, &rctx) != 0) {
3254                         LM_ERR("Setting the probing state failed (%.*s, group %d)\n",
3255                                         uri.len, uri.s, group);
3256                 }
3257         }
3258
3259         return;
3260 }
3261
3262 /*
3263  * Small helper to decide to ping a gateway or not
3264  */
3265 static inline int ds_ping_result_helper(ds_set_t *node, int j)
3266 {
3267         /* probe all */
3268         if(ds_probing_mode == DS_PROBE_ALL) {
3269                 LM_DBG("probe all, mode DS_PROBE_ALL\n");
3270                 return 1;
3271         }
3272         /* probe if probing is set, but not in mode DS_PROBE_INACTIVE */
3273         if (ds_probing_mode != DS_PROBE_INACTIVE
3274                         && (node->dlist[j].flags & DS_PROBING_DST) != 0) {
3275                 LM_DBG("probing set, but not mode DS_PROBE_INACTIVE\n");
3276                 return 1;
3277         }
3278         /* probe for mode DS_PROBE_INACTIVE only for inactive and probing gw */
3279         if (ds_probing_mode == DS_PROBE_INACTIVE
3280                         && (node->dlist[j].flags & DS_PROBING_DST) != 0
3281                         && (node->dlist[j].flags & DS_INACTIVE_DST) != 0) {
3282                 LM_DBG("probing and inactive set, mode DS_PROBE_INACTIVE\n");
3283                 return 1;
3284         }
3285         return 0;
3286 }
3287
3288 /**
3289  *
3290  */
3291 void ds_ping_set(ds_set_t *node)
3292 {
3293         uac_req_t uac_r;
3294         int i, j;
3295         str ping_from;
3296         str obproxy;
3297
3298         if(!node)
3299                 return;
3300
3301         for(i = 0; i < 2; ++i)
3302                 ds_ping_set(node->next[i]);
3303
3304         for(j = 0; j < node->nr; j++) {
3305                 /* skip addresses set in disabled state by admin */
3306                 if((node->dlist[j].flags & DS_DISABLED_DST) != 0)
3307                         continue;
3308                 /* If the Flag of the entry has "Probing set, send a probe:     */
3309                 if(ds_ping_result_helper(node, j)) {
3310                         LM_DBG("probing set #%d, URI %.*s\n", node->id,
3311                                         node->dlist[j].uri.len, node->dlist[j].uri.s);
3312
3313                         /* Send ping using TM-Module.
3314                          * int request(str* m, str* ruri, str* to, str* from, str* h,
3315                          *              str* b, str *oburi,
3316                          *              transaction_cb cb, void* cbp); */
3317                         set_uac_req(&uac_r, &ds_ping_method, 0, 0, 0, TMCB_LOCAL_COMPLETED,
3318                                         ds_options_callback, (void *)(long)node->id);
3319                         if(node->dlist[j].attrs.sockname.s != NULL
3320                                         && node->dlist[j].attrs.sockname.len > 0) {
3321                                 uac_r.ssockname = &node->dlist[j].attrs.sockname;
3322                         } else if(node->dlist[j].attrs.socket.s != NULL
3323                                         && node->dlist[j].attrs.socket.len > 0) {
3324                                 uac_r.ssock = &node->dlist[j].attrs.socket;
3325                         } else if(ds_default_sockname.s != NULL
3326                                           && ds_default_sockname.len > 0) {
3327                                 uac_r.ssockname = &ds_default_sockname;
3328                         } else if(ds_default_socket.s != NULL
3329                                           && ds_default_socket.len > 0) {
3330                                 uac_r.ssock = &ds_default_socket;
3331                         }
3332
3333                         /* Overwrite default ping From URI with attribute */
3334                         if(node->dlist[j].attrs.ping_from.s != NULL
3335                                         && node->dlist[j].attrs.ping_from.len > 0) {
3336                                 ping_from = node->dlist[j].attrs.ping_from;
3337                                 LM_DBG("ping_from: %.*s\n", ping_from.len, ping_from.s);
3338                         }
3339                         else {
3340                                 ping_from = ds_ping_from;
3341                                 LM_DBG("Default ping_from: %.*s\n", ping_from.len, ping_from.s);
3342                         }
3343
3344                         if(node->dlist[j].attrs.obproxy.s != NULL
3345                                         && node->dlist[j].attrs.obproxy.len > 0) {
3346                                 obproxy = node->dlist[j].attrs.obproxy;
3347                                 LM_DBG("outbound proxy: %.*s\n", obproxy.len, obproxy.s);
3348                         }
3349                         else {
3350                                 obproxy = ds_outbound_proxy;
3351                                 LM_DBG("Default outbound proxy: %.*s\n", ds_outbound_proxy.len, ds_outbound_proxy.s);
3352                         }
3353
3354                         gettimeofday(&node->dlist[j].latency_stats.start, NULL);
3355
3356                         if(tmb.t_request(&uac_r, &node->dlist[j].uri, &node->dlist[j].uri,
3357                                            &ping_from, &obproxy)
3358                                         < 0) {
3359                                 LM_ERR("unable to ping [%.*s]\n", node->dlist[j].uri.len,
3360                                                 node->dlist[j].uri.s);
3361                         }
3362                 }
3363         }
3364 }
3365
3366 /*! \brief
3367  * Timer for checking probing destinations
3368  *
3369  * This timer is regularly fired.
3370  */
3371 void ds_check_timer(unsigned int ticks, void *param)
3372 {
3373
3374
3375         /* Check for the list. */
3376         if(_ds_list == NULL || _ds_list_nr <= 0) {
3377                 LM_DBG("no destination sets\n");
3378                 return;
3379         }
3380
3381         if(_ds_ping_active != NULL && *_ds_ping_active == 0) {
3382                 LM_DBG("pinging destinations is inactive by admin\n");
3383                 return;
3384         }
3385
3386         ds_ping_set(_ds_list);
3387 }
3388
3389 /*! \brief
3390  * Timer for checking expired items in call load dispatching
3391  *
3392  * This timer is regularly fired.
3393  */
3394 void ds_ht_timer(unsigned int ticks, void *param)
3395 {
3396         ds_cell_t *it;
3397         ds_cell_t *it0;
3398         time_t now;
3399         int i;
3400
3401         if(_dsht_load == NULL)
3402                 return;
3403
3404         now = time(NULL);
3405
3406         for(i = 0; i < _dsht_load->htsize; i++) {
3407                 /* free entries */
3408                 lock_get(&_dsht_load->entries[i].lock);
3409                 it = _dsht_load->entries[i].first;
3410                 while(it) {
3411                         it0 = it->next;
3412                         if((it->expire != 0 && it->expire < now)
3413                                         || (it->state == DS_LOAD_INIT && it->initexpire != 0
3414                                                            && it->initexpire < now)) {
3415                                 /* expired */
3416                                 if(it->prev == NULL)
3417                                         _dsht_load->entries[i].first = it->next;
3418                                 else
3419                                         it->prev->next = it->next;
3420                                 if(it->next)
3421                                         it->next->prev = it->prev;
3422                                 _dsht_load->entries[i].esize--;
3423
3424                                 /* execute ds unload callback */
3425                                 ds_load_remove_byid(it->dset, &it->duid);
3426
3427                                 ds_cell_free(it);
3428                         }
3429                         it = it0;
3430                 }
3431                 lock_release(&_dsht_load->entries[i].lock);
3432         }
3433         return;