Merge pull request #2326 from NGSegovia/keepalive/first_check_on_start
[sip-router] / src / modules / dmq / notification_peer.c
1 /*
2  * dmq module - distributed message queue
3  *
4  * Copyright (C) 2011 Bucur Marius - Ovidiu
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License 
19  * along with this program; if not, write to the Free Software 
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  *
22  */
23
24
25 #include "notification_peer.h"
26
27 #define MAXDMQURILEN 255
28 #define MAXDMQHOSTS 30
29
30 str dmq_notification_content_type = str_init("text/plain");
31 dmq_resp_cback_t dmq_notification_resp_callback = {&notification_resp_callback_f, 0};
32
33 int *dmq_init_callback_done = 0;
34
35
36 /**
37  * @brief add notification peer
38  */
39 int add_notification_peer()
40 {
41         dmq_peer_t not_peer;
42
43         memset(&not_peer, 0, sizeof(dmq_peer_t));
44         not_peer.callback = dmq_notification_callback_f;
45         not_peer.init_callback = NULL;
46         not_peer.description.s = "notification_peer";
47         not_peer.description.len = 17;
48         not_peer.peer_id.s = "notification_peer";
49         not_peer.peer_id.len = 17;
50         dmq_notification_peer = register_dmq_peer(&not_peer);
51         if(!dmq_notification_peer) {
52                 LM_ERR("error in register_dmq_peer\n");
53                 goto error;
54         }
55         /* add itself to the node list */
56         dmq_self_node = add_dmq_node(dmq_node_list, &dmq_server_address);
57         if(!dmq_self_node) {
58                 LM_ERR("error adding self node\n");
59                 goto error;
60         }
61         /* local node - only for self */
62         dmq_self_node->local = 1;
63         dmq_self_node->status = DMQ_NODE_ACTIVE;
64         return 0;
65 error:
66         return -1;
67 }
68
69 /**********
70 * Create IP URI
71 *
72 * INPUT:
73 *   Arg (1) = container for hosts
74 *   Arg (2) = host index
75 *   Arg (3) = host name pointer
76 *   Arg (4) = host name length
77 *   Arg (5) = parsed URI pointer
78 * OUTPUT: 0=unable to create URI
79 **********/
80
81 int create_IP_uri(char **puri_list, int host_index, char *phost, int hostlen,
82                 sip_uri_t *puri)
83
84 {
85         int pos;
86         char *plist;
87         char *perr = "notification host count reached max!\n";
88
89         /**********
90         * insert
91         * o scheme
92         * o user name/password
93         * o host
94         * o port
95         * o parameters
96         **********/
97
98         plist = puri_list[host_index];
99         if(puri->type == SIPS_URI_T) {
100                 memcpy(plist, "sips:", 5);
101                 pos = 5;
102         } else {
103                 memcpy(plist, "sip:", 4);
104                 pos = 4;
105         }
106         if(puri->user.s) {
107                 memcpy(&plist[pos], puri->user.s, puri->user.len);
108                 pos += puri->user.len;
109                 if(puri->passwd.s) {
110                         plist[pos++] = ':';
111                         memcpy(&plist[pos], puri->passwd.s, puri->passwd.len);
112                         pos += puri->passwd.len;
113                 }
114                 plist[pos++] = '@';
115         }
116         if((pos + hostlen) > MAXDMQURILEN) {
117                 LM_WARN("%s", perr);
118                 return 0;
119         }
120         memcpy(&plist[pos], phost, hostlen);
121         pos += hostlen;
122         if(puri->port_no) {
123                 if((pos + 6) > MAXDMQURILEN) {
124                         LM_WARN("%s", perr);
125                         return 0;
126                 }
127                 plist[pos++] = ':';
128                 pos += ushort2sbuf(puri->port_no, &plist[pos], 5);
129         }
130         if(puri->params.s) {
131                 if((pos + puri->params.len) >= MAXDMQURILEN) {
132                         LM_WARN("%s", perr);
133                         return 0;
134                 }
135                 plist[pos++] = ';';
136                 memcpy(&plist[pos], puri->params.s, puri->params.len);
137                 pos += puri->params.len;
138         }
139         plist[pos] = '\0';
140         return 1;
141 }
142
143 /**********
144 * Get DMQ Host List
145 *
146 * INPUT:
147 *   Arg (1) = container for hosts
148 *   Arg (2) = maximum number of hosts
149 *   Arg (3) = host string pointer
150 *   Arg (4) = parsed URI pointer
151 *   Arg (5) = search SRV flag
152 * OUTPUT: number of hosts found
153 **********/
154
155 int get_dmq_host_list(
156                 char **puri_list, int max_hosts, str *phost, sip_uri_t *puri, int bSRV)
157
158 {
159         int host_cnt, len;
160         unsigned short origport, port;
161         str pstr[1];
162         char pname[256], pIP[IP6_MAX_STR_SIZE + 2];
163         struct rdata *phead, *prec;
164         struct srv_rdata *psrv;
165
166         /**********
167         * o IP address?
168         * o make null terminated name
169         * o search SRV?
170         **********/
171
172         if(str2ip(phost) || str2ip6(phost)) {
173                 if(!create_IP_uri(puri_list, 0, phost->s, phost->len, puri)) {
174                         LM_DBG("adding DMQ node IP host %.*s=%s\n", phost->len, phost->s,
175                                         puri_list[0]);
176                         return 0;
177                 }
178                 return 1;
179         }
180         strncpy(pname, phost->s, phost->len);
181         pname[phost->len] = '\0';
182         host_cnt = 0;
183         if(bSRV) {
184                 /**********
185                 * get SRV records
186                 **********/
187
188                 port = puri->port_no;
189                 phead = get_record(pname, T_SRV, RES_ONLY_TYPE);
190                 for(prec = phead; prec; prec = prec->next) {
191                         /**********
192                         * o matching port?
193                         * o check max
194                         * o save original port
195                         * o check target
196                         * o restore port
197                         **********/
198
199                         psrv = (struct srv_rdata *)prec->rdata;
200                         if(port && (port != psrv->port)) {
201                                 continue;
202                         }
203                         if(host_cnt == max_hosts) {
204                                 LM_WARN("notification host count reached max!\n");
205                                 free_rdata_list(phead);
206                                 return host_cnt;
207                         }
208                         pstr->s = psrv->name;
209                         pstr->len = psrv->name_len;
210                         origport = puri->port_no;
211                         puri->port_no = psrv->port;
212                         host_cnt += get_dmq_host_list(&puri_list[host_cnt],
213                                         MAXDMQHOSTS - host_cnt, pstr, puri, 0);
214                         puri->port_no = origport;
215                 }
216                 if(phead)
217                         free_rdata_list(phead);
218         }
219
220         /**********
221         * get A records
222         **********/
223
224         phead = get_record(pname, T_A, RES_ONLY_TYPE);
225         for(prec = phead; prec; prec = prec->next) {
226                 /**********
227                 * o check max
228                 * o create URI
229                 **********/
230
231                 if(host_cnt == max_hosts) {
232                         LM_WARN("notification host count reached max!\n");
233                         free_rdata_list(phead);
234                         return host_cnt;
235                 }
236                 len = ip4tosbuf(
237                                 ((struct a_rdata *)prec->rdata)->ip, pIP, IP4_MAX_STR_SIZE);
238                 pIP[len] = '\0';
239                 if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
240                         LM_DBG("adding DMQ node A host %s=%s\n", pname,
241                                         puri_list[host_cnt]);
242                         host_cnt++;
243                 }
244         }
245         if(phead)
246                 free_rdata_list(phead);
247
248         /**********
249         * get AAAA records
250         **********/
251
252         phead = get_record(pname, T_AAAA, RES_ONLY_TYPE);
253         for(prec = phead; prec; prec = prec->next) {
254                 /**********
255                 * o check max
256                 * o create URI
257                 **********/
258
259                 if(host_cnt == max_hosts) {
260                         LM_WARN("notification host count reached max!\n");
261                         free_rdata_list(phead);
262                         return host_cnt;
263                 }
264                 pIP[0] = '[';
265                 len = ip6tosbuf(((struct aaaa_rdata *)prec->rdata)->ip6, &pIP[1],
266                                           IP6_MAX_STR_SIZE)
267                           + 1;
268                 pIP[len++] = ']';
269                 pIP[len] = '\0';
270                 if(create_IP_uri(puri_list, host_cnt, pIP, len, puri)) {
271                         LM_DBG("adding DMQ node AAAA host %s=%s\n", pname,
272                                         puri_list[host_cnt]);
273                         host_cnt++;
274                 }
275         }
276         if(phead)
277                 free_rdata_list(phead);
278         return host_cnt;
279 }
280
281 /**
282  * @brief add a server node and notify it
283  */
284 dmq_node_t *add_server_and_notify(str *paddr)
285 {
286         char puri_data[MAXDMQHOSTS * (MAXDMQURILEN + 1)];
287         char *puri_list[MAXDMQHOSTS];
288         dmq_node_t *pfirst, *pnode;
289         int host_cnt, index;
290         sip_uri_t puri[1];
291         str pstr[1];
292
293         /**********
294         * o init data area
295         * o get list of hosts
296         * o process list
297         **********/
298
299         if(!dmq_multi_notify) {
300                 pfirst = add_dmq_node(dmq_node_list, paddr);
301         } else {
302                 /**********
303                 * o init data area
304                 * o get list of hosts
305                 * o process list
306                 **********/
307
308                 for(index = 0; index < MAXDMQHOSTS; index++) {
309                         puri_list[index] = &puri_data[index * (MAXDMQURILEN + 1)];
310                 }
311                 if(parse_uri(paddr->s, paddr->len, puri) < 0) {
312                         /* this is supposed to be good but just in case... */
313                         LM_ERR("add_server_and_notify address invalid\n");
314                         return 0;
315                 }
316                 pfirst = NULL;
317                 host_cnt =
318                                 get_dmq_host_list(puri_list, MAXDMQHOSTS, &puri->host, puri, 1);
319                 for(index = 0; index < host_cnt; index++) {
320                         pstr->s = puri_list[index];
321                         pstr->len = strlen(puri_list[index]);
322                         if(!find_dmq_node_uri(dmq_node_list, pstr)) { // check for duplicates
323                                 pnode = add_dmq_node(dmq_node_list, pstr);
324                                 if(pnode && !pfirst) {
325                                         pfirst = pnode;
326                                 }
327                         }
328                 }
329         }
330
331         /**********
332         * o found at least one?
333         * o request node list
334         **********/
335
336         if(!pfirst) {
337                 LM_ERR("error adding notification node\n");
338                 return NULL;
339         }
340         if(request_nodelist(pfirst, 2) < 0) {
341                 LM_ERR("error requesting initial nodelist\n");
342                 return NULL;
343         }
344         return pfirst;
345 }
346
347 /**
348  * extract the node list from the body of a notification request SIP message
349  * the SIP request will look something like:
350  *      KDMQ sip:10.0.0.0:5062
351  *      To: ...
352  *      From: ...
353  *      Max-Forwards: ...
354  *      Content-Length: 22
355  *      
356  *      sip:host1:port1;param1=value1
357  *      sip:host2:port2;param2=value2
358  *      ...
359  */
360 int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg)
361 {
362         int content_length, total_nodes = 0;
363         str body;
364         str tmp_uri;
365         dmq_node_t *cur = NULL;
366         dmq_node_t *ret, *find;
367         char *tmp, *end, *match;
368
369         if(!msg->content_length && (parse_headers(msg, HDR_CONTENTLENGTH_F, 0) < 0
370                                                                            || !msg->content_length)) {
371                 LM_ERR("no content length header found\n");
372                 return -1;
373         }
374         content_length = get_content_length(msg);
375         if(!content_length) {
376                 LM_DBG("content length is 0\n");
377                 return total_nodes;
378         }
379         body.s = get_body(msg);
380         body.len = content_length;
381         tmp = body.s;
382         end = body.s + body.len;
383
384         /* acquire big list lock */
385         lock_get(&update_list->lock);
386         while(tmp < end) {
387                 match = q_memchr(tmp, '\n', end - tmp);
388                 if(match) {
389                         match++;
390                 } else {
391                         /* for the last line - take all of it */
392                         match = end;
393                 }
394                 /* create the orig_uri from the parsed uri line and trim it */
395                 tmp_uri.s = tmp;
396                 tmp_uri.len = match - tmp - 1;
397                 tmp = match;
398                 /* trim the \r, \n and \0's */
399                 trim_r(tmp_uri);
400                 find = build_dmq_node(&tmp_uri, 0);
401                 if(find == NULL)
402                         return -1;
403                 ret = find_dmq_node(update_list, find);
404                 if(!ret) {
405                         LM_DBG("found new node %.*s\n", STR_FMT(&tmp_uri));
406                         cur = build_dmq_node(&tmp_uri, 1);
407                         if(!cur) {
408                                 LM_ERR("error creating new dmq node\n");
409                                 goto error;
410                         }
411                         cur->next = update_list->nodes;
412                         update_list->nodes = cur;
413                         update_list->count++;
414                         total_nodes++;
415                 } else if(!ret->local && find->uri.params.s && 
416                                         ret->status != find->status && ret->status != DMQ_NODE_DISABLED) {
417                         /* don't update the node if it is in ending state */
418                         LM_DBG("updating status on %.*s from %d to %d\n", STR_FMT(&tmp_uri),
419                                         ret->status, find->status);
420                         ret->status = find->status;
421                         total_nodes++;
422                 }
423                 destroy_dmq_node(find, 0);
424         }
425
426         /* release big list lock */
427         lock_release(&update_list->lock);
428         return total_nodes;
429 error:
430         lock_release(&update_list->lock);
431         return -1;
432 }
433
434
435 int run_init_callbacks()
436 {
437         dmq_peer_t *crt;
438
439         if(dmq_peer_list == 0) {
440                 LM_WARN("peer list is null\n");
441                 return 0;
442         }
443         crt = dmq_peer_list->peers;
444         while(crt) {
445                 if(crt->init_callback) {
446                         crt->init_callback();
447                 }
448                 crt = crt->next;
449         }
450         return 0;
451 }
452
453
454 /**
455  * @brief dmq notification callback
456  */
457 int dmq_notification_callback_f(
458                 struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node)
459 {
460         int nodes_recv;
461         str *response_body = NULL;
462         int maxforwards = 0;
463         /* received dmqnode list */
464         LM_DBG("dmq triggered from dmq_notification_callback\n");
465
466         /* extract the maxforwards value, if any */
467         if(msg->maxforwards) {
468                 if(msg->maxforwards->parsed > 0) {
469                         /* maxfwd module has parsed and decreased the value in the msg buf */
470                         /* maxforwards->parsed contains the original value */
471                         maxforwards = (int)(long)(msg->maxforwards->parsed) - 1;
472                 } else {
473                         str2sint(&msg->maxforwards->body, &maxforwards);
474                         maxforwards--;
475                 }
476         }
477         nodes_recv = extract_node_list(dmq_node_list, msg);
478         LM_DBG("received %d new or changed nodes\n", nodes_recv);
479         response_body = build_notification_body();
480         if(response_body == NULL) {
481                 LM_ERR("no response body\n");
482                 goto error;
483         }
484         resp->content_type = dmq_notification_content_type;
485         resp->reason = dmq_200_rpl;
486         resp->body = *response_body;
487         resp->resp_code = 200;
488
489         /* if we received any new nodes tell about them to the others */
490         if(nodes_recv > 0 && maxforwards > 0) {
491                 /* maxforwards is set to 0 so that the message is will not be in a spiral */
492                 bcast_dmq_message(dmq_notification_peer, response_body, 0,
493                                 &dmq_notification_resp_callback, maxforwards,
494                                 &dmq_notification_content_type);
495         }
496         pkg_free(response_body);
497         if(dmq_init_callback_done && !*dmq_init_callback_done) {
498                 *dmq_init_callback_done = 1;
499                 run_init_callbacks();
500         }
501         return 0;
502 error:
503         return -1;
504 }
505
506 /**
507  * builds the body of a notification message from the list of servers 
508  * the result will look something like:
509  * sip:host1:port1;param1=value1\r\n
510  * sip:host2:port2;param2=value2\r\n
511  * sip:host3:port3;param3=value3
512  */
513 str *build_notification_body()
514 {
515         /* the length of the current line describing the server */
516         int slen;
517         /* the current length of the body */
518         int clen = 0;
519         dmq_node_t *cur_node = NULL;
520         str *body;
521         body = pkg_malloc(sizeof(str));
522         if(body == NULL) {
523                 LM_ERR("no more pkg\n");
524                 return NULL;
525         }
526         memset(body, 0, sizeof(str));
527         /* we allocate a chunk of data for the body */
528         body->len = NBODY_LEN;
529         body->s = pkg_malloc(body->len);
530         if(body->s == NULL) {
531                 LM_ERR("no more pkg\n");
532                 pkg_free(body);
533                 return NULL;
534         }
535         /* we add each server to the body - each on a different line */
536         lock_get(&dmq_node_list->lock);
537         cur_node = dmq_node_list->nodes;
538         while(cur_node) {
539                 if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
540                         LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
541                         /* body->len - clen - 2 bytes left to write - including the \r\n */
542                         slen = build_node_str(cur_node, body->s + clen, body->len - clen - 2);
543                         if(slen < 0) {
544                                 LM_ERR("cannot build_node_string\n");
545                                 goto error;
546                         }
547                         clen += slen;
548                         body->s[clen++] = '\r';
549                         body->s[clen++] = '\n';
550                 }
551                 cur_node = cur_node->next;
552         }
553         lock_release(&dmq_node_list->lock);
554         body->len = clen;
555         return body;
556 error:
557         lock_release(&dmq_node_list->lock);
558         pkg_free(body->s);
559         pkg_free(body);
560         return NULL;
561 }
562
563 /**
564  * @brief request node list
565  */
566 int request_nodelist(dmq_node_t *node, int forward)
567 {
568         str *body;
569         int ret;
570         body = build_notification_body();
571         if(body == NULL) {
572                 LM_ERR("no notification body\n");
573                 return -1;
574         }
575         ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
576                         &dmq_notification_resp_callback, forward,
577                         &dmq_notification_content_type, 1);
578         pkg_free(body->s);
579         pkg_free(body);
580         return ret;
581 }
582
583 /**
584  * @brief notification response callback
585  */
586 int notification_resp_callback_f(
587                 struct sip_msg *msg, int code, dmq_node_t *node, void *param)
588 {
589         int ret;
590         int nodes_recv;
591
592         LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
593         if(code == 200) {
594                 /* be sure that the node that answered is in active state */
595                 update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE);
596                 nodes_recv = extract_node_list(dmq_node_list, msg);
597                 LM_DBG("received %d new or changed nodes\n", nodes_recv);
598                 if(dmq_init_callback_done && !*dmq_init_callback_done) {
599                         *dmq_init_callback_done = 1;
600                         run_init_callbacks();
601                 }
602         } else if(code == 408) {
603                 if(STR_EQ(node->orig_uri, dmq_notification_address)) {
604                         LM_ERR("not deleting notification_peer\n");
605                         update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);  
606                         return 0;
607                 }
608                 if (node->status == DMQ_NODE_DISABLED) {
609                         /* deleting node - the server did not respond */
610                         LM_ERR("deleting server %.*s because of failed request\n",
611                                 STR_FMT(&node->orig_uri));
612                         ret = del_dmq_node(dmq_node_list, node);
613                         LM_DBG("del_dmq_node returned %d\n", ret);
614                 } else {
615                         /* put the node in disabled state and wait for the next ping before deleting it */
616                         update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED);
617                 }
618         }
619         return 0;
620 }