keepalive: early start of OPTIONS checking
[sip-router] / src / modules / dmq / dmq_funcs.c
1 /*
2  * dmq module - distributed message queue
3  *
4  * Copyright (C) 2011 Bucur Marius - Ovidiu
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License 
19  * along with this program; if not, write to the Free Software 
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  */
22
23 #include "dmq_funcs.h"
24 #include "notification_peer.h"
25 #include "../../core/dset.h"
26
27 /**
28  * @brief register a DMQ peer
29  */
30 dmq_peer_t *register_dmq_peer(dmq_peer_t *peer)
31 {
32         dmq_peer_t *new_peer;
33         if(!peer_list) {
34                 LM_ERR("peer list not initialized\n");
35                 return NULL;
36         }
37         lock_get(&peer_list->lock);
38         if(search_peer_list(peer_list, peer)) {
39                 LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len,
40                                 peer->peer_id.s, peer->description.len, peer->description.s);
41                 lock_release(&peer_list->lock);
42                 return NULL;
43         }
44         new_peer = add_peer(peer_list, peer);
45         lock_release(&peer_list->lock);
46         return new_peer;
47 }
48
49 /**
50  * @brief dmq tm callback
51  */
52 void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
53 {
54         dmq_cback_param_t *cb_param;
55
56         cb_param = (dmq_cback_param_t *)(*ps->param);
57
58         if(cb_param == NULL)
59                 return;
60
61         LM_DBG("dmq_tm_callback start\n");
62         if(cb_param->resp_cback.f) {
63                 if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node,
64                                    cb_param->resp_cback.param)
65                                 < 0) {
66                         LM_ERR("error in response callback\n");
67                 }
68         }
69         LM_DBG("dmq_tm_callback done\n");
70         shm_free_node(cb_param->node);
71         shm_free(cb_param);
72         *ps->param = NULL;
73 }
74
75 int build_uri_str(str *username, struct sip_uri *uri, str *from)
76 {
77         /* sip:user@host:port */
78         int from_len;
79
80         if(!uri->host.s || !uri->host.len) {
81                 LM_ERR("no host in uri\n");
82                 return -1;
83         }
84         if(!username->s || !username->len) {
85                 LM_ERR("no username given\n");
86                 return -1;
87         }
88
89         from_len = username->len + uri->host.len + uri->port.len + 10;
90         from->s = pkg_malloc(from_len);
91         if(from->s == NULL) {
92                 LM_ERR("no more pkg\n");
93                 return -1;
94         }
95         from->len = 0;
96
97         memcpy(from->s, "sip:", 4);
98         from->len += 4;
99
100         memcpy(from->s + from->len, username->s, username->len);
101         from->len += username->len;
102
103         memcpy(from->s + from->len, "@", 1);
104         from->len += 1;
105
106         memcpy(from->s + from->len, uri->host.s, uri->host.len);
107         from->len += uri->host.len;
108
109         if(uri->port.s && uri->port.len) {
110                 memcpy(from->s + from->len, ":", 1);
111                 from->len += 1;
112                 memcpy(from->s + from->len, uri->port.s, uri->port.len);
113                 from->len += uri->port.len;
114         }
115         return 0;
116 }
117
118 /* Checks if the request (sip_msg_t* msg) comes from another DMQ node based on source IP. */
119 int is_from_remote_node(sip_msg_t *msg)
120 {
121         ip_addr_t *ip;
122         dmq_node_t *node;
123         int result = -1;
124
125         ip = &msg->rcv.src_ip;
126
127         lock_get(&node_list->lock);
128         node = node_list->nodes;
129
130         while(node) {
131                 if(!node->local && ip_addr_cmp(ip, &node->ip_address)) {
132                         result = 1;
133                         goto done;
134                 }
135                 node = node->next;
136         }
137 done:
138         lock_release(&node_list->lock);
139         return result;
140 }
141
142 /**
143  * @brief broadcast a dmq message
144  *
145  * peer - the peer structure on behalf of which we are sending
146  * body - the body of the message
147  * except - we do not send the message to this node
148  * resp_cback - a response callback that gets called when the transaction is complete
149  */
150 int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
151                 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type,
152                 int incl_inactive)
153 {
154         dmq_node_t *node;
155
156         lock_get(&node_list->lock);
157         node = node_list->nodes;
158         while(node) {
159                 /* we do not send the message to the following:
160                  *   - the except node
161                  *   - itself
162                  *   - any inactive nodes (unless incl_inactive is specified)
163                  */
164                 if((except && cmp_dmq_node(node, except)) || node->local
165                                 || (node->status != DMQ_NODE_ACTIVE && !incl_inactive)) {
166                         LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
167                         node = node->next;
168                         continue;
169                 }
170                 if(dmq_send_message(
171                                    peer, body, node, resp_cback, max_forwards, content_type)
172                                 < 0) {
173                         LM_ERR("error sending dmq message\n");
174                         goto error;
175                 }
176                 node = node->next;
177         }
178         lock_release(&node_list->lock);
179         return 0;
180 error:
181         lock_release(&node_list->lock);
182         return -1;
183 }
184
185 int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
186                 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
187 {
188         return bcast_dmq_message1(peer, body, except, resp_cback, max_forwards, content_type, 0);
189 }
190
191 /**
192  * @brief send a dmq message
193  *
194  * peer - the peer structure on behalf of which we are sending
195  * body - the body of the message
196  * node - we send the message to this node
197  * resp_cback - a response callback that gets called when the transaction is complete
198  */
199 int dmq_send_message(dmq_peer_t *peer, str *body, dmq_node_t *node,
200                 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
201 {
202         uac_req_t uac_r;
203         str str_hdr = {0, 0};
204         str from = {0, 0}, to = {0, 0};
205         dmq_cback_param_t *cb_param = NULL;
206         int result = 0;
207         int len = 0;
208
209         if(!content_type) {
210                 LM_ERR("content-type is null\n");
211                 return -1;
212         }
213         /* add Max-Forwards and Content-Type headers */
214         str_hdr.len = 34 + content_type->len + (CRLF_LEN * 2);
215         str_hdr.s = pkg_malloc(str_hdr.len);
216         if(str_hdr.s == NULL) {
217                 LM_ERR("no more pkg\n");
218                 return -1;
219         }
220         len += sprintf(str_hdr.s, "Max-Forwards: %d" CRLF "Content-Type: %.*s" CRLF,
221                         max_forwards, content_type->len, content_type->s);
222         str_hdr.len = len;
223
224         cb_param = shm_malloc(sizeof(*cb_param));
225         if(cb_param == NULL) {
226                 LM_ERR("no more shm for building callback parameter\n");
227                 goto error;
228         }
229         memset(cb_param, 0, sizeof(*cb_param));
230         cb_param->resp_cback = *resp_cback;
231         cb_param->node = shm_dup_node(node);
232         if(cb_param->node == NULL) {
233                 LM_ERR("error building callback parameter\n");
234                 goto error;
235         }
236
237         if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
238                 LM_ERR("error building from string [username %.*s]\n",
239                                 STR_FMT(&peer->peer_id));
240                 goto error;
241         }
242         if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
243                 LM_ERR("error building to string\n");
244                 goto error;
245         }
246
247         set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL,
248                         TMCB_LOCAL_COMPLETED, dmq_tm_callback, (void *)cb_param);
249         uac_r.ssock = &dmq_server_socket;
250
251         result = tmb.t_request(&uac_r, &to, &to, &from, NULL);
252         if(result < 0) {
253                 LM_ERR("error in tmb.t_request_within\n");
254                 goto error;
255         }
256         pkg_free(str_hdr.s);
257         pkg_free(from.s);
258         pkg_free(to.s);
259         return 0;
260 error:
261         pkg_free(str_hdr.s);
262         if(from.s != NULL)
263                 pkg_free(from.s);
264         if(to.s != NULL)
265                 pkg_free(to.s);
266         if(cb_param) {
267                 if(cb_param->node)
268                         destroy_dmq_node(cb_param->node, 1);
269                 shm_free(cb_param);
270         }
271         return -1;
272 }
273
274 /**
275  * @brief kemi function for sending dmq message
276  */
277 int ki_dmq_send_message(sip_msg_t *msg, str *peer_str, str *to_str,
278                 str *body_str, str *ct_str)
279 {
280         LM_DBG("cfg_dmq_send_message: %.*s - %.*s - %.*s - %.*s\n", peer_str->len,
281                         peer_str->s, to_str->len, to_str->s, body_str->len, body_str->s,
282                         ct_str->len, ct_str->s);
283
284         dmq_peer_t *destination_peer = find_peer(*peer_str);
285         if(!destination_peer) {
286                 LM_INFO("cannot find peer %.*s\n", peer_str->len, peer_str->s);
287                 dmq_peer_t new_peer;
288                 new_peer.callback = empty_peer_callback;
289                 new_peer.description.s = "";
290                 new_peer.description.len = 0;
291                 new_peer.peer_id = *peer_str;
292                 destination_peer = register_dmq_peer(&new_peer);
293                 if(!destination_peer) {
294                         LM_ERR("error in register_dmq_peer\n");
295                         goto error;
296                 }
297         }
298         dmq_node_t *to_dmq_node = find_dmq_node_uri(node_list, to_str);
299         if(!to_dmq_node) {
300                 LM_ERR("cannot find dmq_node: %.*s\n", to_str->len, to_str->s);
301                 goto error;
302         }
303         if(dmq_send_message(destination_peer, body_str, to_dmq_node,
304                            &notification_callback, 1, ct_str)
305                         < 0) {
306                 LM_ERR("cannot send dmq message\n");
307                 goto error;
308         }
309         return 1;
310 error:
311         return -1;
312 }
313
314 /**
315  * @brief config file function for sending dmq message
316  */
317 int cfg_dmq_send_message(struct sip_msg *msg, char *peer, char *to, char *body,
318                 char *content_type)
319 {
320         str peer_str;
321         str to_str;
322         str body_str;
323         str ct_str;
324
325         if(get_str_fparam(&peer_str, msg, (fparam_t *)peer) < 0) {
326                 LM_ERR("cannot get peer value\n");
327                 return -1;
328         }
329         if(get_str_fparam(&to_str, msg, (fparam_t *)to) < 0) {
330                 LM_ERR("cannot get dst value\n");
331                 return -1;
332         }
333         if(get_str_fparam(&body_str, msg, (fparam_t *)body) < 0) {
334                 LM_ERR("cannot get body value\n");
335                 return -1;
336         }
337         if(get_str_fparam(&ct_str, msg, (fparam_t *)content_type) < 0) {
338                 LM_ERR("cannot get content-type value\n");
339                 return -1;
340         }
341
342         return ki_dmq_send_message(msg, &peer_str, &to_str, &body_str, &ct_str);
343 }
344
345 /**
346  * @brief config file function for broadcasting dmq message
347  */
348 int ki_dmq_bcast_message(sip_msg_t *msg, str *peer_str, str *body_str,
349                 str *ct_str)
350 {
351         LM_DBG("cfg_dmq_bcast_message: %.*s - %.*s - %.*s\n", peer_str->len,
352                         peer_str->s, body_str->len, body_str->s, ct_str->len, ct_str->s);
353
354         dmq_peer_t *destination_peer = find_peer(*peer_str);
355         if(!destination_peer) {
356                 LM_INFO("cannot find peer %.*s - adding it.\n", peer_str->len,
357                                 peer_str->s);
358                 dmq_peer_t new_peer;
359                 new_peer.callback = empty_peer_callback;
360                 new_peer.description.s = "";
361                 new_peer.description.len = 0;
362                 new_peer.peer_id = *peer_str;
363                 destination_peer = register_dmq_peer(&new_peer);
364                 if(!destination_peer) {
365                         LM_ERR("error in register_dmq_peer\n");
366                         goto error;
367                 }
368         }
369         if(bcast_dmq_message(destination_peer, body_str, 0, &notification_callback,
370                            1, ct_str) < 0) {
371                 LM_ERR("cannot send dmq message\n");
372                 goto error;
373         }
374         return 1;
375 error:
376         return -1;
377 }
378
379 /**
380  * @brief config file function for broadcasting dmq message
381  */
382 int cfg_dmq_bcast_message(sip_msg_t *msg, char *peer, char *body,
383                 char *content_type)
384 {
385         str peer_str;
386         str body_str;
387         str ct_str;
388
389         if(get_str_fparam(&peer_str, msg, (fparam_t *)peer) < 0) {
390                 LM_ERR("cannot get peer value\n");
391                 return -1;
392         }
393         if(get_str_fparam(&body_str, msg, (fparam_t *)body) < 0) {
394                 LM_ERR("cannot get body value\n");
395                 return -1;
396         }
397         if(get_str_fparam(&ct_str, msg, (fparam_t *)content_type) < 0) {
398                 LM_ERR("cannot get content-type value\n");
399                 return -1;
400         }
401
402         return ki_dmq_bcast_message(msg, &peer_str, &body_str, &ct_str);
403 }
404
405 /**
406  * @brief config file function for replicating SIP message to all nodes (wraps t_replicate)
407  */
408 int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
409 {
410         dmq_node_t *node;
411         struct socket_info *sock;
412         int first = 1;
413
414         /* avoid loops - do not replicate if message has come from another node
415          * (override if optional parameter is set)
416          */
417         if(mode==0      && is_from_remote_node(msg) > 0) {
418                 LM_DBG("message is from another node - skipping replication\n");
419                 return -1;
420         }
421
422         /* TODO - backup/restore original send socket */
423         sock = lookup_local_socket(&dmq_server_socket);
424         if(sock) {
425                 set_force_socket(msg, sock);
426         }
427
428         lock_get(&node_list->lock);
429         node = node_list->nodes;
430         while(node) {
431                 /* we do not send the message to the following:
432                  *   - ourself
433                  *   - any inactive nodes
434                  */
435                 if(node->local || node->status != DMQ_NODE_ACTIVE) {
436                         LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
437                         node = node->next;
438                         continue;
439                 }
440
441                 if(!first) {
442                         if(append_branch(msg, 0, 0, 0, Q_UNSPECIFIED, 0, sock, 0, 0, 0, 0)
443                                         == -1) {
444                                 LM_ERR("failed to append a branch\n");
445                                 node = node->next;
446                                 continue;
447                         }
448                 } else {
449                         first = 0;
450                 }
451
452                 if(tmb.t_replicate(msg, &node->orig_uri) < 0) {
453                         LM_ERR("error calling t_replicate\n");
454                         goto error;
455                 }
456
457                 node = node->next;
458         }
459         lock_release(&node_list->lock);
460         return 0;
461 error:
462         lock_release(&node_list->lock);
463         return -1;
464 }
465
466 int ki_dmq_t_replicate(sip_msg_t *msg)
467 {
468         return ki_dmq_t_replicate_mode(msg, 0);
469 }
470
471 /**
472  * @brief config file function for replicating SIP message to all nodes (wraps t_replicate)
473  */
474 int cfg_dmq_t_replicate(struct sip_msg *msg, char *s, char *p2)
475 {
476         int i = 0;
477         if(s!=NULL && get_int_fparam(&i, msg, (fparam_t *)s) < 0) {
478                 LM_ERR("failed to get parameter value\n");
479                 return -1;
480         }
481         return ki_dmq_t_replicate_mode(msg, i);
482 }
483
484 /*
485  * @brief config file function to check if received message is from another DMQ node based on source IP
486  */
487 int cfg_dmq_is_from_node(struct sip_msg *msg, char *p1, char *p2)
488 {
489         return is_from_remote_node(msg);
490 }
491
492 /**
493  * @brief pings the servers in the nodelist
494  *
495  * if the server does not reply to the ping, it is removed from the list
496  * the ping messages are actualy notification requests
497  * this way the ping will have two uses:
498  *   - checks if the servers in the list are up and running
499  *   - updates the list of servers from the other nodes
500  */
501 void ping_servers(unsigned int ticks, void *param)
502 {
503         str *body;
504         int ret;
505         LM_DBG("ping_servers\n");
506
507         if(!node_list->nodes
508                         || (node_list->nodes->local && !node_list->nodes->next)) {
509                 LM_DBG("node list is empty - attempt to rebuild from notification "
510                            "address\n");
511                 *dmq_init_callback_done = 0;
512                 if(dmq_notification_address.s) {
513                         notification_node =
514                                         add_server_and_notify(&dmq_notification_address);
515                         if(!notification_node) {
516                                 LM_ERR("cannot retrieve initial nodelist from %.*s\n",
517                                                 STR_FMT(&dmq_notification_address));
518                         }
519                 } else {
520                         LM_ERR("no notification address");
521                 }
522                 return;
523         }
524
525         body = build_notification_body();
526         if(!body) {
527                 LM_ERR("could not build notification body\n");
528                 return;
529         }
530         ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
531                         &notification_callback, 1, &notification_content_type, 1);
532         pkg_free(body->s);
533         pkg_free(body);
534         if(ret < 0) {
535                 LM_ERR("error broadcasting message\n");
536         }
537 }