rtpengine: free rtpl if error inside fixup_set_id()
[sip-router] / src / modules / rtpengine / rtpengine.c
1 /*
2  * Copyright (C) 2003-2008 Sippy Software, Inc., http://www.sippysoft.com
3  * Copyright (C) 2014-2015 Sipwise GmbH, http://www.sipwise.com
4  *
5  * This file is part of Kamailio, a free SIP server.
6  *
7  * Kamailio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version
11  *
12  * Kamailio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <sys/time.h>
25 #include <netinet/in.h>
26 #include <netinet/in_systm.h>
27 #ifndef __USE_BSD
28 #define  __USE_BSD
29 #endif
30 #include <netinet/ip.h>
31 #ifndef __FAVOR_BSD
32 #define __FAVOR_BSD
33 #endif
34 #include <netinet/udp.h>
35 #include <arpa/inet.h>
36 #include <sys/uio.h>
37 #include <sys/un.h>
38 #include <ctype.h>
39 #include <errno.h>
40 #include <netdb.h>
41 #include <poll.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <string.h>
45 #include <unistd.h>
46 #include <ifaddrs.h>
47 #include <netinet/in.h>
48 #include <arpa/inet.h>
49
50 #include "../../core/flags.h"
51 #include "../../core/sr_module.h"
52 #include "../../core/dprint.h"
53 #include "../../core/data_lump.h"
54 #include "../../core/data_lump_rpl.h"
55 #include "../../core/error.h"
56 #include "../../core/forward.h"
57 #include "../../core/mem/mem.h"
58 #include "../../core/parser/parse_from.h"
59 #include "../../core/parser/parse_to.h"
60 #include "../../core/parser/parse_uri.h"
61 #include "../../core/parser/parser_f.h"
62 #include "../../core/parser/sdp/sdp.h"
63 #include "../../core/resolve.h"
64 #include "../../core/timer.h"
65 #include "../../core/trim.h"
66 #include "../../core/ut.h"
67 #include "../../core/pt.h"
68 #include "../../core/timer_proc.h"
69 #include "../../core/pvar.h"
70 #include "../../core/lvalue.h"
71 #include "../../core/msg_translator.h"
72 #include "../../core/usr_avp.h"
73 #include "../../core/socket_info.h"
74 #include "../../core/mod_fix.h"
75 #include "../../core/dset.h"
76 #include "../../core/route.h"
77 #include "../../core/rpc.h"
78 #include "../../core/rpc_lookup.h"
79 #include "../../core/kemi.h"
80 #include "../../modules/tm/tm_load.h"
81 #include "rtpengine.h"
82 #include "rtpengine_funcs.h"
83 #include "rtpengine_hash.h"
84 #include "bencode.h"
85 #include "config.h"
86
/* Module version marker macro; its definition is not part of this file
 * (presumably provided by core/sr_module.h -- TODO confirm). */
87 MODULE_VERSION
88
/* Fall back to the AF_UNIX / PF_UNIX names when the platform headers do not
 * define AF_LOCAL / PF_LOCAL. */
89 #if !defined(AF_LOCAL)
90 #define AF_LOCAL AF_UNIX
91 #endif
92 #if !defined(PF_LOCAL)
93 #define PF_LOCAL PF_UNIX
94 #endif
95
96 /* NAT UAC test constants: every value is a distinct single bit.
 * NOTE(review): no use of these constants is visible in this part of the
 * file -- confirm they are still needed. */
97 #define NAT_UAC_TEST_C_1918                     0x01
98 #define NAT_UAC_TEST_RCVD                       0x02
99 #define NAT_UAC_TEST_V_1918                     0x04
100 #define NAT_UAC_TEST_S_1918                     0x08
101 #define NAT_UAC_TEST_RPORT                      0x10
102
/* Size constants; judging by the names they bound the cookie and host name
 * buffers -- their users are outside the visible part of the file. */
103 #define COOKIE_SIZE                                     128
104 #define HOSTNAME_SIZE                           100
105
/* Id of the default set: get_rtpp_set() stores the set created with this id
 * in default_rtpp_set, and it is the initial value of setid_default. */
106 #define DEFAULT_RTPP_SET_ID                     0
107
/* Result codes whose users are outside the visible part of the file.
 * NOTE(review): the names suggest "how many nodes matched an RPC request"
 * (all / exactly one / none) -- confirm against the RPC handlers. */
108 enum {
109         RPC_FOUND_ALL = 2,
110         RPC_FOUND_ONE = 1,
111         RPC_FOUND_NONE = 0,
112 };
113
/* Port number kept as a string; presumably the default control port used
 * when a socket URL carries none -- TODO confirm against the socket setup. */
114 #define CPORT                                   "22222"
115
/*
 * Working state handed to parse_flags() (see its prototype below).
 * NOTE(review): field meanings are inferred from their names and types only;
 * parse_flags() itself is outside the visible part of the file:
 *  - via, to, packetize, transport: integer options,
 *  - dict, flags, direction, replace, rtcp_mux: bencode items being built,
 *  - call_id, from_tag, to_tag: dialog identifiers.
 */
116 struct ng_flags_parse {
117         int via, to, packetize, transport;
118         bencode_item_t *dict, *flags, *direction, *replace, *rtcp_mux;
119         str call_id, from_tag, to_tag;
120 };
121
/*
 * Command names as text, indexed by the OP_* operation constants through
 * designated initializers. Any index without an initializer is NULL.
 */
122 static const char *command_strings[] = {
123         [OP_OFFER]              = "offer",
124         [OP_ANSWER]             = "answer",
125         [OP_DELETE]             = "delete",
126         [OP_START_RECORDING]    = "start recording",
127         [OP_QUERY]              = "query",
128         [OP_PING]               = "ping",
129 };
130
/*
 * One group (min, max or average) of MOS statistics outputs.
 *
 * The *_param members receive pseudo-variable names from the module
 * parameters (see the "mos_*_pv" entries of params[]). In that table
 * at_param is bound for the min and max groups only and samples_param for
 * the average group only.
 * The *_pv members presumably hold the parsed form of the matching *_param
 * (see pv_parse_var()) -- the parsing code is outside the visible part.
 */
131 struct minmax_mos_stats {
132         str mos_param;
133         str at_param;
134         str packetloss_param;
135         str jitter_param;
136         str roundtrip_param;
137         str samples_param;
138
139         pv_elem_t *mos_pv;
140         pv_elem_t *at_pv;
141         pv_elem_t *packetloss_pv;
142         pv_elem_t *jitter_pv;
143         pv_elem_t *roundtrip_pv;
144         pv_elem_t *samples_pv;
145 };
/*
 * MOS statistics outputs for one label: the min, max and average groups
 * plus the label itself.
 *
 * label_param is bound to "mos_A_label_pv" / "mos_B_label_pv" in params[]
 * (the global instance has no label parameter).
 * got_any_pvs is presumably set while parsing when at least one
 * pseudo-variable was configured -- TODO confirm in mos_label_stats_parse().
 */
146 struct minmax_mos_label_stats {
147         int got_any_pvs;
148
149         str label_param;
150         pv_elem_t *label_pv;
151
152         struct minmax_mos_stats min,
153                                 max,
154                                 average;
155 };
/*
 * Numeric values of one MOS statistics group; the members mirror the
 * outputs described by struct minmax_mos_stats.
 */
156 struct minmax_stats_vals {
157         long long mos;
158         long long at;
159         long long packetloss;
160         long long jitter;
161         long long roundtrip;
162         long long samples;
163         long long avg_samples; /* our own running count to average the averages */
164 };
165
/*
 * Forward declarations of the file-local functions, so that the export
 * tables below can reference them before their definitions.
 */
/* script functions and their helpers */
166 static char *gencookie();
167 static int rtpp_test(struct rtpp_node*, int, int);
168 static int start_recording_f(struct sip_msg *, char *, char *);
169 static int rtpengine_answer1_f(struct sip_msg *, char *, char *);
170 static int rtpengine_offer1_f(struct sip_msg *, char *, char *);
171 static int rtpengine_delete1_f(struct sip_msg *, char *, char *);
172 static int rtpengine_manage1_f(struct sip_msg *, char *, char *);
173 static int rtpengine_query1_f(struct sip_msg *, char *, char *);
174
175 static int parse_flags(struct ng_flags_parse *, struct sip_msg *, enum rtpe_operation *, const char *);
176
/* set / node selection and communication with the nodes */
177 static int rtpengine_offer_answer(struct sip_msg *msg, const char *flags, int op, int more);
178 static int fixup_set_id(void ** param, int param_no);
179 static int set_rtpengine_set_f(struct sip_msg * msg, char * str1, char * str2);
180 static struct rtpp_set * select_rtpp_set(unsigned int id_set);
181 static struct rtpp_node *select_rtpp_node_new(str, str, int, struct rtpp_node **, int);
182 static struct rtpp_node *select_rtpp_node_old(str, str, int, enum rtpe_operation);
183 static struct rtpp_node *select_rtpp_node(str, str, int, struct rtpp_node **, int, enum rtpe_operation);
184 static int is_queried_node(struct rtpp_node *, struct rtpp_node **, int);
185 static int build_rtpp_socks();
186 static char *send_rtpp_command(struct rtpp_node *, bencode_item_t *, int *);
187 static int get_extra_id(struct sip_msg* msg, str *id_str);
188
/* module parameter handling */
189 static int rtpengine_set_store(modparam_t type, void * val);
190 static int rtpengine_add_rtpengine_set(char * rtp_proxies, unsigned int weight, int disabled, unsigned int ticks);
191
/* module life cycle callbacks, referenced from the exports structure */
192 static int mod_init(void);
193 static int child_init(int);
194 static void mod_destroy(void);
195
/* helpers for binding the control sockets to a forced source address */
196 static int get_ip_type(char *str_addr);
197 static int get_ip_scope(char *str_addr); // useful for link-local ipv6
198 static int bind_force_send_ip(int sock_idx);
199
200 static int add_rtpp_node_info(void *ptrs, struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list);
201 static int rtpp_test_ping(struct rtpp_node *node);
202
203 /* Pseudo-Variables */
204 static int pv_get_rtpstat_f(struct sip_msg *, pv_param_t *, pv_value_t *);
205 static int set_rtp_inst_pvar(struct sip_msg *msg, const str * const uri);
206 static int pv_parse_var(str *inp, pv_elem_t **outp, int *got_any);
207 static int mos_label_stats_parse(struct minmax_mos_label_stats *mmls);
208 static void parse_call_stats(bencode_item_t *, struct sip_msg *);
209
/* value of the "rtpengine_allow_op" module parameter */
210 static int rtpengine_allow_op = 0;
/* NOTE(review): presumably the per-process array of nodes already queried
 * for the current request; its allocation is outside the visible part */
211 static struct rtpp_node **queried_nodes_ptr = NULL;
/* NOTE(review): presumably inputs of gencookie() -- TODO confirm */
212 static pid_t mypid;
213 static unsigned int myseqn = 0;
/* raw values of the "extra_id_pv" and "setid_avp" module parameters */
214 static str extra_id_pv_param = {NULL, 0};
215 static char *setid_avp_param = NULL;
/* "hash_table_tout" / "hash_table_size" module parameters and defaults */
216 static int hash_table_tout = 3600;
217 static int hash_table_size = 256;
/* "setid_default" module parameter */
218 static unsigned int setid_default = DEFAULT_RTPP_SET_ID;
219
/* copies of the "rtpengine_sock" parameter values, appended one per call
 * by rtpengine_set_store(); rtpp_sets is the number of stored strings */
220 static char ** rtpp_strings=0;
221 static int rtpp_sets=0; /*used in rtpengine_set_store()*/
/* number of sets created so far, incremented by get_rtpp_set() */
222 static int rtpp_set_count = 0;
223 static unsigned int current_msg_id = (unsigned int)-1;
224 /* RTP proxy balancing list */
225 static struct rtpp_set_head * rtpp_set_list =0;
226 static struct rtpp_set * active_rtpp_set =0;
227 static struct rtpp_set * selected_rtpp_set_1 =0;
228 static struct rtpp_set * selected_rtpp_set_2 =0;
/* the set whose id is DEFAULT_RTPP_SET_ID, recorded by get_rtpp_set() */
229 static struct rtpp_set * default_rtpp_set=0;
230
231 static str body_intermediate;
232
/* "rtp_inst_pvar" module parameter and, presumably, its parsed spec */
233 static str rtp_inst_pv_param = {NULL, 0};
234 static pv_spec_t *rtp_inst_pvar = NULL;
235
236 /* array with the sockets used by rtpproxy (per process);
 * bind_force_send_ip() indexes rtpp_socks[] to get the descriptor to bind */
237 static unsigned int *rtpp_no = 0;
238 static gen_lock_t *rtpp_no_lock = 0;
239 static int *rtpp_socks = 0;
240 static unsigned int rtpp_socks_size = 0;
241
/* NOTE(review): presumably the AVP resolved from setid_avp_param */
242 static int setid_avp_type;
243 static int_str setid_avp;
244
/* "write_sdp_pv" module parameter and, presumably, its parsed spec */
245 static str write_sdp_pvar_str = {NULL, 0};
246 static pv_spec_t *write_sdp_pvar = NULL;
247
/* "read_sdp_pv" module parameter and, presumably, its parsed spec */
248 static str read_sdp_pvar_str = {NULL, 0};
249 static pv_spec_t *read_sdp_pvar = NULL;
250
/* Fixed text and its length without the terminating NUL.
 * NOTE(review): the code comparing against it is outside the visible part. */
251 #define RTPENGINE_SESS_LIMIT_MSG "Parallel session limit reached"
252 #define RTPENGINE_SESS_LIMIT_MSG_LEN (sizeof(RTPENGINE_SESS_LIMIT_MSG)-1)
253
/* "force_send_interface" module parameter (address as text) and the address
 * family that bind_force_send_ip() switches on; AF_UNSPEC means that no
 * source address is forced. */
254 char* force_send_ip_str="";
255 int force_send_ip_af = AF_UNSPEC;
256
/*
 * Reference to a set, as a set pointer and/or a pseudo-variable spec.
 * NOTE(review): presumably produced by fixup_set_id() so that the set can be
 * given either as a constant id (rset) or through a variable (rpv) --
 * confirm against fixup_set_id(), which is outside the visible part.
 */
257 typedef struct rtpp_set_link {
258         struct rtpp_set *rset;
259         pv_spec_t *rpv;
260 } rtpp_set_link_t;
261
262 /* tm */
263 static struct tm_binds tmb;
264
/* NOTE(review): presumably the parsed form of extra_id_pv_param */
265 static pv_elem_t *extra_id_pv = NULL;
266
267
/* MOS statistics outputs: overall, and for the designated sides A and B;
 * their *_param members are filled from the "mos_*" module parameters */
268 static struct minmax_mos_label_stats global_mos_stats,
269                                      side_A_mos_stats,
270                                      side_B_mos_stats;
271 int got_any_mos_pvs;
272
273
274
/*
 * Functions exported to the configuration script.
 *
 * cmd_export_t is declared in the core headers; going by the values used
 * here the columns are: script name, implementing function, number of
 * parameters, fixup function, a second hook left 0 (presumably the fixup
 * "free" function) and the routes the function is allowed in.
 * Each function is listed once per accepted parameter count; the table ends
 * with an all-zero entry.
 */
