Merge pull request #2326 from NGSegovia/keepalive/first_check_on_start
[sip-router] / src / modules / dmq / dmq.c
1 /*
2  * dmq module - distributed message queue
3  *
4  * Copyright (C) 2011 Bucur Marius - Ovidiu
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  *
22  */
23
24 #include <stdio.h>
25 #include <string.h>
26 #include <stdlib.h>
27 #include <sys/types.h>
28 #include <sys/ipc.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <time.h>
32
33 #include "../../core/ut.h"
34 #include "../../core/mem/mem.h"
35 #include "../../core/mem/shm_mem.h"
36 #include "../../core/usr_avp.h"
37 #include "../../core/pt.h"
38 #include "../../core/hashes.h"
39 #include "../../core/mod_fix.h"
40 #include "../../core/rpc_lookup.h"
41 #include "../../core/kemi.h"
42
43 #include "dmq.h"
44 #include "dmq_funcs.h"
45 #include "bind_dmq.h"
46 #include "message.h"
47 #include "notification_peer.h"
48 #include "dmqnode.h"
49
50 MODULE_VERSION
51
52 int dmq_startup_time = 0;
53 int dmq_pid = 0;
54
55 /* module parameters */
56 int dmq_num_workers = DEFAULT_NUM_WORKERS;
57 int dmq_worker_usleep = 0;
58 str dmq_server_address = {0, 0};
59 str dmq_server_socket = {0, 0};
60 sip_uri_t dmq_server_uri = {0};
61
62 str dmq_notification_address = {0, 0};
63 int dmq_multi_notify = 0;
64 sip_uri_t dmq_notification_uri = {0};
65 int dmq_ping_interval = 60;
66
67 /* TM bind */
68 struct tm_binds tmb = {0};
69 /* SL API structure */
70 sl_api_t slb = {0};
71
72 /** module variables */
73 str dmq_request_method = str_init("KDMQ");
74 dmq_worker_t *dmq_workers = NULL;
75 dmq_peer_list_t *dmq_peer_list = 0;
76 /* the list of dmq servers */
77 dmq_node_list_t *dmq_node_list = NULL;
78 /* dmq module is a peer itself for receiving notifications regarding nodes */
79 dmq_peer_t *dmq_notification_peer = NULL;
80
81 /** module functions */
82 static int mod_init(void);
83 static int child_init(int);
84 static void destroy(void);
85
86 /* clang-format off */
87 static cmd_export_t cmds[] = {
88         {"dmq_handle_message", (cmd_function)dmq_handle_message, 0,
89                 0, 0, REQUEST_ROUTE},
90         {"dmq_handle_message", (cmd_function)w_dmq_handle_message, 1,
91                 fixup_int_1, 0, REQUEST_ROUTE},
92         {"dmq_send_message", (cmd_function)cfg_dmq_send_message, 4,
93                 fixup_spve_all, 0, ANY_ROUTE},
94         {"dmq_bcast_message", (cmd_function)cfg_dmq_bcast_message, 3,
95                 fixup_spve_all, 0, ANY_ROUTE},
96         {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 0,
97                 0, 0, REQUEST_ROUTE},
98         {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 1,
99                 fixup_spve_null, 0, REQUEST_ROUTE},
100         {"dmq_is_from_node", (cmd_function)cfg_dmq_is_from_node, 0,
101                 0, 0, REQUEST_ROUTE},
102         {"bind_dmq", (cmd_function)bind_dmq, 0,
103                 0, 0, 0},
104         {0, 0, 0, 0, 0, 0}
105 };
106
107 static param_export_t params[] = {
108         {"num_workers", INT_PARAM, &dmq_num_workers},
109         {"ping_interval", INT_PARAM, &dmq_ping_interval},
110         {"server_address", PARAM_STR, &dmq_server_address},
111         {"notification_address", PARAM_STR, &dmq_notification_address},
112         {"multi_notify", INT_PARAM, &dmq_multi_notify},
113         {"worker_usleep", INT_PARAM, &dmq_worker_usleep},
114         {0, 0, 0}
115 };
116
117 static rpc_export_t rpc_methods[];
118
119 /** module exports */
120 struct module_exports exports = {
121         "dmq",                          /* module name */
122         DEFAULT_DLFLAGS,        /* dlopen flags */
123         cmds,                           /* exported functions */
124         params,                         /* exported parameters */
125         0,                                      /* RPC method exports */
126         0,                                      /* exported pseudo-variables */
127         0,                                      /* response handling function */
128         mod_init,                       /* module initialization function */
129         child_init,                     /* per-child init function */
130         destroy                         /* module destroy function */
131 };
132 /* clang-format on */
133
134
135 static int make_socket_str_from_uri(struct sip_uri *uri, str *socket)
136 {
137         if(!uri->host.s || !uri->host.len) {
138                 LM_ERR("no host in uri\n");
139                 return -1;
140         }
141
142         socket->len = uri->host.len + uri->port.len + 6;
143         socket->s = pkg_malloc(socket->len);
144         if(socket->s == NULL) {
145                 LM_ERR("no more pkg\n");
146                 return -1;
147         }
148         memcpy(socket->s, "udp:", 4);
149         socket->len = 4;
150
151         memcpy(socket->s + socket->len, uri->host.s, uri->host.len);
152         socket->len += uri->host.len;
153
154         if(uri->port.s && uri->port.len) {
155                 socket->s[socket->len++] = ':';
156                 memcpy(socket->s + socket->len, uri->port.s, uri->port.len);
157                 socket->len += uri->port.len;
158         }
159         socket->s[socket->len] = '\0';
160
161         return 0;
162 }
163
164
165 /**
166  * init module function
167  */
168 static int mod_init(void)
169 {
170         /* bind the SL API */
171         if(sl_load_api(&slb) != 0) {
172                 LM_ERR("cannot bind to SL API\n");
173                 return -1;
174         }
175
176         /* load all TM stuff */
177         if(load_tm_api(&tmb) == -1) {
178                 LM_ERR("can't load tm functions. TM module probably not loaded\n");
179                 return -1;
180         }
181
182         /* load peer list - the list containing the module callbacks for dmq */
183         dmq_peer_list = init_peer_list();
184         if(dmq_peer_list == NULL) {
185                 LM_ERR("cannot initialize peer list\n");
186                 return -1;
187         }
188
189         /* load the dmq node list - the list containing the dmq servers */
190         dmq_node_list = init_dmq_node_list();
191         if(dmq_node_list == NULL) {
192                 LM_ERR("cannot initialize node list\n");
193                 return -1;
194         }
195
196         if(rpc_register_array(rpc_methods) != 0) {
197                 LM_ERR("failed to register RPC commands\n");
198                 return -1;
199         }
200
201         /* register worker processes - add one because of the ping process */
202         register_procs(dmq_num_workers);
203
204         /* check server_address and notification_address are not empty and correct */
205         if(parse_uri(dmq_server_address.s, dmq_server_address.len, &dmq_server_uri)
206                         < 0) {
207                 LM_ERR("server address invalid\n");
208                 return -1;
209         }
210
211         if(parse_uri(dmq_notification_address.s, dmq_notification_address.len,
212                            &dmq_notification_uri)
213                         < 0) {
214                 LM_ERR("notification address invalid\n");
215                 return -1;
216         }
217
218         /* create socket string out of the server_uri */
219         if(make_socket_str_from_uri(&dmq_server_uri, &dmq_server_socket) < 0) {
220                 LM_ERR("failed to create socket out of server_uri\n");
221                 return -1;
222         }
223         if(lookup_local_socket(&dmq_server_socket) == NULL) {
224                 LM_ERR("server_uri is not a socket the proxy is listening on\n");
225                 return -1;
226         }
227
228         /* allocate workers array */
229         dmq_workers = shm_malloc(dmq_num_workers * sizeof(dmq_worker_t));
230         if(dmq_workers == NULL) {
231                 LM_ERR("error in shm_malloc\n");
232                 return -1;
233         }
234         memset(dmq_workers, 0, dmq_num_workers * sizeof(dmq_worker_t));
235
236         dmq_init_callback_done = shm_malloc(sizeof(int));
237         if(!dmq_init_callback_done) {
238                 LM_ERR("no more shm\n");
239                 return -1;
240         }
241         *dmq_init_callback_done = 0;
242
243         /**
244          * add the dmq notification peer.
245          * the dmq is a peer itself so that it can receive node notifications
246          */
247         if(add_notification_peer() < 0) {
248                 LM_ERR("cannot add notification peer\n");
249                 return -1;
250         }
251
252         dmq_startup_time = (int)time(NULL);
253
254         /**
255          * add the ping timer
256          * it pings the servers once in a while so that we know which failed
257          */
258         if(dmq_ping_interval < MIN_PING_INTERVAL) {
259                 dmq_ping_interval = MIN_PING_INTERVAL;
260         }
261         if(register_timer(ping_servers, 0, dmq_ping_interval) < 0) {
262                 LM_ERR("cannot register timer callback\n");
263                 return -1;
264         }
265
266         return 0;
267 }
268
269 /**
270  * initialize children
271  */
272 static int child_init(int rank)
273 {
274         int i, newpid;
275
276         if(rank == PROC_INIT) {
277                 for(i = 0; i < dmq_num_workers; i++) {
278                         if (init_worker(&dmq_workers[i]) < 0) {
279                                 LM_ERR("failed to init struct for worker[%d]\n", i);
280                                 return -1;
281                         }
282                 }
283                 return 0;
284         }
285
286         if(rank == PROC_MAIN) {
287                 /* fork worker processes */
288                 for(i = 0; i < dmq_num_workers; i++) {
289                         LM_DBG("starting worker process %d\n", i);
290                         newpid = fork_process(PROC_RPC, "DMQ WORKER", 0);
291                         if(newpid < 0) {
292                                 LM_ERR("failed to fork worker process %d\n", i);
293                                 return -1;
294                         } else if(newpid == 0) {
295                                 /* child - this will loop forever */
296                                 worker_loop(i);
297                         } else {
298                                 dmq_workers[i].pid = newpid;
299                         }
300                 }
301                 /* notification_node - the node from which the Kamailio instance
302                  * gets the server list on startup.
303                  * the address is given as a module parameter in dmq_notification_address
304                  * the module MUST have this parameter if the Kamailio instance is not
305                  * a master in this architecture
306                  */
307                 if(dmq_notification_address.s) {
308                         dmq_notification_node =
309                                         add_server_and_notify(&dmq_notification_address);
310                         if(!dmq_notification_node) {
311                                 LM_WARN("cannot retrieve initial nodelist from %.*s\n",
312                                                 STR_FMT(&dmq_notification_address));
313                         }
314                 }
315                 return 0;
316         }
317         if(rank == PROC_TCP_MAIN) {
318                 /* do nothing for the main process */
319                 return 0;
320         }
321
322         dmq_pid = my_pid();
323         return 0;
324 }
325
326 /*
327  * destroy function
328  */
329 static void destroy(void)
330 {
331         /* TODO unregister dmq node, free resources */
332         if(dmq_notification_address.s && dmq_notification_node && dmq_self_node) {
333                 LM_DBG("unregistering node %.*s\n", STR_FMT(&dmq_self_node->orig_uri));
334                 dmq_self_node->status = DMQ_NODE_DISABLED;
335                 request_nodelist(dmq_notification_node, 1);
336         }
337         if(dmq_server_socket.s) {
338                 pkg_free(dmq_server_socket.s);
339         }
340         if(dmq_init_callback_done) {
341                 shm_free(dmq_init_callback_done);
342         }
343 }
344
345 static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
346 {
347         void *h;
348         dmq_node_t *cur = dmq_node_list->nodes;
349         char ip[IP6_MAX_STR_SIZE + 1];
350
351         while(cur) {
352                 memset(ip, 0, IP6_MAX_STR_SIZE + 1);
353                 ip_addr2sbuf(&cur->ip_address, ip, IP6_MAX_STR_SIZE);
354                 if(rpc->add(c, "{", &h) < 0)
355                         goto error;
356                 if(rpc->struct_add(h, "SSsSdd", "host", &cur->uri.host, "port",
357                                    &cur->uri.port, "resolved_ip", ip, "status",
358                                    dmq_get_status_str(cur->status), "last_notification",
359                                    cur->last_notification, "local", cur->local)
360                                 < 0)
361                         goto error;
362                 cur = cur->next;
363         }
364         return;
365 error:
366         LM_ERR("Failed to add item to RPC response\n");
367         rpc->fault(c, 500, "Server failure");
368         return;
369 }
370
371 static const char *dmq_rpc_list_nodes_doc[2] = {"Print all nodes", 0};
372
373 static rpc_export_t rpc_methods[] = {
374         {"dmq.list_nodes", dmq_rpc_list_nodes, dmq_rpc_list_nodes_doc, RET_ARRAY},
375         {0, 0, 0, 0}
376 };
377
378 /**
379  *
380  */
381 /* clang-format off */
382 static sr_kemi_t sr_kemi_dmq_exports[] = {
383         { str_init("dmq"), str_init("handle_message"),
384                 SR_KEMIP_INT, ki_dmq_handle_message,
385                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
386                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
387         },
388         { str_init("dmq"), str_init("handle_message_rc"),
389                 SR_KEMIP_INT, ki_dmq_handle_message_rc,
390                 { SR_KEMIP_INT, SR_KEMIP_NONE, SR_KEMIP_NONE,
391                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
392         },
393         { str_init("dmq"), str_init("is_from_node"),
394                 SR_KEMIP_INT, is_from_remote_node,
395                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
396                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
397         },
398         { str_init("dmq"), str_init("t_replicate"),
399                 SR_KEMIP_INT, ki_dmq_t_replicate,
400                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
401                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
402         },
403         { str_init("dmq"), str_init("t_replicate_mode"),
404                 SR_KEMIP_INT, ki_dmq_t_replicate_mode,
405                 { SR_KEMIP_INT, SR_KEMIP_NONE, SR_KEMIP_NONE,
406                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
407         },
408         { str_init("dmq"), str_init("send_message"),
409                 SR_KEMIP_INT, ki_dmq_send_message,
410                 { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
411                         SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE }
412         },
413         { str_init("dmq"), str_init("bcast_message"),
414                 SR_KEMIP_INT, ki_dmq_bcast_message,
415                 { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
416                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
417         },
418
419         { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
420 };
421 /* clang-format on */
422
423 int mod_register(char *path, int *dlflags, void *p1, void *p2)
424 {
425         sr_kemi_modules_add(sr_kemi_dmq_exports);
426         return 0;
427 }