keepalive: early start of OPTIONS checking
[sip-router] / src / modules / dmq / dmq.c
1 /*
2  * dmq module - distributed message queue
3  *
4  * Copyright (C) 2011 Bucur Marius - Ovidiu
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  *
22  */
23
24 #include <stdio.h>
25 #include <string.h>
26 #include <stdlib.h>
27 #include <sys/types.h>
28 #include <sys/ipc.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <time.h>
32
33 #include "../../core/ut.h"
34 #include "../../core/mem/mem.h"
35 #include "../../core/mem/shm_mem.h"
36 #include "../../core/usr_avp.h"
37 #include "../../core/pt.h"
38 #include "../../core/hashes.h"
39 #include "../../core/mod_fix.h"
40 #include "../../core/rpc_lookup.h"
41 #include "../../core/kemi.h"
42
43 #include "dmq.h"
44 #include "dmq_funcs.h"
45 #include "bind_dmq.h"
46 #include "message.h"
47 #include "notification_peer.h"
48 #include "dmqnode.h"
49
50 static int mod_init(void);
51 static int child_init(int);
52 static void destroy(void);
53
54 MODULE_VERSION
55
56 int startup_time = 0;
57 int pid = 0;
58
59 /* module parameters */
60 int num_workers = DEFAULT_NUM_WORKERS;
61 int worker_usleep = 0;
62 str dmq_server_address = {0, 0};
63 str dmq_server_socket = {0, 0};
64 struct sip_uri dmq_server_uri;
65
66 str dmq_notification_address = {0, 0};
67 int multi_notify = 0;
68 struct sip_uri dmq_notification_uri;
69 int ping_interval = 60;
70
71 /* TM bind */
72 struct tm_binds tmb;
73 /* SL API structure */
74 sl_api_t slb;
75
76 /** module variables */
77 str dmq_request_method = str_init("KDMQ");
78 dmq_worker_t *workers = NULL;
79 dmq_peer_list_t *peer_list = 0;
80 /* the list of dmq servers */
81 dmq_node_list_t *node_list = NULL;
82 // the dmq module is a peer itself for receiving notifications regarding nodes
83 dmq_peer_t *dmq_notification_peer = NULL;
84
85 /** module functions */
86 static int mod_init(void);
87 static int child_init(int);
88 static void destroy(void);
89
90 /* clang-format off */
91 static cmd_export_t cmds[] = {
92         {"dmq_handle_message", (cmd_function)dmq_handle_message, 0,
93                 0, 0, REQUEST_ROUTE},
94         {"dmq_handle_message", (cmd_function)w_dmq_handle_message, 1,
95                 fixup_int_1, 0, REQUEST_ROUTE},
96         {"dmq_send_message", (cmd_function)cfg_dmq_send_message, 4,
97                 fixup_spve_all, 0, ANY_ROUTE},
98         {"dmq_bcast_message", (cmd_function)cfg_dmq_bcast_message, 3,
99                 fixup_spve_all, 0, ANY_ROUTE},
100         {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 0,
101                 0, 0, REQUEST_ROUTE},
102         {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 1,
103                 fixup_spve_null, 0, REQUEST_ROUTE},
104         {"dmq_is_from_node", (cmd_function)cfg_dmq_is_from_node, 0,
105                 0, 0, REQUEST_ROUTE},
106         {"bind_dmq", (cmd_function)bind_dmq, 0,
107                 0, 0, 0},
108         {0, 0, 0, 0, 0, 0}
109 };
110
111 static param_export_t params[] = {
112         {"num_workers", INT_PARAM, &num_workers},
113         {"ping_interval", INT_PARAM, &ping_interval},
114         {"server_address", PARAM_STR, &dmq_server_address},
115         {"notification_address", PARAM_STR, &dmq_notification_address},
116         {"multi_notify", INT_PARAM, &multi_notify},
117         {"worker_usleep", INT_PARAM, &worker_usleep},
118         {0, 0, 0}
119 };
120
121 static rpc_export_t rpc_methods[];
122
123 /** module exports */
124 struct module_exports exports = {
125         "dmq",                          /* module name */
126         DEFAULT_DLFLAGS,        /* dlopen flags */
127         cmds,                           /* exported functions */
128         params,                         /* exported parameters */
129         0,                                      /* RPC method exports */
130         0,                                      /* exported pseudo-variables */
131         0,                                      /* response handling function */
132         mod_init,                       /* module initialization function */
133         child_init,                     /* per-child init function */
134         destroy                         /* module destroy function */
135 };
136 /* clang-format on */
137
138
139 static int make_socket_str_from_uri(struct sip_uri *uri, str *socket)
140 {
141         if(!uri->host.s || !uri->host.len) {
142                 LM_ERR("no host in uri\n");
143                 return -1;
144         }
145
146         socket->len = uri->host.len + uri->port.len + 6;
147         socket->s = pkg_malloc(socket->len);
148         if(socket->s == NULL) {
149                 LM_ERR("no more pkg\n");
150                 return -1;
151         }
152         memcpy(socket->s, "udp:", 4);
153         socket->len = 4;
154
155         memcpy(socket->s + socket->len, uri->host.s, uri->host.len);
156         socket->len += uri->host.len;
157
158         if(uri->port.s && uri->port.len) {
159                 socket->s[socket->len++] = ':';
160                 memcpy(socket->s + socket->len, uri->port.s, uri->port.len);
161                 socket->len += uri->port.len;
162         }
163         socket->s[socket->len] = '\0';
164
165         return 0;
166 }
167
168
169 /**
170  * init module function
171  */
172 static int mod_init(void)
173 {
174         /* bind the SL API */
175         if(sl_load_api(&slb) != 0) {
176                 LM_ERR("cannot bind to SL API\n");
177                 return -1;
178         }
179
180         /* load all TM stuff */
181         if(load_tm_api(&tmb) == -1) {
182                 LM_ERR("can't load tm functions. TM module probably not loaded\n");
183                 return -1;
184         }
185
186         /* load peer list - the list containing the module callbacks for dmq */
187         peer_list = init_peer_list();
188         if(peer_list == NULL) {
189                 LM_ERR("cannot initialize peer list\n");
190                 return -1;
191         }
192
193         /* load the dmq node list - the list containing the dmq servers */
194         node_list = init_dmq_node_list();
195         if(node_list == NULL) {
196                 LM_ERR("cannot initialize node list\n");
197                 return -1;
198         }
199
200         if(rpc_register_array(rpc_methods) != 0) {
201                 LM_ERR("failed to register RPC commands\n");
202                 return -1;
203         }
204
205         /* register worker processes - add one because of the ping process */
206         register_procs(num_workers);
207
208         /* check server_address and notification_address are not empty and correct */
209         if(parse_uri(dmq_server_address.s, dmq_server_address.len, &dmq_server_uri)
210                         < 0) {
211                 LM_ERR("server address invalid\n");
212                 return -1;
213         }
214
215         if(parse_uri(dmq_notification_address.s, dmq_notification_address.len,
216                            &dmq_notification_uri)
217                         < 0) {
218                 LM_ERR("notification address invalid\n");
219                 return -1;
220         }
221
222         /* create socket string out of the server_uri */
223         if(make_socket_str_from_uri(&dmq_server_uri, &dmq_server_socket) < 0) {
224                 LM_ERR("failed to create socket out of server_uri\n");
225                 return -1;
226         }
227         if(lookup_local_socket(&dmq_server_socket) == NULL) {
228                 LM_ERR("server_uri is not a socket the proxy is listening on\n");
229                 return -1;
230         }
231
232         /* allocate workers array */
233         workers = shm_malloc(num_workers * sizeof(dmq_worker_t));
234         if(workers == NULL) {
235                 LM_ERR("error in shm_malloc\n");
236                 return -1;
237         }
238         memset(workers, 0, num_workers * sizeof(dmq_worker_t));
239
240         dmq_init_callback_done = shm_malloc(sizeof(int));
241         if(!dmq_init_callback_done) {
242                 LM_ERR("no more shm\n");
243                 return -1;
244         }
245         *dmq_init_callback_done = 0;
246
247         /**
248          * add the dmq notification peer.
249          * the dmq is a peer itself so that it can receive node notifications
250          */
251         if(add_notification_peer() < 0) {
252                 LM_ERR("cannot add notification peer\n");
253                 return -1;
254         }
255
256         startup_time = (int)time(NULL);
257
258         /**
259          * add the ping timer
260          * it pings the servers once in a while so that we know which failed
261          */
262         if(ping_interval < MIN_PING_INTERVAL) {
263                 ping_interval = MIN_PING_INTERVAL;
264         }
265         if(register_timer(ping_servers, 0, ping_interval) < 0) {
266                 LM_ERR("cannot register timer callback\n");
267                 return -1;
268         }
269
270         return 0;
271 }
272
273 /**
274  * initialize children
275  */
276 static int child_init(int rank)
277 {
278         int i, newpid;
279
280         if(rank == PROC_INIT) {
281                 for(i = 0; i < num_workers; i++) {
282                         if (init_worker(&workers[i]) < 0) {
283                                 LM_ERR("failed to init struct for worker[%d]\n", i);
284                                 return -1;
285                         }
286                 }
287                 return 0;
288         }
289
290         if(rank == PROC_MAIN) {
291                 /* fork worker processes */
292                 for(i = 0; i < num_workers; i++) {
293                         LM_DBG("starting worker process %d\n", i);
294                         newpid = fork_process(PROC_RPC, "DMQ WORKER", 0);
295                         if(newpid < 0) {
296                                 LM_ERR("failed to fork worker process %d\n", i);
297                                 return -1;
298                         } else if(newpid == 0) {
299                                 /* child - this will loop forever */
300                                 worker_loop(i);
301                         } else {
302                                 workers[i].pid = newpid;
303                         }
304                 }
305                 /* notification_node - the node from which the Kamailio instance
306                  * gets the server list on startup.
307                  * the address is given as a module parameter in dmq_notification_address
308                  * the module MUST have this parameter if the Kamailio instance is not
309                  * a master in this architecture
310                  */
311                 if(dmq_notification_address.s) {
312                         notification_node =
313                                         add_server_and_notify(&dmq_notification_address);
314                         if(!notification_node) {
315                                 LM_WARN("cannot retrieve initial nodelist from %.*s\n",
316                                                 STR_FMT(&dmq_notification_address));
317                         }
318                 }
319                 return 0;
320         }
321         if(rank == PROC_TCP_MAIN) {
322                 /* do nothing for the main process */
323                 return 0;
324         }
325
326         pid = my_pid();
327         return 0;
328 }
329
330 /*
331  * destroy function
332  */
333 static void destroy(void)
334 {
335         /* TODO unregister dmq node, free resources */
336         if(dmq_notification_address.s && notification_node && self_node) {
337                 LM_DBG("unregistering node %.*s\n", STR_FMT(&self_node->orig_uri));
338                 self_node->status = DMQ_NODE_DISABLED;
339                 request_nodelist(notification_node, 1);
340         }
341         if(dmq_server_socket.s) {
342                 pkg_free(dmq_server_socket.s);
343         }
344         if(dmq_init_callback_done) {
345                 shm_free(dmq_init_callback_done);
346         }
347 }
348
349 static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
350 {
351         void *h;
352         dmq_node_t *cur = node_list->nodes;
353         char ip[IP6_MAX_STR_SIZE + 1];
354
355         while(cur) {
356                 memset(ip, 0, IP6_MAX_STR_SIZE + 1);
357                 ip_addr2sbuf(&cur->ip_address, ip, IP6_MAX_STR_SIZE);
358                 if(rpc->add(c, "{", &h) < 0)
359                         goto error;
360                 if(rpc->struct_add(h, "SSsSdd", "host", &cur->uri.host, "port",
361                                    &cur->uri.port, "resolved_ip", ip, "status",
362                                    dmq_get_status_str(cur->status), "last_notification",
363                                    cur->last_notification, "local", cur->local)
364                                 < 0)
365                         goto error;
366                 cur = cur->next;
367         }
368         return;
369 error:
370         LM_ERR("Failed to add item to RPC response\n");
371         rpc->fault(c, 500, "Server failure");
372         return;
373 }
374
375 static const char *dmq_rpc_list_nodes_doc[2] = {"Print all nodes", 0};
376
377 static rpc_export_t rpc_methods[] = {
378         {"dmq.list_nodes", dmq_rpc_list_nodes, dmq_rpc_list_nodes_doc, RET_ARRAY},
379         {0, 0, 0, 0}
380 };
381
382 /**
383  *
384  */
385 /* clang-format off */
386 static sr_kemi_t sr_kemi_dmq_exports[] = {
387         { str_init("dmq"), str_init("handle_message"),
388                 SR_KEMIP_INT, ki_dmq_handle_message,
389                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
390                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
391         },
392         { str_init("dmq"), str_init("handle_message_rc"),
393                 SR_KEMIP_INT, ki_dmq_handle_message_rc,
394                 { SR_KEMIP_INT, SR_KEMIP_NONE, SR_KEMIP_NONE,
395                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
396         },
397         { str_init("dmq"), str_init("is_from_node"),
398                 SR_KEMIP_INT, is_from_remote_node,
399                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
400                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
401         },
402         { str_init("dmq"), str_init("t_replicate"),
403                 SR_KEMIP_INT, ki_dmq_t_replicate,
404                 { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE,
405                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
406         },
407         { str_init("dmq"), str_init("t_replicate_mode"),
408                 SR_KEMIP_INT, ki_dmq_t_replicate_mode,
409                 { SR_KEMIP_INT, SR_KEMIP_NONE, SR_KEMIP_NONE,
410                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
411         },
412         { str_init("dmq"), str_init("send_message"),
413                 SR_KEMIP_INT, ki_dmq_send_message,
414                 { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
415                         SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE }
416         },
417         { str_init("dmq"), str_init("bcast_message"),
418                 SR_KEMIP_INT, ki_dmq_bcast_message,
419                 { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
420                         SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
421         },
422
423         { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
424 };
425 /* clang-format on */
426
427 int mod_register(char *path, int *dlflags, void *p1, void *p2)
428 {
429         sr_kemi_modules_add(sr_kemi_dmq_exports);
430         return 0;
431 }