275 static cmd_export_t cmds[] = {
276         {"set_rtpengine_set",   (cmd_function)set_rtpengine_set_f,      1,
277                 fixup_set_id, 0,
278                 ANY_ROUTE},
279         {"set_rtpengine_set",   (cmd_function)set_rtpengine_set_f,      2,
280                 fixup_set_id, 0,
281                 ANY_ROUTE},
282         {"start_recording",     (cmd_function)start_recording_f,        0,
283                 0, 0,
284                 ANY_ROUTE },
285         {"rtpengine_offer",     (cmd_function)rtpengine_offer1_f,       0,
286                 0, 0,
287                 ANY_ROUTE},
288         {"rtpengine_offer",     (cmd_function)rtpengine_offer1_f,       1,
289                 fixup_spve_null, 0,
290                 ANY_ROUTE},
291         {"rtpengine_answer",    (cmd_function)rtpengine_answer1_f,      0,
292                 0, 0,
293                 ANY_ROUTE},
294         {"rtpengine_answer",    (cmd_function)rtpengine_answer1_f,      1,
295                 fixup_spve_null, 0,
296                 ANY_ROUTE},
297         {"rtpengine_manage",    (cmd_function)rtpengine_manage1_f,      0,
298                 0, 0,
299                 ANY_ROUTE},
300         {"rtpengine_manage",    (cmd_function)rtpengine_manage1_f,      1,
301                 fixup_spve_null, 0,
302                 ANY_ROUTE},
303         {"rtpengine_delete",    (cmd_function)rtpengine_delete1_f,      0,
304                 0, 0,
305                 ANY_ROUTE},
306         {"rtpengine_delete",    (cmd_function)rtpengine_delete1_f,      1,
307                 fixup_spve_null, 0,
308                 ANY_ROUTE},
309         {"rtpengine_query",     (cmd_function)rtpengine_query1_f,       0,
310                 0, 0,
311                 ANY_ROUTE},
312         {"rtpengine_query",     (cmd_function)rtpengine_query1_f,       1,
313                 fixup_spve_null, 0,
314                 ANY_ROUTE},
315         {0, 0, 0, 0, 0, 0}
316 };
317
/*
 * Pseudo-variables exported by the module: only "rtpstat", of type
 * PVT_OTHER, read through pv_get_rtpstat_f(); the remaining hooks are left
 * 0. The table ends with an all-zero entry.
 */
318 static pv_export_t mod_pvs[] = {
319         {{"rtpstat", (sizeof("rtpstat")-1)}, /* RTP-Statistics */
320         PVT_OTHER, pv_get_rtpstat_f, 0, 0, 0, 0, 0},
321         {{0, 0}, 0, 0, 0, 0, 0, 0, 0}
322 };
323
/*
 * Module parameters: name, type and the variable that receives the value.
 *
 * "rtpengine_sock" is the only entry handled by a function
 * (USE_FUNC_PARAM): each occurrence is passed to rtpengine_set_store().
 * The "mos_*" entries store pseudo-variable names in the global, side A and
 * side B statistics structures; note that the "at" outputs exist for min
 * and max only and the "samples" outputs for average only.
 * The table ends with an all-zero entry.
 */
324 static param_export_t params[] = {
325         {"rtpengine_sock",        PARAM_STRING|USE_FUNC_PARAM,
326                                  (void*)rtpengine_set_store          },
327         {"rtpengine_disable_tout",INT_PARAM, &default_rtpengine_cfg.rtpengine_disable_tout },
328         {"rtpengine_retr",        INT_PARAM, &default_rtpengine_cfg.rtpengine_retr         },
329         {"queried_nodes_limit",   INT_PARAM, &default_rtpengine_cfg.queried_nodes_limit    },
330         {"rtpengine_tout_ms",     INT_PARAM, &default_rtpengine_cfg.rtpengine_tout_ms      },
331         {"rtpengine_allow_op",    INT_PARAM, &rtpengine_allow_op     },
332         {"db_url",                PARAM_STR, &rtpp_db_url            },
333         {"table_name",            PARAM_STR, &rtpp_table_name        },
334         {"setid_col",             PARAM_STR, &rtpp_setid_col         },
335         {"url_col",               PARAM_STR, &rtpp_url_col           },
336         {"weight_col",            PARAM_STR, &rtpp_weight_col        },
337         {"disabled_col",          PARAM_STR, &rtpp_disabled_col      },
338         {"extra_id_pv",           PARAM_STR, &extra_id_pv_param      },
339         {"setid_avp",             PARAM_STRING, &setid_avp_param     },
340         {"force_send_interface",  PARAM_STRING, &force_send_ip_str   },
341         {"rtp_inst_pvar",         PARAM_STR, &rtp_inst_pv_param      },
342         {"write_sdp_pv",          PARAM_STR, &write_sdp_pvar_str     },
343         {"read_sdp_pv",           PARAM_STR, &read_sdp_pvar_str      },
344         {"hash_table_tout",       INT_PARAM, &hash_table_tout        },
345         {"hash_table_size",       INT_PARAM, &hash_table_size        },
346         {"setid_default",         INT_PARAM, &setid_default          },
347
348         /* MOS stats output */
349         /* global averages */
350         {"mos_min_pv",                PARAM_STR, &global_mos_stats.min.mos_param             },
351         {"mos_min_at_pv",             PARAM_STR, &global_mos_stats.min.at_param              },
352         {"mos_min_packetloss_pv",     PARAM_STR, &global_mos_stats.min.packetloss_param      },
353         {"mos_min_jitter_pv",         PARAM_STR, &global_mos_stats.min.jitter_param          },
354         {"mos_min_roundtrip_pv",      PARAM_STR, &global_mos_stats.min.roundtrip_param       },
355         {"mos_max_pv",                PARAM_STR, &global_mos_stats.max.mos_param             },
356         {"mos_max_at_pv",             PARAM_STR, &global_mos_stats.max.at_param              },
357         {"mos_max_packetloss_pv",     PARAM_STR, &global_mos_stats.max.packetloss_param      },
358         {"mos_max_jitter_pv",         PARAM_STR, &global_mos_stats.max.jitter_param          },
359         {"mos_max_roundtrip_pv",      PARAM_STR, &global_mos_stats.max.roundtrip_param       },
360         {"mos_average_pv",            PARAM_STR, &global_mos_stats.average.mos_param         },
361         {"mos_average_packetloss_pv", PARAM_STR, &global_mos_stats.average.packetloss_param  },
362         {"mos_average_jitter_pv",     PARAM_STR, &global_mos_stats.average.jitter_param      },
363         {"mos_average_roundtrip_pv",  PARAM_STR, &global_mos_stats.average.roundtrip_param   },
364         {"mos_average_samples_pv",    PARAM_STR, &global_mos_stats.average.samples_param     },
365
366         /* designated side A */
367         {"mos_A_label_pv",              PARAM_STR, &side_A_mos_stats.label_param               },
368         {"mos_min_A_pv",                PARAM_STR, &side_A_mos_stats.min.mos_param             },
369         {"mos_min_at_A_pv",             PARAM_STR, &side_A_mos_stats.min.at_param              },
370         {"mos_min_packetloss_A_pv",     PARAM_STR, &side_A_mos_stats.min.packetloss_param      },
371         {"mos_min_jitter_A_pv",         PARAM_STR, &side_A_mos_stats.min.jitter_param          },
372         {"mos_min_roundtrip_A_pv",      PARAM_STR, &side_A_mos_stats.min.roundtrip_param       },
373         {"mos_max_A_pv",                PARAM_STR, &side_A_mos_stats.max.mos_param             },
374         {"mos_max_at_A_pv",             PARAM_STR, &side_A_mos_stats.max.at_param              },
375         {"mos_max_packetloss_A_pv",     PARAM_STR, &side_A_mos_stats.max.packetloss_param      },
376         {"mos_max_jitter_A_pv",         PARAM_STR, &side_A_mos_stats.max.jitter_param          },
377         {"mos_max_roundtrip_A_pv",      PARAM_STR, &side_A_mos_stats.max.roundtrip_param       },
378         {"mos_average_A_pv",            PARAM_STR, &side_A_mos_stats.average.mos_param         },
379         {"mos_average_packetloss_A_pv", PARAM_STR, &side_A_mos_stats.average.packetloss_param  },
380         {"mos_average_jitter_A_pv",     PARAM_STR, &side_A_mos_stats.average.jitter_param      },
381         {"mos_average_roundtrip_A_pv",  PARAM_STR, &side_A_mos_stats.average.roundtrip_param   },
382         {"mos_average_samples_A_pv",    PARAM_STR, &side_A_mos_stats.average.samples_param     },
383
384         /* designated side B */
385         {"mos_B_label_pv",              PARAM_STR, &side_B_mos_stats.label_param               },
386         {"mos_min_B_pv",                PARAM_STR, &side_B_mos_stats.min.mos_param             },
387         {"mos_min_at_B_pv",             PARAM_STR, &side_B_mos_stats.min.at_param              },
388         {"mos_min_packetloss_B_pv",     PARAM_STR, &side_B_mos_stats.min.packetloss_param      },
389         {"mos_min_jitter_B_pv",         PARAM_STR, &side_B_mos_stats.min.jitter_param          },
390         {"mos_min_roundtrip_B_pv",      PARAM_STR, &side_B_mos_stats.min.roundtrip_param       },
391         {"mos_max_B_pv",                PARAM_STR, &side_B_mos_stats.max.mos_param             },
392         {"mos_max_at_B_pv",             PARAM_STR, &side_B_mos_stats.max.at_param              },
393         {"mos_max_packetloss_B_pv",     PARAM_STR, &side_B_mos_stats.max.packetloss_param      },
394         {"mos_max_jitter_B_pv",         PARAM_STR, &side_B_mos_stats.max.jitter_param          },
395         {"mos_max_roundtrip_B_pv",      PARAM_STR, &side_B_mos_stats.max.roundtrip_param       },
396         {"mos_average_B_pv",            PARAM_STR, &side_B_mos_stats.average.mos_param         },
397         {"mos_average_packetloss_B_pv", PARAM_STR, &side_B_mos_stats.average.packetloss_param  },
398         {"mos_average_jitter_B_pv",     PARAM_STR, &side_B_mos_stats.average.jitter_param      },
399         {"mos_average_roundtrip_B_pv",  PARAM_STR, &side_B_mos_stats.average.roundtrip_param   },
400         {"mos_average_samples_B_pv",    PARAM_STR, &side_B_mos_stats.average.samples_param     },
401
402         {0, 0, 0}
403 };
404
/*
 * Module interface structure: ties together the module name, the export
 * tables defined above (cmds, params, mod_pvs) and the life cycle callbacks
 * mod_init / mod_destroy / child_init.
 */
405 struct module_exports exports = {
406         "rtpengine",
407         DEFAULT_DLFLAGS, /* dlopen flags */
408         cmds,
409         params,
410         0,           /* exported statistics */
411         0,           /* exported MI functions */
412         mod_pvs,     /* exported pseudo-variables */
413         0,           /* extra processes */
414         mod_init,
415         0,           /* reply processing */
416         mod_destroy, /* destroy function */
417         child_init
418 };
419
/*
 * Tell whether `node` is one of the first `queried_nodes` entries of the
 * `queried_nodes_ptr` array (comparison is by pointer identity).
 *
 * Returns 1 when the node is found, 0 when it is not or when no array was
 * supplied.
 */
static int is_queried_node(struct rtpp_node *node, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
{
        int idx = 0;

        if (queried_nodes_ptr == NULL)
                return 0;

        while (idx < queried_nodes) {
                if (queried_nodes_ptr[idx] == node)
                        return 1;
                idx++;
        }

        return 0;
}
437
438 /* hide the node from display and disable it permanent */
439 int rtpengine_delete_node(struct rtpp_node *rtpp_node)
440 {
441         rtpp_node->rn_displayed = 0;
442         rtpp_node->rn_disabled = RTPENGINE_MAX_RECHECK_TICKS;
443
444         return 1;
445 }
446
447
448 int rtpengine_delete_node_set(struct rtpp_set *rtpp_list)
449 {
450         struct rtpp_node *rtpp_node;
451
452         lock_get(rtpp_list->rset_lock);
453         for(rtpp_node = rtpp_list->rn_first; rtpp_node != NULL;
454                         rtpp_node = rtpp_node->rn_next) {
455                 rtpengine_delete_node(rtpp_node);
456         }
457         lock_release(rtpp_list->rset_lock);
458
459         return 1;
460 }
461
462
463 int rtpengine_delete_node_all()
464 {
465         struct rtpp_set *rtpp_list;
466
467         if (!rtpp_set_list) {
468                 return 1;
469         }
470
471         lock_get(rtpp_set_list->rset_head_lock);
472         for(rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
473                         rtpp_list = rtpp_list->rset_next) {
474                 rtpengine_delete_node_set(rtpp_list);
475         }
476         lock_release(rtpp_set_list->rset_head_lock);
477
478         return 1;
479 }
480
481
/*
 * Classify an IP address given as text.
 *
 * The string is handed to getaddrinfo() with AI_NUMERICHOST, so it has to be
 * a literal address.
 *
 * Returns AF_INET or AF_INET6 according to the parsed address, or -1 when
 * the string cannot be parsed or belongs to any other family.
 */
static int get_ip_type(char *str_addr)
{
        struct addrinfo hint;
        struct addrinfo *info = NULL;
        int family;

        memset(&hint, '\0', sizeof hint);
        hint.ai_flags = AI_NUMERICHOST;
        hint.ai_family = PF_UNSPEC;

        if (getaddrinfo(str_addr, NULL, &hint, &info) != 0) {
                /* not a valid literal address */
                return -1;
        }

        /* only the family is needed, so the result list can go right away */
        family = info->ai_family;
        freeaddrinfo(info);

        switch (family) {
                case AF_INET:
                        LM_DBG("%s is an ipv4 addinfos\n", str_addr);
                        break;

                case AF_INET6:
                        LM_DBG("%s is an ipv6 addinfos\n", str_addr);
                        break;

                default:
                        LM_DBG("%s is an unknown addinfos format AF=%d\n",str_addr, family);
                        family = -1;
                        break;
        }

        return family;
}
513
514
515 static int get_ip_scope(char *str_addr)
516 {
517         struct ifaddrs *ifaddr, *ifa;
518         struct sockaddr_in6 *in6;
519         char str_if_ip[NI_MAXHOST];
520         int ret = -1;
521
522         if (getifaddrs(&ifaddr) == -1) {
523                 LM_ERR("getifaddrs() failed: %s\n", gai_strerror(ret));
524                 return -1;
525         }
526
527         for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
528                 in6 = (struct sockaddr_in6 *)ifa->ifa_addr;
529
530                 if (ifa->ifa_addr == NULL)
531                         continue;
532
533                 if (ifa->ifa_addr->sa_family != AF_INET6)
534                         continue;
535
536                 ret = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in6),
537                 str_if_ip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
538                 if (ret != 0) {
539                         LM_ERR("getnameinfo() failed: %s\n", gai_strerror(ret));
540                         return -1;
541                 }
542
543                 if (strstr(str_if_ip, str_addr)) {
544                         LM_INFO("dev: %-8s address: <%s> scope %d\n",
545                         ifa->ifa_name, str_if_ip, in6->sin6_scope_id);
546                         ret = in6->sin6_scope_id;
547                         break;
548                 }
549         }
550
551         freeifaddrs(ifaddr);
552
553         return ret;
554 }
555
556
557 static int bind_force_send_ip(int sock_idx)
558 {
559         struct sockaddr_in tmp, ip4addr;
560         struct sockaddr_in6 tmp6, ip6addr;
561         char str_addr[INET_ADDRSTRLEN];
562         char str_addr6[INET6_ADDRSTRLEN];
563         socklen_t sock_len = sizeof(struct sockaddr);
564         int ret, scope;
565
566         switch (force_send_ip_af) {
567                 case AF_INET:
568                         memset(&ip4addr, 0, sizeof(ip4addr));
569                         ip4addr.sin_family = AF_INET;
570                         ip4addr.sin_port = htons(0);
571                         if (inet_pton(AF_INET, force_send_ip_str, &ip4addr.sin_addr) != 1) {
572                                 LM_ERR("failed to parse IPv4 address %s\n", force_send_ip_str);
573                                 return -1;
574                         }
575
576                         if (bind(rtpp_socks[sock_idx], (struct sockaddr*)&ip4addr, sizeof(ip4addr)) < 0) {
577                                 LM_ERR("can't bind socket to required ipv4 interface\n");
578                                 return -1;
579                         }
580
581                         memset(&tmp, 0, sizeof(tmp));
582                         if (getsockname(rtpp_socks[sock_idx], (struct sockaddr *) &tmp, &sock_len))
583                                 LM_ERR("could not determine local socket name\n");
584                         else {
585                                 inet_ntop(AF_INET, &tmp.sin_addr, str_addr, INET_ADDRSTRLEN);
586                                 LM_DBG("Binding on %s:%d\n", str_addr, ntohs(tmp.sin_port));
587                         }
588
589                         break;
590
591                 case AF_INET6:
592                         if ((scope = get_ip_scope(force_send_ip_str)) < 0) {
593                                 LM_ERR("can't get the ipv6 interface scope\n");
594                                 return -1;
595                         }
596                         memset(&ip6addr, 0, sizeof(ip6addr));
597                         ip6addr.sin6_family = AF_INET6;
598                         ip6addr.sin6_port = htons(0);
599                         ip6addr.sin6_scope_id = scope;
600                         if (inet_pton(AF_INET6, force_send_ip_str, &ip6addr.sin6_addr) != 1) {
601                                 LM_ERR("failed to parse IPv6 address %s\n", force_send_ip_str);
602                                 return -1;
603                         }
604
605                         if ((ret = bind(rtpp_socks[sock_idx], (struct sockaddr*)&ip6addr, sizeof(ip6addr))) < 0) {
606                                 LM_ERR("can't bind socket to required ipv6 interface\n");
607                                 LM_ERR("ret=%d errno=%d\n", ret, errno);
608                                 return -1;
609                         }
610
611                         memset(&tmp6, 0, sizeof(tmp6));
612                         if (getsockname(rtpp_socks[sock_idx], (struct sockaddr *) &tmp6, &sock_len))
613                                 LM_ERR("could not determine local socket name\n");
614                         else {
615                                 inet_ntop(AF_INET6, &tmp6.sin6_addr, str_addr6, INET6_ADDRSTRLEN);
616                                 LM_DBG("Binding on ipv6 %s:%d\n", str_addr6, ntohs(tmp6.sin6_port));
617                         }
618
619                         break;
620
621                 default:
622                         LM_DBG("force_send_ip_str not specified in .cfg file!\n");
623                         break;
624         }
625
626         return 0;
627 }
628
629 static inline int str_cmp(const str *a , const str *b) {
630         return ! (a->len == b->len && ! strncmp(a->s, b->s, a->len));
631 }
632
633 static inline int str_eq(const str *p, const char *q) {
634         int l = strlen(q);
635         if (p->len != l)
636                 return 0;
637         if (memcmp(p->s, q, l))
638                 return 0;
639         return 1;
640 }
641
642 static inline str str_prefix(const str *p, const char *q) {
643         str ret = STR_NULL;
644         int l = strlen(q);
645         if (p->len < l)
646                 return ret;
647         if (memcmp(p->s, q, l))
648                 return ret;
649         ret = *p;
650         ret.s += l;
651         ret.len -= l;
652         return ret;
653 }
654
655
656 static int rtpengine_set_store(modparam_t type, void * val){
657
658         char * p;
659         int len;
660
661         p = (char* )val;
662
663         if(p==0 || *p=='\0'){
664                 return 0;
665         }
666
667         if(rtpp_sets==0){
668                 rtpp_strings = (char**)pkg_malloc(sizeof(char*));
669                 if(!rtpp_strings){
670                         LM_ERR("no pkg memory left\n");
671                         return -1;
672                 }
673         } else {/*realloc to make room for the current set*/
674                 rtpp_strings = (char**)pkg_realloc(rtpp_strings, (rtpp_sets+1)* sizeof(char*));
675                 if(!rtpp_strings){
676                         LM_ERR("no pkg memory left\n");
677                         return -1;
678                 }
679         }
680
681         /*allocate for the current set of urls*/
682         len = strlen(p);
683         rtpp_strings[rtpp_sets] = (char*)pkg_malloc((len+1)*sizeof(char));
684
685         if(!rtpp_strings[rtpp_sets]){
686                 LM_ERR("no pkg memory left\n");
687                 return -1;
688         }
689
690         memcpy(rtpp_strings[rtpp_sets], p, len);
691         rtpp_strings[rtpp_sets][len] = '\0';
692         rtpp_sets++;
693
694         return 0;
695 }
696
697 struct rtpp_node *get_rtpp_node(struct rtpp_set *rtpp_list, str *url)
698 {
699         struct rtpp_node *rtpp_node;
700
701         if (rtpp_list == NULL) {
702                 return NULL;
703         }
704
705         lock_get(rtpp_list->rset_lock);
706         rtpp_node = rtpp_list->rn_first;
707         while (rtpp_node) {
708                 if (str_cmp(&rtpp_node->rn_url, url) == 0) {
709                         lock_release(rtpp_list->rset_lock);
710                         return rtpp_node;
711                 }
712                 rtpp_node = rtpp_node->rn_next;
713         }
714         lock_release(rtpp_list->rset_lock);
715
716         return NULL;
717 }
718
719 struct rtpp_set *get_rtpp_set(unsigned int set_id)
720 {
721         struct rtpp_set * rtpp_list;
722         unsigned int my_current_id = 0;
723         int new_list;
724
725 #if DEFAULT_RTPP_SET_ID > 0
726         if (set_id < DEFAULT_RTPP_SET_ID )
727         {
728                 LM_ERR(" invalid rtpproxy set value [%u]\n", set_id);
729                 return NULL;
730         }
731 #endif
732
733         my_current_id = set_id;
734         /*search for the current_id*/
735         lock_get(rtpp_set_list->rset_head_lock);
736         rtpp_list = rtpp_set_list ? rtpp_set_list->rset_first : 0;
737         while (rtpp_list != 0 && rtpp_list->id_set!=my_current_id)
738                 rtpp_list = rtpp_list->rset_next;
739
740         if (rtpp_list==NULL)
741         {       /*if a new id_set : add a new set of rtpp*/
742                 rtpp_list = shm_malloc(sizeof(struct rtpp_set));
743                 if(!rtpp_list)
744                 {
745                         lock_release(rtpp_set_list->rset_head_lock);
746                         LM_ERR("no shm memory left to create new rtpproxy set %u\n", my_current_id);
747                         return NULL;
748                 }
749                 memset(rtpp_list, 0, sizeof(struct rtpp_set));
750                 rtpp_list->id_set = my_current_id;
751                 rtpp_list->rset_lock = lock_alloc();
752                 if (!rtpp_list->rset_lock) {
753                         lock_release(rtpp_set_list->rset_head_lock);
754                         LM_ERR("no shm memory left to create rtpproxy set lock\n");
755                         shm_free(rtpp_list);
756                         rtpp_list = NULL;
757                         return NULL;
758                 }
759                 if (lock_init(rtpp_list->rset_lock) == 0) {
760                         lock_release(rtpp_set_list->rset_head_lock);
761                         LM_ERR("could not init rtpproxy set lock\n");
762                         lock_dealloc((void*)rtpp_list->rset_lock);
763                         rtpp_list->rset_lock = NULL;
764                         shm_free(rtpp_list);
765                         rtpp_list = NULL;
766                         return NULL;
767                 }
768                 new_list = 1;
769         }
770         else {
771                 new_list = 0;
772         }
773
774         if (new_list)
775         {
776                 /*update the list of set info*/
777                 if (!rtpp_set_list->rset_first)
778                 {
779                         rtpp_set_list->rset_first = rtpp_list;
780                 }
781                 else
782                 {
783                         rtpp_set_list->rset_last->rset_next = rtpp_list;
784                 }
785
786                 rtpp_set_list->rset_last = rtpp_list;
787                 rtpp_set_count++;
788
789                 if(my_current_id == DEFAULT_RTPP_SET_ID){
790                         default_rtpp_set = rtpp_list;
791                 }
792         }
793         lock_release(rtpp_set_list->rset_head_lock);
794
795         return rtpp_list;
796 }
797
798
799 int add_rtpengine_socks(struct rtpp_set * rtpp_list, char * rtpproxy,
800                         unsigned int weight, int disabled, unsigned int ticks, int isDB)
801 {
802         /* Make rtp proxies list. */
803         char *p, *p1, *p2, *plim;
804         struct rtpp_node *pnode;
805         struct rtpp_node *rtpp_node;
806         unsigned int local_weight, port;
807         str s1;
808
809         p = rtpproxy;
810         plim = p + strlen(p);
811
812         for(;;) {
813                 local_weight = weight;
814                 while (*p && isspace((int)*p))
815                         ++p;
816                 if (p >= plim)
817                         break;
818                 p1 = p;
819                 while (*p && !isspace((int)*p))
820                         ++p;
821                 if (p <= p1)
822                         break; /* may happen??? */
823                 p2 = p;
824
825                 /* if called for database, consider simple, single char *URL */
826                 /* if called for config, consider weight URL */
827                 if (!isDB) {
828                         /* Have weight specified? If yes, scan it */
829                         p2 = memchr(p1, '=', p - p1);
830                         if (p2 != NULL) {
831                                 local_weight = strtoul(p2 + 1, NULL, 10);
832                         } else {
833                                 p2 = p;
834                         }
835                 }
836
837                 pnode = shm_malloc(sizeof(struct rtpp_node));
838                 if (pnode == NULL) {
839                         LM_ERR("no shm memory left\n");
840                         return -1;
841                 }
842                 memset(pnode, 0, sizeof(*pnode));
843
844                 lock_get(rtpp_no_lock);
845                 pnode->idx = *rtpp_no;
846
847                 if (ticks == RTPENGINE_MAX_RECHECK_TICKS) {
848                         pnode->rn_recheck_ticks = ticks;
849                 } else {
850                         pnode->rn_recheck_ticks = ticks + get_ticks();
851                 }
852                 pnode->rn_weight = local_weight;
853                 pnode->rn_umode = 0;
854                 pnode->rn_disabled = disabled;
855                 pnode->rn_displayed = 1;
856                 pnode->rn_url.s = shm_malloc(p2 - p1 + 1);
857                 if (pnode->rn_url.s == NULL) {
858                         lock_release(rtpp_no_lock);
859                         shm_free(pnode);
860                         LM_ERR("no shm memory left\n");
861                         return -1;
862                 }
863                 memmove(pnode->rn_url.s, p1, p2 - p1);
864                 pnode->rn_url.s[p2 - p1] = 0;
865                 pnode->rn_url.len = p2-p1;
866
867                 /* Leave only address in rn_address */
868                 pnode->rn_address = pnode->rn_url.s;
869                 if (strncasecmp(pnode->rn_address, "udp:", 4) == 0) {
870                         pnode->rn_umode = 1;
871                         pnode->rn_address += 4;
872                 } else if (strncasecmp(pnode->rn_address, "udp6:", 5) == 0) {
873                         pnode->rn_umode = 6;
874                         pnode->rn_address += 5;
875                 } else if (strncasecmp(pnode->rn_address, "unix:", 5) == 0) {
876                         pnode->rn_umode = 0;
877                         pnode->rn_address += 5;
878                 } else {
879                         lock_release(rtpp_no_lock);
880                         LM_WARN("Node address must start with 'udp:' or 'udp6:' or 'unix:'. Ignore '%s'.\n", pnode->rn_address);
881                         shm_free(pnode->rn_url.s);
882                         shm_free(pnode);
883
884                         if (!isDB) {
885                                 continue;
886                         } else {
887                                 return 0;
888                         }
889                 }
890
891                 /* Check the rn_address is 'hostname:port' */
892                 /* Check the rn_address port is valid */
893                 p1 = strchr(pnode->rn_address, ':');
894                 if (p1 != NULL) {
895                         p1++;
896                 }
897
898                 if (p1 != NULL && p1 != '\0') {
899                         s1.s = p1;
900                         s1.len = strlen(p1);
901                         if (str2int(&s1, &port) < 0 || port > 0xFFFF) {
902                                 lock_release(rtpp_no_lock);
903                                 LM_WARN("Node address must end with a valid port number. Ignore '%s'.\n", pnode->rn_address);
904                                 shm_free(pnode->rn_url.s);
905                                 shm_free(pnode);
906
907                                 if (!isDB) {
908                                         continue;
909                                 } else {
910                                         return 0;
911                                 }
912                         }
913                 }
914
915                 /* If node found in set, update it */
916                 rtpp_node = get_rtpp_node(rtpp_list, &pnode->rn_url);
917
918                 lock_get(rtpp_list->rset_lock);
919                 if (rtpp_node) {
920                         rtpp_node->rn_disabled = pnode->rn_disabled;
921                         rtpp_node->rn_displayed = pnode->rn_displayed;
922                         rtpp_node->rn_recheck_ticks = pnode->rn_recheck_ticks;
923                         rtpp_node->rn_weight = pnode->rn_weight;
924                         lock_release(rtpp_list->rset_lock);
925                         lock_release(rtpp_no_lock);
926
927                         shm_free(pnode->rn_url.s);
928                         shm_free(pnode);
929
930                         if (!isDB) {
931                                 continue;
932                         } else {
933                                 return 0;
934                         }
935                 }
936
937                 if (rtpp_list->rn_first == NULL) {
938                         rtpp_list->rn_first = pnode;
939                 } else {
940                         rtpp_list->rn_last->rn_next = pnode;
941                 }
942
943                 rtpp_list->rn_last = pnode;
944                 rtpp_list->rtpp_node_count++;
945                 lock_release(rtpp_list->rset_lock);
946
947                 *rtpp_no = *rtpp_no + 1;
948                 lock_release(rtpp_no_lock);
949
950                 if (!isDB) {
951                         continue;
952                 } else {
953                         return 0;
954                 }
955         }
956         return 0;
957 }
958
959
960 /* 0 - succes
961  * -1 - erorr
962  * */
963 static int rtpengine_add_rtpengine_set(char * rtp_proxies, unsigned int weight, int disabled, unsigned int ticks)
964 {
965         char *p,*p2;
966         struct rtpp_set * rtpp_list;
967         unsigned int my_current_id;
968         str id_set;
969
970         /* empty definition? */
971         p= rtp_proxies;
972         if(!p || *p=='\0'){
973                 return 0;
974         }
975
976         for(;*p && isspace(*p);p++);
977         if(*p=='\0'){
978                 return 0;
979         }
980
981         rtp_proxies = strstr(p, "==");
982         if(rtp_proxies){
983                 if(*(rtp_proxies +2)=='\0'){
984                         LM_ERR("script error -invalid rtp proxy list!\n");
985                         return -1;
986                 }
987
988                 *rtp_proxies = '\0';
989                 p2 = rtp_proxies-1;
990                 for(;isspace(*p2); *p2 = '\0',p2--);
991                 id_set.s = p;   id_set.len = p2 - p+1;
992
993                 if(id_set.len <= 0 ||str2int(&id_set, &my_current_id)<0 ){
994                 LM_ERR("script error -invalid set_id value!\n");
995                         return -1;
996                 }
997
998                 rtp_proxies+=2;
999         }else{
1000                 rtp_proxies = p;
1001                 my_current_id = DEFAULT_RTPP_SET_ID;
1002         }
1003
1004         for(;*rtp_proxies && isspace(*rtp_proxies);rtp_proxies++);
1005
1006         if(!(*rtp_proxies)){
1007                 LM_ERR("script error -empty rtp_proxy list\n");
1008                 return -1;;
1009         }
1010
1011         /*search for the current_id*/
1012         rtpp_list = get_rtpp_set(my_current_id);
1013
1014         if (rtpp_list != NULL)
1015         {
1016
1017                 if (add_rtpengine_socks(rtpp_list, rtp_proxies, weight, disabled, ticks, 0) != 0)
1018                         goto error;
1019                 else
1020                         return 0;
1021         }
1022
1023 error:
1024         return -1;
1025 }
1026
1027
1028 static int fixup_set_id(void ** param, int param_no)
1029 {
1030         int int_val;
1031         unsigned int set_id;
1032         struct rtpp_set* rtpp_list;
1033         rtpp_set_link_t *rtpl = NULL;
1034         str s;
1035
1036         rtpl = (rtpp_set_link_t*)pkg_malloc(sizeof(rtpp_set_link_t));
1037         if(rtpl==NULL) {
1038                 LM_ERR("no more pkg memory\n");
1039                 return -1;
1040         }
1041         memset(rtpl, 0, sizeof(rtpp_set_link_t));
1042         s.s = (char*)*param;
1043         s.len = strlen(s.s);
1044
1045         if(s.s[0] == PV_MARKER) {
1046                 int_val = pv_locate_name(&s);
1047                 if(int_val<0 || int_val!=s.len) {
1048                         LM_ERR("invalid parameter %s\n", s.s);
1049                         pkg_free(rtpl);
1050                         return -1;
1051                 }
1052                 rtpl->rpv = pv_cache_get(&s);
1053                 if(rtpl->rpv == NULL) {
1054                         LM_ERR("invalid pv parameter %s\n", s.s);
1055                         pkg_free(rtpl);
1056                         return -1;
1057                 }
1058         } else {
1059                 int_val = str2int(&s, &set_id);
1060                 if (int_val == 0) {
1061                         pkg_free(*param);
1062                         if((rtpp_list = select_rtpp_set(set_id)) ==0){
1063                                 LM_ERR("rtpp_proxy set %u not configured\n", set_id);
1064                                 pkg_free(rtpl);
1065                                 return E_CFG;
1066                         }
1067                         rtpl->rset = rtpp_list;
1068                 } else {
1069                         LM_ERR("bad number <%s>\n",     (char *)(*param));
1070                         pkg_free(rtpl);
1071                         return E_CFG;
1072                 }
1073         }
1074         *param = (void*)rtpl;
1075         return 0;
1076 }
1077
1078 static int rtpp_test_ping(struct rtpp_node *node)
1079 {
1080         bencode_buffer_t bencbuf;
1081         bencode_item_t *dict;
1082         char *cp;
1083         int ret;
1084
1085         if (bencode_buffer_init(&bencbuf)) {
1086                 return -1;
1087         }
1088         dict = bencode_dictionary(&bencbuf);
1089         bencode_dictionary_add_string(dict, "command", command_strings[OP_PING]);
1090
1091         if (bencbuf.error) {
1092                 goto error;
1093         }
1094
1095         cp = send_rtpp_command(node, dict, &ret);
1096         if (!cp) {
1097                 goto error;
1098         }
1099
1100         dict = bencode_decode_expect(&bencbuf, cp, ret, BENCODE_DICTIONARY);
1101         if (!dict || bencode_dictionary_get_strcmp(dict, "result", "pong")) {
1102                 goto error;
1103         }
1104
1105         bencode_buffer_free(&bencbuf);
1106         return 0;
1107
1108 error:
1109         bencode_buffer_free(&bencbuf);
1110         return -1;
1111 }
1112
1113
1114 static void rtpengine_rpc_reload(rpc_t* rpc, void* ctx)
1115 {
1116         if (rtpp_db_url.s == NULL) {
1117                 // no database
1118                 rpc->fault(ctx, 500, "No Database URL");
1119                 return;
1120         }
1121
1122         if (init_rtpproxy_db() < 0) {
1123                 // fail reloading from database
1124                 rpc->fault(ctx, 500, "Failed reloading db");
1125                 return;
1126         }
1127
1128         if (build_rtpp_socks()) {
1129                 rpc->fault(ctx, 500, "Out of memory");
1130                 return;
1131         }
1132 }
1133
1134 static int rtpengine_rpc_iterate(rpc_t* rpc, void* ctx, const str *rtpp_url,
1135                 int (*cb)(struct rtpp_node *, struct rtpp_set *, void *), void *data)
1136 {
1137         struct rtpp_set *rtpp_list;
1138         struct rtpp_node *crt_rtpp;
1139         int found = RPC_FOUND_NONE, err = 0;
1140         int ret;
1141
1142         if (build_rtpp_socks()) {
1143                 rpc->fault(ctx, 500, "Out of memory");
1144                 return -1;
1145         }
1146
1147         if (!rtpp_set_list) {
1148                 rpc->fault(ctx, 404, "Instance not found (no sets loaded)");
1149                 return -1;
1150         }
1151
1152         /* found a matching all - show all rtpp */
1153         if (strncmp("all", rtpp_url->s, 3) == 0) {
1154                 found = RPC_FOUND_ALL;
1155         }
1156
1157         lock_get(rtpp_set_list->rset_head_lock);
1158         for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != NULL;
1159                         rtpp_list = rtpp_list->rset_next) {
1160
1161                 lock_get(rtpp_list->rset_lock);
1162                 for (crt_rtpp = rtpp_list->rn_first; crt_rtpp != NULL;
1163                                 crt_rtpp = crt_rtpp->rn_next) {
1164
1165                         if (!crt_rtpp->rn_displayed) {
1166                                 continue;
1167                         }
1168
1169                         /* found a matching rtpp - ping it */
1170                         if (found == RPC_FOUND_ALL ||
1171                            (crt_rtpp->rn_url.len == rtpp_url->len &&
1172                            strncmp(crt_rtpp->rn_url.s, rtpp_url->s, rtpp_url->len) == 0)) {
1173
1174                                 ret = cb(crt_rtpp, rtpp_list, data);
1175                                 if (ret) {
1176                                         err = 1;
1177                                         break;
1178                                 }
1179
1180                                 if (found == RPC_FOUND_NONE) {
1181                                         found = RPC_FOUND_ONE;
1182                                 }
1183                         }
1184                 }
1185                 lock_release(rtpp_list->rset_lock);
1186
1187                 if (err)
1188                         break;
1189         }
1190         lock_release(rtpp_set_list->rset_head_lock);
1191
1192         if (err)
1193                 return -1;
1194
1195         if (found == RPC_FOUND_NONE) {
1196                 rpc->fault(ctx, 404, "Instance not found");
1197                 return -1;
1198         }
1199
1200         return found;
1201 }
1202
1203 static int add_rtpp_node_info (void *ptrsp, struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list)
1204 {
1205         void *vh;
1206         void **ptrs = ptrsp;
1207         rpc_t *rpc;
1208         void *ctx;
1209         int rtpp_ticks;
1210
1211         rpc = ptrs[0];
1212         ctx = ptrs[1];
1213
1214         if (rpc->add(ctx, "{", &vh) < 0) {
1215                 rpc->fault(ctx, 500, "Server error");
1216                 return -1;
1217         }
1218
1219         rpc->struct_add(vh, "Sddd",
1220                         "url", &crt_rtpp->rn_url,
1221                         "set", rtpp_list->id_set,
1222                         "index", crt_rtpp->idx,
1223                         "weight", crt_rtpp->rn_weight);
1224
1225         if ((1 == crt_rtpp->rn_disabled ) && (crt_rtpp->rn_recheck_ticks == RTPENGINE_MAX_RECHECK_TICKS)) {
1226                 rpc->struct_add(vh, "s", "disabled", "1(permanent)");
1227         } else {
1228                 rpc->struct_add(vh, "d", "disabled", crt_rtpp->rn_disabled);
1229         }
1230
1231         if (crt_rtpp->rn_recheck_ticks == RTPENGINE_MAX_RECHECK_TICKS) {
1232                 rpc->struct_add(vh, "s", "recheck_ticks", "N/A");
1233         } else {
1234                 rtpp_ticks = crt_rtpp->rn_recheck_ticks - get_ticks();
1235                 rtpp_ticks = rtpp_ticks < 0 ? 0 : rtpp_ticks;
1236                 rpc->struct_add(vh, "d", "recheck_ticks", rtpp_ticks);
1237         }
1238
1239         return 0;
1240 }
1241
1242 static int rtpengine_iter_cb_enable(struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list, void *flagp)
1243 {
1244         int *flag = flagp;
1245
1246         /* do ping when try to enable the rtpp */
1247         if (*flag) {
1248
1249                 /* if ping success, enable the rtpp and reset ticks */
1250                 if (rtpp_test_ping(crt_rtpp) == 0) {
1251                         crt_rtpp->rn_disabled = 0;
1252                         crt_rtpp->rn_recheck_ticks = RTPENGINE_MIN_RECHECK_TICKS;
1253
1254                 /* if ping fail, disable the rtpps but _not_ permanently*/
1255                 } else {
1256                         crt_rtpp->rn_recheck_ticks = get_ticks() + cfg_get(rtpengine,rtpengine_cfg,rtpengine_disable_tout);
1257                         crt_rtpp->rn_disabled = 1;
1258                         *flag = 2; /* return value to caller */
1259                 }
1260
1261         /* do not ping when disable the rtpp; disable it permanenty */
1262         } else {
1263                 crt_rtpp->rn_disabled = 1;
1264                 crt_rtpp->rn_recheck_ticks = RTPENGINE_MAX_RECHECK_TICKS;
1265         }
1266
1267         return 0;
1268 }
1269
1270 static void rtpengine_rpc_enable(rpc_t* rpc, void* ctx)
1271 {
1272         void *vh;
1273         str rtpp_url;
1274         int flag, found;
1275
1276         if (rpc->scan(ctx, "Sd", &rtpp_url, &flag) < 2) {
1277                 rpc->fault(ctx, 500, "Not enough parameters");
1278                 return;
1279         }
1280
1281         flag = flag ? 1 : 0; /* also used as a return value */
1282
1283         found = rtpengine_rpc_iterate(rpc, ctx, &rtpp_url, rtpengine_iter_cb_enable, &flag);
1284         if (found == -1)
1285                 return;
1286
1287         if (rpc->add(ctx, "{", &vh) < 0) {
1288                 rpc->fault(ctx, 500, "Server error");
1289                 return;
1290         }
1291
1292         rpc->struct_add(vh, "S", "url", &rtpp_url);
1293
1294         if (flag == 0)
1295                 rpc->struct_add(vh, "s", "status", "disable");
1296         else if (flag == 1)
1297                 rpc->struct_add(vh, "s", "status", "enable");
1298         else
1299                 rpc->struct_add(vh, "s", "status", "fail");
1300 }
1301
/**
 * Iterator callback for rtpengine.show: adds the node description to the
 * RPC reply.
 *
 * @return 0 on success, -1 when add_rtpp_node_info() fails
 */
static int rtpengine_iter_cb_show(struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list, void *ptrsp)
{
	return (add_rtpp_node_info(ptrsp, crt_rtpp, rtpp_list) < 0) ? -1 : 0;
}
1308
1309 static void rtpengine_rpc_show(rpc_t* rpc, void* ctx)
1310 {
1311         str rtpp_url;
1312         void *ptrs[2] = {rpc, ctx};
1313
1314         if (rpc->scan(ctx, "S", &rtpp_url) < 1) {
1315                 rpc->fault(ctx, 500, "Not enough parameters");
1316                 return;
1317         }
1318
1319         rtpengine_rpc_iterate(rpc, ctx, &rtpp_url, rtpengine_iter_cb_show, ptrs);
1320 }
1321
1322 static int rtpengine_iter_cb_ping(struct rtpp_node *crt_rtpp, struct rtpp_set *rtpp_list, void *data)
1323 {
1324         int *found_rtpp_disabled = data;
1325
1326         /* if ping fail */
1327         if (rtpp_test_ping(crt_rtpp) < 0) {
1328                 crt_rtpp->rn_recheck_ticks = get_ticks() + cfg_get(rtpengine,rtpengine_cfg,rtpengine_disable_tout);
1329                 *found_rtpp_disabled = 1;
1330                 crt_rtpp->rn_disabled = 1;
1331         }
1332
1333         return 0;
1334 }
1335
1336 static void rtpengine_rpc_ping(rpc_t* rpc, void* ctx)
1337 {
1338         void *vh;
1339         int found;
1340         int found_rtpp_disabled = 0;
1341         str rtpp_url;
1342
1343         if (rpc->scan(ctx, "S", &rtpp_url) < 1) {
1344                 rpc->fault(ctx, 500, "Not enough parameters");
1345                 return;
1346         }
1347
1348         found = rtpengine_rpc_iterate(rpc, ctx, &rtpp_url, rtpengine_iter_cb_ping, &found_rtpp_disabled);
1349         if (found == -1)
1350                 return;
1351
1352         if (rpc->add(ctx, "{", &vh) < 0) {
1353                 rpc->fault(ctx, 500, "Server error");
1354                 return;
1355         }
1356
1357         rpc->struct_add(vh, "Ss",
1358                         "url", &rtpp_url,
1359                         "status", (found_rtpp_disabled ? "fail" : "success"));
1360 }
1361
1362 static void rtpengine_rpc_get_hash_total(rpc_t* rpc, void* ctx)
1363 {
1364         rpc->add(ctx, "u", rtpengine_hash_table_total());
1365 }
1366
1367
/* Help strings of the RPC commands; every array is NULL terminated. */
static const char* rtpengine_rpc_reload_doc[2] = {
	"Reload rtpengine proxies from database",
	NULL
};

static const char* rtpengine_rpc_ping_doc[2] = {
	"Ping an rtpengine instance",
	NULL
};

static const char* rtpengine_rpc_show_doc[2] = {
	"Get details about an rtpengine instance",
	NULL
};

static const char* rtpengine_rpc_enable_doc[2] = {
	"Enable or disable an rtpengine instance",
	NULL
};

static const char* rtpengine_rpc_get_hash_total_doc[2] = {
	"Get total number of entries in hash table",
	NULL
};
1383
1384 rpc_export_t rtpengine_rpc[] = {
1385         {"rtpengine.reload", rtpengine_rpc_reload, rtpengine_rpc_reload_doc, 0},
1386         {"rtpengine.ping", rtpengine_rpc_ping, rtpengine_rpc_ping_doc, 0},
1387         {"rtpengine.show", rtpengine_rpc_show, rtpengine_rpc_show_doc, RET_ARRAY},
1388         {"rtpengine.enable", rtpengine_rpc_enable, rtpengine_rpc_enable_doc, 0},
1389         {"rtpengine.get_hash_total", rtpengine_rpc_get_hash_total, rtpengine_rpc_get_hash_total_doc, 0},
1390         {0, 0, 0, 0}
1391 };
1392
1393 static int rtpengine_rpc_init(void)
1394 {
1395         if (rpc_register_array(rtpengine_rpc)!=0)
1396         {
1397                 LM_ERR("failed to register RPC commands\n");
1398                 return -1;
1399         }
1400         return 0;
1401 }
1402
1403 static int
1404 mod_init(void)
1405 {
1406         int i;
1407         pv_spec_t *avp_spec;
1408         unsigned short avp_flags;
1409         str s;
1410
1411         if(rtpengine_rpc_init()<0)
1412         {
1413                 LM_ERR("failed to register RPC commands\n");
1414                 return -1;
1415         }
1416
1417         rtpp_no = (unsigned int*)shm_malloc(sizeof(unsigned int));
1418         if (!rtpp_no) {
1419                 LM_ERR("no more shm memory for rtpp_no\n");
1420                 return -1;
1421         }
1422         *rtpp_no = 0;
1423
1424         rtpp_no_lock = lock_alloc();
1425         if (!rtpp_no_lock) {
1426                 LM_ERR("no more shm memory for rtpp_no_lock\n");
1427                 return -1;
1428         }
1429
1430         if (lock_init(rtpp_no_lock) == 0) {
1431                 LM_ERR("could not init rtpp_no_lock\n");
1432                 return -1;
1433         }
1434
1435         /* initialize the list of set; mod_destroy does shm_free() if fail */
1436         if (!rtpp_set_list) {
1437                 rtpp_set_list = shm_malloc(sizeof(struct rtpp_set_head));
1438                 if(!rtpp_set_list){
1439                         LM_ERR("no shm memory left to create list of proxysets\n");
1440                         return -1;
1441                 }
1442                 memset(rtpp_set_list, 0, sizeof(struct rtpp_set_head));
1443
1444                 rtpp_set_list->rset_head_lock = lock_alloc();
1445                 if (!rtpp_set_list->rset_head_lock) {
1446                         LM_ERR("no shm memory left to create list of proxysets lock\n");
1447                         return -1;
1448                 }
1449
1450                 if (lock_init(rtpp_set_list->rset_head_lock) == 0) {
1451                         LM_ERR("could not init rtpproxy list of proxysets lock\n");
1452                         return -1;
1453                 }
1454         }
1455
1456         if (rtpp_db_url.s == NULL)
1457         {
1458                 /* storing the list of rtp proxy sets in shared memory*/
1459                 for(i=0;i<rtpp_sets;i++){
1460                         if(rtpengine_add_rtpengine_set(rtpp_strings[i], 1, 0, 0) !=0){
1461                                 for(;i<rtpp_sets;i++)
1462                                         if(rtpp_strings[i])
1463                                                 pkg_free(rtpp_strings[i]);
1464                                 pkg_free(rtpp_strings);
1465                                 return -1;
1466                         }
1467                         if(rtpp_strings[i])
1468                                 pkg_free(rtpp_strings[i]);
1469                 }
1470         }
1471         else
1472         {
1473                 LM_INFO("Loading rtp proxy definitions from DB\n");
1474                 if ( init_rtpproxy_db() < 0)
1475                 {
1476                         LM_ERR("error while loading rtp proxies from database\n");
1477                         return -1;
1478                 }
1479         }
1480
1481         /* any rtpproxy configured? */
1482         if (rtpp_set_list)
1483                 default_rtpp_set = select_rtpp_set(DEFAULT_RTPP_SET_ID);
1484
1485         if (rtp_inst_pv_param.s) {
1486                 rtp_inst_pv_param.len = strlen(rtp_inst_pv_param.s);
1487                 rtp_inst_pvar = pv_cache_get(&rtp_inst_pv_param);
1488                 if ((rtp_inst_pvar == NULL) ||
1489                    ((rtp_inst_pvar->type != PVT_AVP) &&
1490                    (rtp_inst_pvar->type != PVT_XAVP) &&
1491                    (rtp_inst_pvar->type != PVT_SCRIPTVAR))) {
1492                         LM_ERR("Invalid pvar name <%.*s>\n", rtp_inst_pv_param.len, rtp_inst_pv_param.s);
1493                         return -1;
1494                 }
1495         }
1496
1497         if (pv_parse_var(&extra_id_pv_param, &extra_id_pv, NULL))
1498                 return -1;
1499
1500         if (mos_label_stats_parse(&global_mos_stats))
1501                 return -1;
1502         if (mos_label_stats_parse(&side_A_mos_stats))
1503                 return -1;
1504         if (mos_label_stats_parse(&side_B_mos_stats))
1505                 return -1;
1506
1507         if (setid_avp_param) {
1508                 s.s = setid_avp_param; s.len = strlen(s.s);
1509                 avp_spec = pv_cache_get(&s);
1510                 if (avp_spec==NULL || (avp_spec->type != PVT_AVP)) {
1511                         LM_ERR("malformed or non AVP definition <%s>\n", setid_avp_param);
1512                         return -1;
1513                 }
1514                 if (pv_get_avp_name(0, &(avp_spec->pvp), &setid_avp, &avp_flags) != 0) {
1515                         LM_ERR("invalid AVP definition <%s>\n", setid_avp_param);
1516                         return -1;
1517                 }
1518                 setid_avp_type = avp_flags;
1519         }
1520
1521         if (write_sdp_pvar_str.len > 0) {
1522                 write_sdp_pvar = pv_cache_get(&write_sdp_pvar_str);
1523                 if (write_sdp_pvar == NULL
1524                         || (write_sdp_pvar->type != PVT_AVP &&  write_sdp_pvar->type != PVT_SCRIPTVAR) ) {
1525                         LM_ERR("write_sdp_pv: not a valid AVP or VAR definition <%.*s>\n",
1526                                 write_sdp_pvar_str.len, write_sdp_pvar_str.s);
1527                         return -1;
1528                 }
1529         }
1530
1531         if (read_sdp_pvar_str.len > 0) {
1532                 read_sdp_pvar = pv_cache_get(&read_sdp_pvar_str);
1533                 if (read_sdp_pvar == NULL
1534                         || (read_sdp_pvar->type != PVT_AVP &&  read_sdp_pvar->type != PVT_SCRIPTVAR) ) {
1535                         LM_ERR("read_sdp_pv: not a valid AVP or VAR definition <%.*s>\n",
1536                                 read_sdp_pvar_str.len, read_sdp_pvar_str.s);
1537                         return -1;
1538                 }
1539         }
1540
1541         if (rtpp_strings)
1542                 pkg_free(rtpp_strings);
1543
1544         if (load_tm_api( &tmb ) < 0)
1545         {
1546                 LM_DBG("could not load the TM-functions - answer-offer model"
1547                         " auto-detection is disabled\n");
1548                 memset(&tmb, 0, sizeof(struct tm_binds));
1549         }
1550
1551         /* Determine IP addr type (IPv4 or IPv6 allowed) */
1552         force_send_ip_af = get_ip_type(force_send_ip_str);
1553         if (force_send_ip_af != AF_INET && force_send_ip_af != AF_INET6 &&
1554            strlen(force_send_ip_str) > 0) {
1555                 LM_ERR("%s is an unknown address\n", force_send_ip_str);
1556                 return -1;
1557         }
1558
1559         /* init the hastable which keeps the call-id <-> selected_node relation */
1560         if (!rtpengine_hash_table_init(hash_table_size)) {
1561                 LM_ERR("rtpengine_hash_table_init(%d) failed!\n", hash_table_size);
1562                 return -1;
1563         } else {
1564                 LM_DBG("rtpengine_hash_table_init(%d) success!\n", hash_table_size);
1565         }
1566
1567         /* select the default set */
1568         default_rtpp_set = select_rtpp_set(setid_default);
1569         if (!default_rtpp_set) {
1570                 LM_NOTICE("Default rtpp set %u NOT found\n", setid_default);
1571         } else {
1572                 LM_DBG("Default rtpp set %u found\n", setid_default);
1573         }
1574
1575     if(cfg_declare("rtpengine", rtpengine_cfg_def, &default_rtpengine_cfg, cfg_sizeof(rtpengine), &rtpengine_cfg)){
1576         LM_ERR("Failed to declare the configuration\n");
1577         return -1;
1578     }
1579
1580
1581         return 0;
1582 }
1583
/**
 * (Re)build the per-process array of sockets used to talk to the RTP proxies.
 *
 * The shared node counter (*rtpp_no) is compared with the size of the local
 * rtpp_socks array; when both match there is nothing to do. Otherwise every
 * open socket is closed, the array is resized to the new node count and, for
 * every node of every set, a datagram socket is created, passed through
 * bind_force_send_ip() and connect()ed to the node address. Nodes with
 * rn_umode == 0 get no socket. Every node that reaches the end of the setup
 * is probed with rtpp_test() and its rn_disabled field is updated.
 *
 * A node whose socket cannot be set up keeps -1 in its slot and is not probed.
 *
 * @return 0 on success (including "nothing to do"), -1 when the socket array
 *         cannot be resized. In the failure case the previous array and its
 *         size are left untouched, so a later call retries the rebuild.
 */
static int build_rtpp_socks() {
	int n, i;
	int *resized;
	char *cp;
	size_t addr_size;
	struct addrinfo hints, *res;
	struct rtpp_set  *rtpp_list;
	struct rtpp_node *pnode;
	unsigned int current_rtpp_no;
#ifdef IP_MTU_DISCOVER
	int ip_mtu_discover = IP_PMTUDISC_DONT;
#endif

	/* snapshot of the shared node counter */
	lock_get(rtpp_no_lock);
	current_rtpp_no = *rtpp_no;
	lock_release(rtpp_no_lock);

	if (current_rtpp_no == rtpp_socks_size)
		return 0;

	/* close current sockets */
	for (i = 0; i < rtpp_socks_size; i++) {
		if (rtpp_socks[i] >= 0) {
			close(rtpp_socks[i]);
			rtpp_socks[i] = -1;
		}
	}

	/*
	 * Resize through a temporary: on failure the old array must stay
	 * reachable and rtpp_socks_size must keep describing it, otherwise later
	 * code would index a NULL array that claims to hold current_rtpp_no slots.
	 */
	resized = (int*)pkg_realloc(rtpp_socks, sizeof(int)*current_rtpp_no);
	if (!resized) {
		LM_ERR("no more pkg memory for rtpp_socks\n");
		return -1;
	}
	rtpp_socks = resized;
	rtpp_socks_size = current_rtpp_no;
	/* all bytes 0xff: every slot reads back as -1 (no socket) */
	memset(rtpp_socks, -1, sizeof(int)*(rtpp_socks_size));

	lock_get(rtpp_set_list->rset_head_lock);
	for (rtpp_list = rtpp_set_list->rset_first; rtpp_list != 0;
		rtpp_list = rtpp_list->rset_next) {

		lock_get(rtpp_list->rset_lock);
		for (pnode=rtpp_list->rn_first; pnode!=0; pnode = pnode->rn_next) {
			char *hostname;

			/*
			 * The counter was sampled before the list locks were taken, so
			 * the list may already hold nodes the array has no slot for.
			 * Never write past the array; such a node is left for the next
			 * rebuild.
			 */
			if (pnode->idx >= rtpp_socks_size) {
				LM_WARN("rtpp node index %u outside of the socket array (size %u)\n",
					(unsigned int)pnode->idx, (unsigned int)rtpp_socks_size);
				continue;
			}

			if (pnode->rn_umode == 0) {
				rtpp_socks[pnode->idx] = -1;
				goto rptest;
			}

			/*
			 * This is UDP or UDP6. Detect host and port; lookup host;
			 * do connect() in order to specify peer address
			 */
			addr_size = strlen(pnode->rn_address) + 1;
			hostname = (char*)pkg_malloc(addr_size);
			if (hostname==NULL) {
				LM_ERR("no more pkg memory\n");
				rtpp_socks[pnode->idx] = -1;
				continue;
			}
			memcpy(hostname, pnode->rn_address, addr_size);

			/* split "host:port" at the last colon; fall back to CPORT */
			cp = strrchr(hostname, ':');
			if (cp != NULL) {
				*cp = '\0';
				cp++;
			}
			if (cp == NULL || *cp == '\0')
				cp = CPORT;

			memset(&hints, 0, sizeof(hints));
			hints.ai_flags = 0;
			hints.ai_family = (pnode->rn_umode == 6) ? AF_INET6 : AF_INET;
			hints.ai_socktype = SOCK_DGRAM;
			if ((n = getaddrinfo(hostname, cp, &hints, &res)) != 0) {
				LM_ERR("%s\n", gai_strerror(n));
				pkg_free(hostname);
				rtpp_socks[pnode->idx] = -1;
				continue;
			}
			/* cp may point into hostname, so free it only after the lookup */
			pkg_free(hostname);

			rtpp_socks[pnode->idx] = socket((pnode->rn_umode == 6)
				? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
			if (rtpp_socks[pnode->idx] == -1) {
				LM_ERR("can't create socket\n");
				freeaddrinfo(res);
				continue;
			}

#ifdef IP_MTU_DISCOVER
			if (setsockopt(rtpp_socks[pnode->idx], IPPROTO_IP,
					IP_MTU_DISCOVER, &ip_mtu_discover,
					sizeof(ip_mtu_discover)))
				LM_WARN("Failed enable set MTU discovery socket option\n");
#endif

			if (bind_force_send_ip(pnode->idx) == -1) {
				LM_ERR("can't bind socket\n");
				close(rtpp_socks[pnode->idx]);
				rtpp_socks[pnode->idx] = -1;
				freeaddrinfo(res);
				continue;
			}

			if (connect(rtpp_socks[pnode->idx], res->ai_addr, res->ai_addrlen) == -1) {
				LM_ERR("can't connect to a RTP proxy\n");
				close(rtpp_socks[pnode->idx]);
				rtpp_socks[pnode->idx] = -1;
				freeaddrinfo(res);
				continue;
			}

			freeaddrinfo(res);
rptest:
			pnode->rn_disabled = rtpp_test(pnode, 0, 1);
		}
		lock_release(rtpp_list->rset_lock);
	}
	lock_release(rtpp_set_list->rset_head_lock);

	return 0;
}
1704
1705 static int pv_parse_var(str *inp, pv_elem_t **outp, int *got_any) {
1706         if (inp->s && *inp->s) {
1707                 inp->len = strlen(inp->s);
1708                 if(pv_parse_format(inp, outp) < 0) {
1709                         LM_ERR("malformed PV string: %s\n", inp->s);
1710                         return -1;
1711                 }
1712                 if (got_any)
1713                         *got_any = 1;
1714         } else {
1715                 *outp = NULL;
1716         }
1717         return 0;
1718 }
1719
1720 static int minmax_pv_parse(struct minmax_mos_stats *s, int *got_any) {
1721         if (pv_parse_var(&s->mos_param, &s->mos_pv, got_any))
1722                 return -1;
1723         if (pv_parse_var(&s->at_param, &s->at_pv, got_any))
1724                 return -1;
1725         if (pv_parse_var(&s->packetloss_param, &s->packetloss_pv, got_any))
1726                 return -1;
1727         if (pv_parse_var(&s->jitter_param, &s->jitter_pv, got_any))
1728                 return -1;
1729         if (pv_parse_var(&s->roundtrip_param, &s->roundtrip_pv, got_any))
1730                 return -1;
1731         if (pv_parse_var(&s->samples_param, &s->samples_pv, got_any))
1732                 return -1;
1733         return 0;
1734 }
1735
1736 static int mos_label_stats_parse(struct minmax_mos_label_stats *mmls) {
1737         if (pv_parse_var(&mmls->label_param, &mmls->label_pv, &mmls->got_any_pvs))
1738                 return -1;
1739
1740         if (minmax_pv_parse(&mmls->min, &mmls->got_any_pvs))
1741                 return -1;
1742         if (minmax_pv_parse(&mmls->max, &mmls->got_any_pvs))
1743                 return -1;
1744         if (minmax_pv_parse(&mmls->average, &mmls->got_any_pvs))
1745                 return -1;
1746
1747         if (mmls->got_any_pvs)
1748                 got_any_mos_pvs = 1;
1749
1750         return 0;
1751 }
1752
1753
1754 static int
1755 child_init(int rank)
1756 {
1757         if(!rtpp_set_list)
1758                 return 0;
1759
1760         /* do not init sockets for PROC_INIT and main process when fork=yes */
1761         if(rank==PROC_INIT || (rank==PROC_MAIN && dont_fork==0)) {
1762                 return 0;
1763         }
1764
1765         mypid = getpid();
1766
1767         // vector of pointers to queried nodes
1768         queried_nodes_ptr = (struct rtpp_node**)pkg_malloc(MAX_RTPP_TRIED_NODES * sizeof(struct rtpp_node*));
1769         if (!queried_nodes_ptr) {
1770                 LM_ERR("no more pkg memory for queried_nodes_ptr\n");
1771                 return -1;
1772         }
1773         memset(queried_nodes_ptr, 0, MAX_RTPP_TRIED_NODES * sizeof(struct rtpp_node*));
1774
1775         /* Iterate known RTP proxies - create sockets */
1776         if (build_rtpp_socks())
1777                 return -1;
1778
1779         return 0;
1780 }
1781
1782
1783 static void mod_destroy(void)
1784 {
1785         struct rtpp_set * crt_list, * last_list;
1786         struct rtpp_node * crt_rtpp, *last_rtpp;
1787
1788         /*free the shared memory*/
1789         if (rtpp_no) {
1790                 shm_free(rtpp_no);
1791                 rtpp_no = NULL;
1792         }
1793
1794         if (rtpp_no_lock) {
1795                 lock_destroy(rtpp_no_lock);
1796                 lock_dealloc(rtpp_no_lock);
1797                 rtpp_no_lock = NULL;
1798         }
1799
1800         if (!rtpp_set_list) {
1801                 return;
1802         }
1803
1804         if (!rtpp_set_list->rset_head_lock) {
1805                 shm_free(rtpp_set_list);
1806                 rtpp_set_list = NULL;
1807                 return;
1808         }
1809
1810         lock_get(rtpp_set_list->rset_head_lock);
1811         for(crt_list = rtpp_set_list->rset_first; crt_list != NULL; ){
1812                 last_list = crt_list;
1813
1814                 if (!crt_list->rset_lock) {
1815                         crt_list = last_list->rset_next;
1816                         shm_free(last_list);
1817                         last_list = NULL;
1818                         continue;
1819                 }
1820
1821                 lock_get(last_list->rset_lock);
1822                 for(crt_rtpp = crt_list->rn_first; crt_rtpp != NULL;  ){
1823
1824                         if(crt_rtpp->rn_url.s)
1825                                 shm_free(crt_rtpp->rn_url.s);
1826
1827                         last_rtpp = crt_rtpp;
1828                         crt_rtpp = last_rtpp->rn_next;
1829                         shm_free(last_rtpp);
1830                 }
1831                 crt_list = last_list->rset_next;
1832                 lock_release(last_list->rset_lock);
1833
1834                 lock_destroy(last_list->rset_lock);
1835                 lock_dealloc((void*)last_list->rset_lock);
1836                 last_list->rset_lock = NULL;
1837
1838                 shm_free(last_list);
1839                 last_list = NULL;
1840         }
1841         lock_release(rtpp_set_list->rset_head_lock);
1842
1843         lock_destroy(rtpp_set_list->rset_head_lock);
1844         lock_dealloc((void*)rtpp_set_list->rset_head_lock);
1845         rtpp_set_list->rset_head_lock = NULL;
1846
1847         shm_free(rtpp_set_list);
1848         rtpp_set_list = NULL;
1849
1850         /* destroy the hastable which keeps the call-id <-> selected_node relation */
1851         if (!rtpengine_hash_table_destroy()) {
1852                 LM_ERR("rtpengine_hash_table_destroy() failed!\n");
1853         } else {
1854                 LM_DBG("rtpengine_hash_table_destroy() success!\n");
1855         }
1856 }
1857
1858
1859 static char * gencookie(void)
1860 {
1861         static char cook[34];
1862
1863         snprintf(cook, 34, "%d_%d_%u ", server_id, (int)mypid, myseqn);
1864         myseqn++;
1865         return cook;
1866 }
1867
1868
1869
/*
 * Transport protocol names, indexed by the two low bits of the transport
 * flags (the lookup in this file masks the value with 0x003).
 */
static const char *transports[] = {
	"RTP/AVP",	/* 0x00 */
	"RTP/SAVP",	/* 0x01 */
	"RTP/AVPF",	/* 0x02 */
	"RTP/SAVPF",	/* 0x03 */
};
1876
1877 static int parse_flags(struct ng_flags_parse *ng_flags, struct sip_msg *msg, enum rtpe_operation *op,
1878                 const char *flags_str)
1879 {
1880         char *e;
1881         const char *err;
1882         str key, val, s;
1883
1884         if (!flags_str)
1885                 return 0;
1886
1887         while (1) {
1888                 while (*flags_str == ' ')
1889                         flags_str++;
1890
1891                 key.s = (void *) flags_str;
1892                 val.len = key.len = -1;
1893                 val.s = NULL;
1894
1895                 e = strpbrk(key.s, " =");
1896                 if (!e)
1897                         e = key.s + strlen(key.s);
1898                 else if (*e == '=') {
1899                         key.len = e - key.s;
1900                         val.s = e + 1;
1901                         e = strchr(val.s, ' ');
1902                         if (!e)
1903                                 e = val.s + strlen(val.s);
1904                         val.len = e - val.s;
1905                 }
1906
1907                 if (key.len == -1)
1908                         key.len = e - key.s;
1909                 if (!key.len)
1910                         break;
1911
1912                 /* check for items which have their own sub-list */
1913                 s = str_prefix(&key, "replace-");
1914                 if (s.s) {
1915                         bencode_list_add_str(ng_flags->replace, &s);
1916                         goto next;
1917                 }
1918
1919                 s = str_prefix(&key, "rtcp-mux-");
1920                 if (s.s) {
1921                         bencode_list_add_str(ng_flags->rtcp_mux, &s);
1922                         goto next;
1923                 }
1924
1925                 /* check for specially handled items */
1926                 switch (key.len) {
1927                         case 3:
1928                                 if (str_eq(&key, "RTP")) {
1929                                         ng_flags->transport |= 0x100;
1930                                         ng_flags->transport &= ~0x001;
1931                                 }
1932                                 else if (str_eq(&key, "AVP")) {
1933                                         ng_flags->transport |= 0x100;
1934                                         ng_flags->transport &= ~0x002;
1935                                 }
1936                                 else if (str_eq(&key, "TOS") && val.s)
1937                                         bencode_dictionary_add_integer(ng_flags->dict, "TOS", atoi(val.s));
1938                                 else if (str_eq(&key, "delete-delay") && val.s)
1939                                         bencode_dictionary_add_integer(ng_flags->dict, "delete delay", atoi(val.s));
1940                                 else
1941                                         goto generic;
1942                                 goto next;
1943                                 break;
1944
1945                         case 4:
1946                                 if (str_eq(&key, "SRTP"))
1947                                         ng_flags->transport |= 0x101;
1948                                 else if (str_eq(&key, "AVPF"))
1949                                         ng_flags->transport |= 0x102;
1950                                 else
1951                                         goto generic;
1952                                 goto next;
1953                                 break;
1954
1955                         case 6:
1956                                 if (str_eq(&key, "to-tag")) {
1957                                         if (val.s)
1958                                                 ng_flags->to_tag = val;
1959                                         ng_flags->to = 1;
1960                                         goto next;
1961                                 }
1962                                 break;
1963
1964                         case 7:
1965                                 if (str_eq(&key, "RTP/AVP"))
1966                                         ng_flags->transport = 0x100;
1967                                 else if (str_eq(&key, "call-id")) {
1968                                         err = "missing value";
1969                                         if (!val.s)
1970                                                 goto error;
1971                                         ng_flags->call_id = val;
1972                                 }
1973                                 else
1974                                         goto generic;
1975                                 goto next;
1976                                 break;
1977
1978                         case 8:
1979                                 if (str_eq(&key, "internal") || str_eq(&key, "external"))
1980                                         bencode_list_add_str(ng_flags->direction, &key);
1981                                 else if (str_eq(&key, "RTP/AVPF"))
1982                                         ng_flags->transport = 0x102;
1983                                 else if (str_eq(&key, "RTP/SAVP"))
1984                                         ng_flags->transport = 0x101;
1985                                 else if (str_eq(&key, "from-tag")) {
1986                                         err = "missing value";
1987                                         if (!val.s)
1988                                                 goto error;
1989                                         ng_flags->from_tag = val;
1990                                 }
1991                                 else
1992                                         goto generic;
1993                                 goto next;
1994                                 break;
1995
1996                         case 9:
1997                                 if (str_eq(&key, "RTP/SAVPF"))
1998                                         ng_flags->transport = 0x103;
1999                                 else if (str_eq(&key, "direction"))
2000                                         bencode_list_add_str(ng_flags->direction, &val);
2001                                 else
2002                                         goto generic;
2003                                 goto next;
2004                                 break;
2005
2006                         case 10:
2007                                 if (str_eq(&key, "via-branch")) {
2008                                         err = "missing value";
2009                                         if (!val.s)
2010                                                 goto error;
2011                                         err = "invalid value";
2012                                         if (*val.s == '1' || *val.s == '2')
2013                                                 ng_flags->via = *val.s - '0';
2014                                         else if (str_eq(&val, "auto"))
2015                                                 ng_flags->via = 3;
2016                                         else if (str_eq(&val, "extra"))
2017                                                 ng_flags->via = -1;
2018                                         else
2019                                                 goto error;
2020                                         goto next;
2021                                 }
2022                                 break;
2023
2024                         case 11:
2025                                 if (str_eq(&key, "repacketize")) {
2026                                         err = "missing value";
2027                                         if (!val.s)
2028                                                 goto error;
2029                                         ng_flags->packetize = 0;
2030                                         while (isdigit(*val.s)) {
2031                                                 ng_flags->packetize *= 10;
2032                                                 ng_flags->packetize += *val.s - '0';
2033                                                 val.s++;
2034                                         }
2035                                         err = "invalid value";
2036                                         if (!ng_flags->packetize)
2037                                                 goto error;
2038                                         bencode_dictionary_add_integer(ng_flags->dict, "repacketize", ng_flags->packetize);
2039                                         goto next;
2040                                 }
2041                                 break;
2042
2043                         case 12:
2044                                 if (str_eq(&key, "force-answer")) {
2045                                         err = "cannot force answer in non-offer command";
2046                                         if (*op != OP_OFFER)
2047                                                 goto error;
2048                                         *op = OP_ANSWER;
2049                                         goto next;
2050                                 }
2051                                 break;
2052                 }
2053
2054 generic:
2055                 if (!val.s)
2056                         bencode_list_add_str(ng_flags->flags, &key);
2057                 else
2058                         bencode_dictionary_str_add_str(ng_flags->dict, &key, &val);
2059                 goto next;
2060
2061 next:
2062                 flags_str = e;
2063         }
2064
2065         return 0;
2066
2067 error:
2068         if (val.s)
2069                 LM_ERR("error processing flag `%.*s' (value '%.*s'): %s\n", key.len, key.s,
2070                                 val.len, val.s, err);
2071         else
2072                 LM_ERR("error processing flag `%.*s': %s\n", key.len, key.s, err);
2073         return -1;
2074 }
2075
2076 static bencode_item_t *rtpp_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg,
2077         enum rtpe_operation op, const char *flags_str, str *body_out)
2078 {
2079         struct ng_flags_parse ng_flags;
2080         bencode_item_t *item, *resp;
2081         str viabranch = STR_NULL;
2082         str body = STR_NULL, error = STR_NULL;
2083         int ret, queried_nodes = 0;
2084         struct rtpp_node *node;
2085         char *cp;
2086         pv_value_t pv_val;
2087
2088         /*** get & init basic stuff needed ***/
2089
2090         memset(&ng_flags, 0, sizeof(ng_flags));
2091
2092         if (get_callid(msg, &ng_flags.call_id) == -1 || ng_flags.call_id.len == 0) {
2093                 LM_ERR("can't get Call-Id field\n");
2094                 return NULL;
2095         }
2096         if (get_to_tag(msg, &ng_flags.to_tag) == -1) {
2097                 LM_ERR("can't get To tag\n");
2098                 return NULL;
2099         }
2100         if (get_from_tag(msg, &ng_flags.from_tag) == -1 || ng_flags.from_tag.len == 0) {
2101                 LM_ERR("can't get From tag\n");
2102                 return NULL;
2103         }
2104         if (bencode_buffer_init(bencbuf)) {
2105                 LM_ERR("could not initialize bencode_buffer_t\n");
2106                 return NULL;
2107         }
2108         ng_flags.dict = bencode_dictionary(bencbuf);
2109
2110         body.s = NULL;
2111         if (op == OP_OFFER || op == OP_ANSWER) {
2112                 ng_flags.flags = bencode_list(bencbuf);
2113                 ng_flags.direction = bencode_list(bencbuf);
2114                 ng_flags.replace = bencode_list(bencbuf);
2115                 ng_flags.rtcp_mux = bencode_list(bencbuf);
2116
2117                 if (read_sdp_pvar!= NULL) {
2118                         if (read_sdp_pvar->getf(msg,&read_sdp_pvar->pvp, &pv_val) < 0)
2119                         {
2120                                 LM_ERR("error getting pvar value <%.*s>\n", read_sdp_pvar_str.len, read_sdp_pvar_str.s);
2121                                 goto error;
2122                         } else {
2123                                 body = pv_val.rs;
2124                         }
2125
2126                 } else if (extract_body(msg, &body) == -1) {
2127                         LM_ERR("can't extract body from the message\n");
2128                         goto error;
2129                 }
2130                 if (body_intermediate.s)
2131                         bencode_dictionary_add_str(ng_flags.dict, "sdp", &body_intermediate);
2132                 else
2133                         bencode_dictionary_add_str(ng_flags.dict, "sdp", &body);
2134         }
2135
2136         /*** parse flags & build dictionary ***/
2137
2138         ng_flags.to = (op == OP_DELETE) ? 0 : 1;
2139
2140         if (parse_flags(&ng_flags, msg, &op, flags_str))
2141                 goto error;
2142
2143         /* only add those if any flags were given at all */
2144         if (ng_flags.direction && ng_flags.direction->child)
2145                 bencode_dictionary_add(ng_flags.dict, "direction", ng_flags.direction);
2146         if (ng_flags.flags && ng_flags.flags->child)
2147                 bencode_dictionary_add(ng_flags.dict, "flags", ng_flags.flags);
2148         if (ng_flags.replace && ng_flags.replace->child)
2149                 bencode_dictionary_add(ng_flags.dict, "replace", ng_flags.replace);
2150         if ((ng_flags.transport & 0x100))
2151                 bencode_dictionary_add_string(ng_flags.dict, "transport-protocol",
2152                                 transports[ng_flags.transport & 0x003]);
2153         if (ng_flags.rtcp_mux && ng_flags.rtcp_mux->child)
2154                 bencode_dictionary_add(ng_flags.dict, "rtcp-mux", ng_flags.rtcp_mux);
2155
2156         bencode_dictionary_add_str(ng_flags.dict, "call-id", &ng_flags.call_id);
2157
2158         if (ng_flags.via) {
2159                 if (ng_flags.via == 1 || ng_flags.via == 2)
2160                         ret = get_via_branch(msg, ng_flags.via, &viabranch);
2161                 else if (ng_flags.via == -1 && extra_id_pv)
2162                         ret = get_extra_id(msg, &viabranch);
2163                 else
2164                         ret = -1;
2165                 if (ret == -1 || viabranch.len == 0) {
2166                         LM_ERR("can't get Via branch/extra ID\n");
2167                         goto error;
2168                 }
2169                 bencode_dictionary_add_str(ng_flags.dict, "via-branch", &viabranch);
2170         }
2171
2172         item = bencode_list(bencbuf);
2173         bencode_dictionary_add(ng_flags.dict, "received-from", item);
2174         bencode_list_add_string(item, (msg->rcv.src_ip.af == AF_INET) ? "IP4" : (
2175                 (msg->rcv.src_ip.af == AF_INET6) ? "IP6" :
2176                 "?"
2177         ) );
2178         bencode_list_add_string(item, ip_addr2a(&msg->rcv.src_ip));
2179
2180         if ((msg->first_line.type == SIP_REQUEST && op != OP_ANSWER)
2181                 || (msg->first_line.type == SIP_REPLY && op == OP_DELETE)
2182                 || (msg->first_line.type == SIP_REPLY && op == OP_ANSWER))
2183         {
2184                 bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.from_tag);
2185                 if (ng_flags.to && ng_flags.to_tag.s && ng_flags.to_tag.len)
2186                         bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.to_tag);
2187         }
2188         else {
2189                 if (!ng_flags.to_tag.s || !ng_flags.to_tag.len) {
2190                         LM_ERR("No to-tag present\n");
2191                         goto error;
2192                 }
2193                 bencode_dictionary_add_str(ng_flags.dict, "from-tag", &ng_flags.to_tag);
2194                 bencode_dictionary_add_str(ng_flags.dict, "to-tag", &ng_flags.from_tag);
2195         }
2196
2197         bencode_dictionary_add_string(ng_flags.dict, "command", command_strings[op]);
2198
2199         /*** send it out ***/
2200
2201         if (bencbuf->error) {
2202                 LM_ERR("out of memory - bencode failed\n");
2203                 goto error;
2204         }
2205
2206         if(msg->id != current_msg_id)
2207                 active_rtpp_set = default_rtpp_set;
2208
2209 select_node:
2210         do {
2211                 if (queried_nodes >= cfg_get(rtpengine,rtpengine_cfg,queried_nodes_limit)) {
2212                         LM_ERR("queried nodes limit reached\n");
2213                         goto error;
2214                 }
2215
2216                 node = select_rtpp_node(ng_flags.call_id, viabranch, 1, queried_nodes_ptr, queried_nodes, op);
2217                 if (!node) {
2218                         LM_ERR("no available proxies\n");
2219                         goto error;
2220                 }
2221
2222                 cp = send_rtpp_command(node, ng_flags.dict, &ret);
2223                 if (cp == NULL) {
2224                         node->rn_disabled = 1;
2225                         node->rn_recheck_ticks = get_ticks() + cfg_get(rtpengine,rtpengine_cfg,rtpengine_disable_tout);
2226                 }
2227
2228                 queried_nodes_ptr[queried_nodes++] = node;
2229         } while (cp == NULL);
2230
2231         LM_DBG("proxy reply: %.*s\n", ret, cp);
2232
2233         set_rtp_inst_pvar(msg, &node->rn_url);
2234         /*** process reply ***/
2235
2236         resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
2237         if (!resp) {
2238                 LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
2239                 goto error;
2240         }
2241
2242         if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
2243                 if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
2244                         LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
2245                 } else {
2246                         if ((RTPENGINE_SESS_LIMIT_MSG_LEN == error.len) &&
2247                                 (strncmp(error.s, RTPENGINE_SESS_LIMIT_MSG, RTPENGINE_SESS_LIMIT_MSG_LEN) == 0))
2248                         {
2249                                 LM_WARN("proxy %.*s: %.*s", node->rn_url.len, node->rn_url.s , error.len, error.s);
2250                                 goto select_node;
2251                         }
2252                         LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
2253                 }
2254                 goto error;
2255         }
2256
2257         /* add hastable entry with the node => */
2258         if (!rtpengine_hash_table_lookup(ng_flags.call_id, viabranch, op)) {
2259                 // build the entry
2260                 struct rtpengine_hash_entry *entry = shm_malloc(sizeof(struct rtpengine_hash_entry));
2261                 if (!entry) {
2262                         LM_ERR("rtpengine hash table fail to create entry for calllen=%d callid=%.*s viabranch=%.*s\n",
2263                                 ng_flags.call_id.len, ng_flags.call_id.len, ng_flags.call_id.s,
2264                                 viabranch.len, viabranch.s);
2265                         goto skip_hash_table_insert;
2266                 }
2267                 memset(entry, 0, sizeof(struct rtpengine_hash_entry));
2268
2269                 // fill the entry
2270                 if (ng_flags.call_id.s && ng_flags.call_id.len > 0) {
2271                         if (shm_str_dup(&entry->callid, &ng_flags.call_id) < 0) {
2272                                 LM_ERR("rtpengine hash table fail to duplicate calllen=%d callid=%.*s\n",
2273                                         ng_flags.call_id.len, ng_flags.call_id.len, ng_flags.call_id.s);
2274                                 rtpengine_hash_table_free_entry(entry);
2275                                 goto skip_hash_table_insert;
2276                         }
2277                 }
2278                 if (viabranch.s && viabranch.len > 0) {
2279                         if (shm_str_dup(&entry->viabranch, &viabranch) < 0) {
2280                                 LM_ERR("rtpengine hash table fail to duplicate calllen=%d viabranch=%.*s\n",
2281                                         ng_flags.call_id.len, viabranch.len, viabranch.s);
2282                                 rtpengine_hash_table_free_entry(entry);
2283                                 goto skip_hash_table_insert;
2284                         }
2285                 }
2286                 entry->node = node;
2287                 entry->next = NULL;
2288                 entry->tout = get_ticks() + hash_table_tout;
2289
2290                 // insert the key<->entry from the hashtable
2291                 if (!rtpengine_hash_table_insert(ng_flags.call_id, viabranch, entry)) {
2292                         LM_ERR("rtpengine hash table fail to insert node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
2293                                 node->rn_url.len, node->rn_url.s, ng_flags.call_id.len,
2294                                 ng_flags.call_id.len, ng_flags.call_id.s, viabranch.len, viabranch.s);
2295                         rtpengine_hash_table_free_entry(entry);
2296                         goto skip_hash_table_insert;
2297                 } else {
2298                         LM_DBG("rtpengine hash table insert node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
2299                                 node->rn_url.len, node->rn_url.s, ng_flags.call_id.len,
2300                                 ng_flags.call_id.len, ng_flags.call_id.s, viabranch.len, viabranch.s);
2301                 }
2302         }
2303
2304 skip_hash_table_insert:
2305         if (body_out)
2306                 *body_out = body;
2307
2308         if (op == OP_DELETE) {
2309                 /* Delete the key<->value from the hashtable */
2310                 if (!rtpengine_hash_table_remove(ng_flags.call_id, viabranch, op)) {
2311                         LM_ERR("rtpengine hash table failed to remove entry for callen=%d callid=%.*s viabranch=%.*s\n",
2312                                 ng_flags.call_id.len, ng_flags.call_id.len, ng_flags.call_id.s,
2313                                 viabranch.len, viabranch.s);
2314                 } else {
2315                         LM_DBG("rtpengine hash table remove entry for callen=%d callid=%.*s viabranch=%.*s\n",
2316                                 ng_flags.call_id.len, ng_flags.call_id.len, ng_flags.call_id.s,
2317                                 viabranch.len, viabranch.s);
2318                 }
2319         }
2320
2321         return resp;
2322
2323 error:
2324         bencode_buffer_free(bencbuf);
2325         return NULL;
2326 }
2327
2328 static int rtpp_function_call_simple(struct sip_msg *msg, enum rtpe_operation op, const char *flags_str)
2329 {
2330         bencode_buffer_t bencbuf;
2331
2332         if (!rtpp_function_call(&bencbuf, msg, op, flags_str, NULL))
2333                 return -1;
2334
2335         bencode_buffer_free(&bencbuf);
2336         return 1;
2337 }
2338
2339 static bencode_item_t *rtpp_function_call_ok(bencode_buffer_t *bencbuf, struct sip_msg *msg,
2340                 enum rtpe_operation op, const char *flags_str, str *body)
2341 {
2342         bencode_item_t *ret;
2343
2344         ret = rtpp_function_call(bencbuf, msg, op, flags_str, body);
2345         if (!ret)
2346                 return NULL;
2347
2348         if (bencode_dictionary_get_strcmp(ret, "result", "ok")) {
2349                 LM_ERR("proxy didn't return \"ok\" result\n");
2350                 bencode_buffer_free(bencbuf);
2351                 return NULL;
2352         }
2353
2354         return ret;
2355 }
2356
2357
2358
2359 static int
2360 rtpp_test(struct rtpp_node *node, int isdisabled, int force)
2361 {
2362         bencode_buffer_t bencbuf;
2363         bencode_item_t *dict;
2364         char *cp;
2365         int ret;
2366
2367         if(node->rn_recheck_ticks == RTPENGINE_MAX_RECHECK_TICKS){
2368                 LM_DBG("rtpp %s disabled for ever\n", node->rn_url.s);
2369                 return 1;
2370         }
2371         if (force == 0) {
2372                 if (isdisabled == 0)
2373                         return 0;
2374                 if (node->rn_recheck_ticks > get_ticks())
2375                         return 1;
2376         }
2377
2378         if (bencode_buffer_init(&bencbuf)) {
2379                 LM_ERR("could not initialized bencode_buffer_t\n");
2380                 return 1;
2381         }
2382         dict = bencode_dictionary(&bencbuf);
2383         bencode_dictionary_add_string(dict, "command", "ping");
2384         if (bencbuf.error)
2385                 goto benc_error;
2386
2387         cp = send_rtpp_command(node, dict, &ret);
2388         if (!cp) {
2389                 node->rn_disabled = 1;
2390                 node->rn_recheck_ticks = get_ticks() + cfg_get(rtpengine,rtpengine_cfg,rtpengine_disable_tout);
2391                 LM_ERR("proxy did not respond to ping\n");
2392                 goto error;
2393         }
2394
2395         dict = bencode_decode_expect(&bencbuf, cp, ret, BENCODE_DICTIONARY);
2396         if (!dict || bencode_dictionary_get_strcmp(dict, "result", "pong")) {
2397                 LM_ERR("proxy responded with invalid response\n");
2398                 goto error;
2399         }
2400
2401         LM_INFO("rtp proxy <%s> found, support for it %senabled\n",
2402                 node->rn_url.s, force == 0 ? "re-" : "");
2403
2404         bencode_buffer_free(&bencbuf);
2405         return 0;
2406
2407 benc_error:
2408         LM_ERR("out of memory - bencode failed\n");
2409 error:
2410         bencode_buffer_free(&bencbuf);
2411         return 1;
2412 }
2413
2414 static char *
2415 send_rtpp_command(struct rtpp_node *node, bencode_item_t *dict, int *outlen)
2416 {
2417         struct sockaddr_un addr;
2418         int fd, len, i, vcnt;
2419         int rtpengine_retr, rtpengine_tout_ms = 1000;
2420         char *cp;
2421         static char buf[0x10000];
2422         struct pollfd fds[1];
2423         struct iovec *v;
2424         str out = STR_NULL;
2425
2426         v = bencode_iovec(dict, &vcnt, 1, 0);
2427         if (!v) {
2428                 LM_ERR("error converting bencode to iovec\n");
2429                 return NULL;
2430         }
2431
2432         len = 0;
2433         cp = buf;
2434         if (node->rn_umode == 0) {
2435                 memset(&addr, 0, sizeof(addr));
2436                 addr.sun_family = AF_LOCAL;
2437                 strncpy(addr.sun_path, node->rn_address,
2438                         sizeof(addr.sun_path) - 1);
2439 #ifdef HAVE_SOCKADDR_SA_LEN
2440                 addr.sun_len = strlen(addr.sun_path);
2441 #endif
2442
2443                 fd = socket(AF_LOCAL, SOCK_STREAM, 0);
2444                 if (fd < 0) {
2445                         LM_ERR("can't create socket\n");
2446                         goto badproxy;
2447                 }
2448                 if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
2449                         close(fd);
2450                         LM_ERR("can't connect to RTP proxy <%s>\n", node->rn_url.s);
2451                         goto badproxy;
2452                 }
2453
2454                 do {
2455                         len = writev(fd, v + 1, vcnt);
2456                 } while (len == -1 && errno == EINTR);
2457                 if (len <= 0) {
2458                         close(fd);
2459                         LM_ERR("can't send command to RTP proxy <%s>\n", node->rn_url.s);
2460                         goto badproxy;
2461                 }
2462                 do {
2463                         len = read(fd, buf, sizeof(buf) - 1);
2464                 } while (len == -1 && errno == EINTR);
2465                 close(fd);
2466                 if (len <= 0) {
2467                         LM_ERR("can't read reply from RTP proxy <%s>\n", node->rn_url.s);
2468                         goto badproxy;
2469                 }
2470         } else {
2471                 fds[0].fd = rtpp_socks[node->idx];
2472                 fds[0].events = POLLIN;
2473                 fds[0].revents = 0;
2474                 /* Drain input buffer */
2475                 while ((poll(fds, 1, 0) == 1) &&
2476                         ((fds[0].revents & POLLIN) != 0)) {
2477                         /* coverity[check_return : FALSE] */
2478                         recv(rtpp_socks[node->idx], buf, sizeof(buf) - 1, 0);
2479                         fds[0].revents = 0;
2480                 }
2481                 v[0].iov_base = gencookie();
2482                 v[0].iov_len = strlen(v[0].iov_base);
2483         rtpengine_retr = cfg_get(rtpengine,rtpengine_cfg,rtpengine_retr);
2484                 for (i = 0; i < rtpengine_retr; i++) {
2485                         do {
2486                                 len = writev(rtpp_socks[node->idx], v, vcnt + 1);
2487                         } while (len == -1 && (errno == EINTR || errno == ENOBUFS));
2488                         if (len <= 0) {
2489                                 bencode_get_str(bencode_dictionary_get(dict, "command"), &out);
2490                                 LM_ERR("can't send command \"%.*s\" to RTP proxy <%s>\n", out.len, out.s, node->rn_url.s);
2491                                 goto badproxy;
2492                         }
2493             rtpengine_tout_ms = cfg_get(rtpengine,rtpengine_cfg,rtpengine_tout_ms);
2494                         while ((poll(fds, 1, rtpengine_tout_ms) == 1) &&
2495                                 (fds[0].revents & POLLIN) != 0) {
2496                                 do {
2497                                         len = recv(rtpp_socks[node->idx], buf, sizeof(buf)-1, 0);
2498                                 } while (len == -1 && errno == EINTR);
2499                                 if (len <= 0) {
2500                                         LM_ERR("can't read reply from RTP proxy <%s>\n", node->rn_url.s);
2501                                         goto badproxy;
2502                                 }
2503                                 if (len >= (v[0].iov_len - 1) &&
2504                                         memcmp(buf, v[0].iov_base, (v[0].iov_len - 1)) == 0) {
2505                                         len -= (v[0].iov_len - 1);
2506                                         cp += (v[0].iov_len - 1);
2507                                         if (len != 0) {
2508                                                 len--;
2509                                                 cp++;
2510                                         }
2511                                         goto out;
2512                                 }
2513                                 fds[0].revents = 0;
2514                         }
2515                 }
2516                 if (i == rtpengine_retr) {
2517                         LM_ERR("timeout waiting reply from RTP proxy <%s>\n", node->rn_url.s);
2518                         goto badproxy;
2519                 }
2520         }
2521
2522 out:
2523         cp[len] = '\0';
2524         *outlen = len;
2525         return cp;
2526
2527 badproxy:
2528         return NULL;
2529 }
2530
2531 /*
2532  * select the set with the id_set id
2533  */
2534
2535 static struct rtpp_set * select_rtpp_set(unsigned int id_set ){
2536
2537         struct rtpp_set * rtpp_list;
2538         /*is it a valid set_id?*/
2539
2540         if (!rtpp_set_list) {
2541                 LM_ERR("no rtpp_set_list\n");
2542                 return 0;
2543         }
2544
2545         lock_get(rtpp_set_list->rset_head_lock);
2546         if (!rtpp_set_list->rset_first) {
2547                 LM_ERR("no rtpp_set_list->rset_first\n");
2548                 lock_release(rtpp_set_list->rset_head_lock);
2549                 return 0;
2550         }
2551
2552         for (rtpp_list=rtpp_set_list->rset_first; rtpp_list!=0 &&
2553                         rtpp_list->id_set!=id_set; rtpp_list=rtpp_list->rset_next);
2554         if (!rtpp_list) {
2555                 LM_ERR(" script error-invalid id_set to be selected\n");
2556         }
2557         lock_release(rtpp_set_list->rset_head_lock);
2558
2559         return rtpp_list;
2560 }
2561
2562 /*
2563  * run the selection algorithm and return the new selected node
2564  */
2565 static struct rtpp_node *
2566 select_rtpp_node_new(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes)
2567 {
2568         struct rtpp_node* node;
2569         unsigned i, sum, sumcut, weight_sum;
2570         int was_forced = 0;
2571
2572         /* XXX Use quick-and-dirty hashing algo */
2573         sum = 0;
2574         for(i = 0; i < callid.len; i++)
2575                 sum += callid.s[i];
2576         sum &= 0xff;
2577
2578 retry:
2579         weight_sum = 0;
2580
2581         lock_get(active_rtpp_set->rset_lock);
2582         for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
2583                 /* Select only between displayed machines */
2584                 if (!node->rn_displayed) {
2585                         continue;
2586                 }
2587
2588                 /* Try to enable if it's time to try. */
2589                 if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
2590                         node->rn_disabled = rtpp_test(node, 1, 0);
2591                 }
2592
2593                 /* Select only between enabled machines */
2594                 if (!node->rn_disabled && !is_queried_node(node, queried_nodes_ptr, queried_nodes)) {
2595                         weight_sum += node->rn_weight;
2596                 }
2597         }
2598         lock_release(active_rtpp_set->rset_lock);
2599
2600         /* No proxies? Force all to be redetected, if not yet */
2601         if (weight_sum == 0) {
2602                 if (was_forced) {
2603                         return NULL;
2604                 }
2605
2606                 was_forced = 1;
2607
2608                 lock_get(active_rtpp_set->rset_lock);
2609                 for(node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
2610                         /* Select only between displayed machines */
2611                         if (!node->rn_displayed) {
2612                                 continue;
2613                         }
2614
2615                         node->rn_disabled = rtpp_test(node, 1, 1);
2616                 }
2617                 lock_release(active_rtpp_set->rset_lock);
2618
2619                 goto retry;
2620         }
2621
2622         /* sumcut here lays from 0 to weight_sum-1 */
2623         sumcut = sum % weight_sum;
2624
2625         /*
2626          * Scan proxy list and decrease until appropriate proxy is found.
2627          */
2628         lock_get(active_rtpp_set->rset_lock);
2629         for (node=active_rtpp_set->rn_first; node!=NULL; node=node->rn_next) {
2630                 /* Select only between displayed machines */
2631                 if (!node->rn_displayed) {
2632                         continue;
2633                 }
2634
2635                 /* Select only between enabled machines */
2636                 if (node->rn_disabled)
2637                         continue;
2638
2639                 /* Select only between not already queried machines */
2640                 if (is_queried_node(node, queried_nodes_ptr, queried_nodes))
2641                         continue;
2642
2643                 /* Found machine */
2644                 if (sumcut < node->rn_weight) {
2645                         lock_release(active_rtpp_set->rset_lock);
2646                         goto found;
2647                 }
2648
2649                 /* Update sumcut if enabled machine */
2650                 sumcut -= node->rn_weight;
2651         }
2652         lock_release(active_rtpp_set->rset_lock);
2653
2654         /* No node list */
2655         return NULL;
2656
2657 found:
2658         if (do_test) {
2659                 lock_get(active_rtpp_set->rset_lock);
2660                 node->rn_disabled = rtpp_test(node, node->rn_disabled, 0);
2661                 if (node->rn_disabled) {
2662                         lock_release(active_rtpp_set->rset_lock);
2663                         goto retry;
2664                 }
2665                 lock_release(active_rtpp_set->rset_lock);
2666         }
2667
2668         /* return selected node */
2669         return node;
2670 }
2671
2672 /*
2673  * lookup the hastable (key=callid value=node) and get the old node (e.g. for answer/delete)
2674  */
2675 static struct rtpp_node *
2676 select_rtpp_node_old(str callid, str viabranch, int do_test, enum rtpe_operation op)
2677 {
2678         struct rtpp_node *node = NULL;
2679
2680         node = rtpengine_hash_table_lookup(callid, viabranch, op);
2681
2682         if (!node) {
2683                 LM_DBG("rtpengine hash table lookup failed to find node for calllen=%d callid=%.*s viabranch=%.*s\n",
2684                         callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
2685                 return NULL;
2686         } else {
2687                 LM_DBG("rtpengine hash table lookup find node=%.*s for calllen=%d callid=%.*s viabranch=%.*s\n",
2688                         node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s, viabranch.len, viabranch.s);
2689         }
2690
2691         return node;
2692 }
2693
2694 /*
2695  * Main balancing routine. This DO try to keep the same proxy for
2696  * the call if some proxies were disabled or enabled (e.g. kamctl command)
2697  */
2698 static struct rtpp_node *
2699 select_rtpp_node(str callid, str viabranch, int do_test, struct rtpp_node **queried_nodes_ptr, int queried_nodes, enum rtpe_operation op)
2700 {
2701         struct rtpp_node *node = NULL;
2702
2703         if (build_rtpp_socks()) {
2704                 LM_ERR("out of memory\n");
2705                 return NULL;
2706         }
2707
2708         if (!active_rtpp_set) {
2709                 default_rtpp_set = select_rtpp_set(setid_default);
2710                 active_rtpp_set = default_rtpp_set;
2711         }
2712
2713         if (!active_rtpp_set) {
2714                 LM_ERR("script error - no valid set selected\n");
2715                 return NULL;
2716         }
2717
2718         // lookup node
2719         node = select_rtpp_node_old(callid, viabranch, do_test, op);
2720
2721         // check node
2722         if (!node) {
2723                 // run the selection algorithm
2724                 node = select_rtpp_node_new(callid, viabranch, do_test, queried_nodes_ptr, queried_nodes);
2725
2726                 // check node
2727                 if (!node) {
2728                         LM_ERR("rtpengine failed to select new for calllen=%d callid=%.*s\n",
2729                                 callid.len, callid.len, callid.s);
2730                         return NULL;
2731                 }
2732         }
2733
2734         // if node enabled, return it
2735         if (!node->rn_disabled) {
2736                 return node;
2737         }
2738
2739         // if proper configuration and node manually or timeout disabled, return it
2740         if (rtpengine_allow_op) {
2741                 if (node->rn_recheck_ticks == RTPENGINE_MAX_RECHECK_TICKS) {
2742                         LM_DBG("node=%.*s for calllen=%d callid=%.*s is disabled(permanent) (probably still UP)! Return it\n",
2743                                 node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
2744                 } else {
2745                         LM_DBG("node=%.*s for calllen=%d callid=%.*s is disabled, either broke or timeout disabled! Return it\n",
2746                                 node->rn_url.len, node->rn_url.s, callid.len, callid.len, callid.s);
2747                 }
2748                 return node;
2749         }
2750
2751         return NULL;
2752 }
2753
2754 static int
2755 get_extra_id(struct sip_msg* msg, str *id_str) {
2756         if (msg == NULL || extra_id_pv == NULL || id_str == NULL) {
2757                 LM_ERR("bad parameters\n");
2758                 return -1;
2759         }
2760         if (pv_printf_s(msg, extra_id_pv, id_str) < 0) {
2761                 LM_ERR("cannot print the additional id\n");
2762                 return -1;
2763         }
2764
2765         return 1;
2766
2767 }
2768
2769 static int
2770 set_rtpengine_set_from_avp(struct sip_msg *msg, int direction)
2771 {
2772         struct usr_avp *avp;
2773         int_str setid_val;
2774
2775         if ((setid_avp_param == NULL) ||
2776            (avp = search_first_avp(setid_avp_type, setid_avp, &setid_val, 0)) == NULL) {
2777                 if (direction == 1 || !selected_rtpp_set_2)
2778                         active_rtpp_set = selected_rtpp_set_1;
2779                 else
2780                         active_rtpp_set = selected_rtpp_set_2;
2781                 return 1;
2782         }
2783
2784         if (avp->flags&AVP_VAL_STR) {
2785                 LM_ERR("setid_avp must hold an integer value\n");
2786                 return -1;
2787         }
2788
2789         active_rtpp_set = select_rtpp_set(setid_val.n);
2790         if(active_rtpp_set == NULL) {
2791                 LM_ERR("could not locate rtpproxy set %u\n", setid_val.n);
2792                 return -1;
2793         }
2794
2795         LM_DBG("using rtpengine set %u\n", setid_val.n);
2796
2797         current_msg_id = msg->id;
2798
2799         return 1;
2800 }
2801
2802 static void avp_print_s(pv_elem_t *pv, char *str, int len, struct sip_msg *msg) {
2803         pv_value_t val;
2804
2805         if (!pv)
2806                 return;
2807
2808         memset(&val, 0, sizeof(val));
2809         val.flags = PV_VAL_STR;
2810         val.rs.s = str;
2811         val.rs.len = len;
2812         pv->spec->setf(msg, &pv->spec->pvp, EQ_T, &val);
2813 }
2814
2815 static void avp_print_decimal(pv_elem_t *pv, int num, struct sip_msg *msg) {
2816         int len;
2817         char buf[8];
2818
2819         len = snprintf(buf, sizeof(buf), "%i.%i", num / 10, abs(num % 10));
2820         avp_print_s(pv, buf, len, msg);
2821 }
2822 static void avp_print_int(pv_elem_t *pv, int num, struct sip_msg *msg) {
2823         int len;
2824         char buf[8];
2825
2826         len = snprintf(buf, sizeof(buf), "%i", num);
2827         avp_print_s(pv, buf, len, msg);
2828 }
2829 static void avp_print_time(pv_elem_t *pv, int num, struct sip_msg *msg) {
2830         int len;
2831         char buf[8];
2832
2833         len = snprintf(buf, sizeof(buf), "%i:%02i", num / 60, abs(num % 60));
2834         avp_print_s(pv, buf, len, msg);
2835 }
2836
2837 static void avp_print_mos(struct minmax_mos_stats *s, struct minmax_stats_vals *vals, long long created,
2838                 struct sip_msg *msg)
2839 {
2840         if (!vals->avg_samples)
2841                 return;
2842
2843         avp_print_decimal(s->mos_pv, vals->mos / vals->avg_samples, msg);
2844         avp_print_time(s->at_pv, vals->at - created, msg);
2845         avp_print_int(s->packetloss_pv, vals->packetloss / vals->avg_samples, msg);
2846         avp_print_int(s->jitter_pv, vals->jitter / vals->avg_samples, msg);
2847         avp_print_int(s->roundtrip_pv, vals->roundtrip / vals->avg_samples, msg);
2848         avp_print_int(s->samples_pv, vals->samples / vals->avg_samples, msg);
2849 }
2850
2851 static int decode_mos_vals_dict(struct minmax_stats_vals *vals, bencode_item_t *dict, const char *key) {
2852         bencode_item_t *mos_ent;
2853
2854         mos_ent = bencode_dictionary_get_expect(dict, key, BENCODE_DICTIONARY);
2855         if (!mos_ent)
2856                 return 0;
2857
2858         vals->mos = bencode_dictionary_get_integer(mos_ent, "MOS", -1);
2859         vals->at = bencode_dictionary_get_integer(mos_ent, "reported at", -1);
2860         vals->packetloss = bencode_dictionary_get_integer(mos_ent, "packet loss", -1);
2861         vals->jitter = bencode_dictionary_get_integer(mos_ent, "jitter", -1);
2862         vals->roundtrip = bencode_dictionary_get_integer(mos_ent, "round-trip time", -1);
2863         vals->samples = bencode_dictionary_get_integer(mos_ent, "samples", -1);
2864         vals->avg_samples = 1;
2865
2866         return 1;
2867 }
2868
2869 static void parse_call_stats_1(struct minmax_mos_label_stats *mmls, bencode_item_t *dict,
2870                 struct sip_msg *msg)
2871 {
2872         long long created;
2873         str label, check;
2874         long long ssrcs[4];
2875         unsigned int num_ssrcs = 0, i;
2876         long long ssrc;
2877         char *endp;
2878         bencode_item_t *ssrc_list,
2879                        *ssrc_key,
2880                        *ssrc_dict,
2881                        *tags,
2882                        *tag_key,
2883                        *tag_dict,
2884                        *medias,
2885                        *media,
2886                        *streams,
2887                        *stream;
2888         struct minmax_stats_vals min_vals = { .mos = 100 },
2889                                  max_vals = { .mos = -1  },
2890                                  average_vals = { .avg_samples = 0 },
2891                                  vals_decoded;
2892
2893         if (!mmls->got_any_pvs)
2894                 return;
2895
2896         /* check if only a subset of info is requested */
2897         if (!mmls->label_pv)
2898                 goto ssrcs_done;
2899
2900         if (pv_printf_s(msg, mmls->label_pv, &label)) {
2901                 LM_ERR("error printing label PV\n");
2902                 return;
2903         }
2904         LM_DBG("rtpengine: looking for label '%.*s'\n", label.len, label.s);
2905
2906         /* walk through tags to find the label we're looking for */
2907         tags = bencode_dictionary_get_expect(dict, "tags", BENCODE_DICTIONARY);
2908         if (!tags)
2909                 return; /* label wanted but no tags found - return nothing */
2910         LM_DBG("rtpengine: XXX got tags\n");
2911
2912         for (tag_key = tags->child; tag_key; tag_key = tag_key->sibling->sibling) {
2913                 LM_DBG("rtpengine: XXX got tag\n");
2914                 tag_dict = tag_key->sibling;
2915                 /* compare label */
2916                 if (!bencode_dictionary_get_str(tag_dict, "label", &check))
2917                         continue;
2918                 LM_DBG("rtpengine: XXX got label %.*s\n", check.len, check.s);
2919                 if (str_cmp(&check, &label))
2920                         continue;
2921                 LM_DBG("rtpengine: XXX label match\n");
2922                 medias = bencode_dictionary_get_expect(tag_dict, "medias", BENCODE_LIST);
2923                 if (!medias)
2924                         continue;
2925                 LM_DBG("rtpengine: XXX got medias\n");
2926                 for (media = medias->child; media; media = media->sibling) {
2927                         LM_DBG("rtpengine: XXX got media\n");
2928                         streams = bencode_dictionary_get_expect(media, "streams", BENCODE_LIST);
2929                         if (!streams)
2930                                 continue;
2931                         LM_DBG("rtpengine: XXX got streams\n");
2932                         /* only check the first stream (RTP) */
2933                         stream = streams->child;
2934                         if (!stream)
2935                                 continue;
2936                         LM_DBG("rtpengine: XXX got stream type %i\n", stream->type);
2937                         LM_DBG("rtpengine: XXX stream child '%.*s'\n", (int) stream->child->iov[1].iov_len, (char *) stream->child->iov[1].iov_base);
2938                         LM_DBG("rtpengine: XXX stream child val type %i\n", stream->child->sibling->type);
2939                         if ((ssrc = bencode_dictionary_get_integer(stream, "SSRC", -1)) == -1)
2940                                 continue;
2941                         /* got a valid SSRC to watch for */
2942                         ssrcs[num_ssrcs] = ssrc;
2943                         LM_DBG("rtpengine: found SSRC '%lli' for label '%.*s'\n",
2944                                         ssrc,
2945                                         label.len, label.s);
2946                         num_ssrcs++;
2947                         /* see if we can do more */
2948                         if (num_ssrcs >= (sizeof(ssrcs) / sizeof(*ssrcs)))
2949                                 goto ssrcs_done;
2950                 }
2951         }
2952         /* if we get here, we were looking for label. see if we found one. if not, return nothing */
2953         if (num_ssrcs == 0)
2954                 return;
2955
2956 ssrcs_done:
2957         /* now look for the stats values */
2958         created = bencode_dictionary_get_integer(dict, "created", 0);
2959         ssrc_list = bencode_dictionary_get_expect(dict, "SSRC", BENCODE_DICTIONARY);
2960         if (!ssrc_list)
2961                 return;
2962
2963         for (ssrc_key = ssrc_list->child; ssrc_key; ssrc_key = ssrc_key->sibling->sibling) {
2964                 /* see if this is a SSRC we're interested in */
2965                 if (num_ssrcs == 0)
2966                         goto ssrc_ok;
2967                 if (!bencode_get_str(ssrc_key, &check))
2968                         continue;
2969                 ssrc = strtoll(check.s, &endp, 10);
2970                 for (i = 0; i < num_ssrcs; i++) {
2971                         if (ssrcs[i] != ssrc)
2972                                 continue;
2973                         /* it's a match */
2974                         LM_DBG("rtpengine: considering SSRC '%.*s'\n",
2975                                         check.len, check.s);
2976                         goto ssrc_ok;
2977                 }
2978                 /* no match */
2979                 continue;
2980
2981 ssrc_ok:
2982                 ssrc_dict = ssrc_key->sibling;
2983                 if (!ssrc_dict)
2984                         continue;
2985
2986                 if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "average MOS")) {
2987                         average_vals.avg_samples++;
2988                         average_vals.mos += vals_decoded.mos;
2989                         average_vals.packetloss += vals_decoded.packetloss;
2990                         average_vals.jitter += vals_decoded.jitter;
2991                         average_vals.roundtrip += vals_decoded.roundtrip;
2992                         average_vals.samples += vals_decoded.samples;
2993                 }
2994
2995                 if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "highest MOS")) {
2996                         if (vals_decoded.mos > max_vals.mos)
2997                                 max_vals = vals_decoded;
2998                 }
2999                 if (decode_mos_vals_dict(&vals_decoded, ssrc_dict, "lowest MOS")