2 * dmq module - distributed message queue
4 * Copyright (C) 2011 Bucur Marius - Ovidiu
6 * This file is part of Kamailio, a free SIP server.
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 #include "dmq_funcs.h"
24 #include "notification_peer.h"
25 #include "../../core/dset.h"
28 * @brief register a DMQ peer
30 dmq_peer_t *register_dmq_peer(dmq_peer_t *peer)
34 LM_ERR("peer list not initialized\n");
37 lock_get(&peer_list->lock);
38 if(search_peer_list(peer_list, peer)) {
39 LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len,
40 peer->peer_id.s, peer->description.len, peer->description.s);
41 lock_release(&peer_list->lock);
44 new_peer = add_peer(peer_list, peer);
45 lock_release(&peer_list->lock);
50 * @brief dmq tm callback
52 void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
54 dmq_cback_param_t *cb_param;
56 cb_param = (dmq_cback_param_t *)(*ps->param);
61 LM_DBG("dmq_tm_callback start\n");
62 if(cb_param->resp_cback.f) {
63 if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node,
64 cb_param->resp_cback.param)
66 LM_ERR("error in response callback\n");
69 LM_DBG("dmq_tm_callback done\n");
70 shm_free_node(cb_param->node);
75 int build_uri_str(str *username, struct sip_uri *uri, str *from)
77 /* sip:user@host:port */
80 if(!uri->host.s || !uri->host.len) {
81 LM_ERR("no host in uri\n");
84 if(!username->s || !username->len) {
85 LM_ERR("no username given\n");
89 from_len = username->len + uri->host.len + uri->port.len + 10;
90 from->s = pkg_malloc(from_len);
92 LM_ERR("no more pkg\n");
97 memcpy(from->s, "sip:", 4);
100 memcpy(from->s + from->len, username->s, username->len);
101 from->len += username->len;
103 memcpy(from->s + from->len, "@", 1);
106 memcpy(from->s + from->len, uri->host.s, uri->host.len);
107 from->len += uri->host.len;
109 if(uri->port.s && uri->port.len) {
110 memcpy(from->s + from->len, ":", 1);
112 memcpy(from->s + from->len, uri->port.s, uri->port.len);
113 from->len += uri->port.len;
118 /* Checks if the request (sip_msg_t* msg) comes from another DMQ node based on source IP. */
119 int is_from_remote_node(sip_msg_t *msg)
125 ip = &msg->rcv.src_ip;
127 lock_get(&node_list->lock);
128 node = node_list->nodes;
131 if(!node->local && ip_addr_cmp(ip, &node->ip_address)) {
138 lock_release(&node_list->lock);
143 * @brief broadcast a dmq message
145 * peer - the peer structure on behalf of which we are sending
146 * body - the body of the message
147 * except - we do not send the message to this node
148 * resp_cback - a response callback that gets called when the transaction is complete
150 int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
151 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type,
156 lock_get(&node_list->lock);
157 node = node_list->nodes;
159 /* we do not send the message to the following:
162 * - any inactive nodes (unless incl_inactive is specified)
164 if((except && cmp_dmq_node(node, except)) || node->local
165 || (node->status != DMQ_NODE_ACTIVE && !incl_inactive)) {
166 LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
171 peer, body, node, resp_cback, max_forwards, content_type)
173 LM_ERR("error sending dmq message\n");
178 lock_release(&node_list->lock);
181 lock_release(&node_list->lock);
185 int bcast_dmq_message(dmq_peer_t *peer, str *body, dmq_node_t *except,
186 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
188 return bcast_dmq_message1(peer, body, except, resp_cback, max_forwards, content_type, 0);
192 * @brief send a dmq message
194 * peer - the peer structure on behalf of which we are sending
195 * body - the body of the message
196 * node - we send the message to this node
197 * resp_cback - a response callback that gets called when the transaction is complete
199 int dmq_send_message(dmq_peer_t *peer, str *body, dmq_node_t *node,
200 dmq_resp_cback_t *resp_cback, int max_forwards, str *content_type)
203 str str_hdr = {0, 0};
204 str from = {0, 0}, to = {0, 0};
205 dmq_cback_param_t *cb_param = NULL;
210 LM_ERR("content-type is null\n");
213 /* add Max-Forwards and Content-Type headers */
214 str_hdr.len = 34 + content_type->len + (CRLF_LEN * 2);
215 str_hdr.s = pkg_malloc(str_hdr.len);
216 if(str_hdr.s == NULL) {
217 LM_ERR("no more pkg\n");
220 len += sprintf(str_hdr.s, "Max-Forwards: %d" CRLF "Content-Type: %.*s" CRLF,
221 max_forwards, content_type->len, content_type->s);
224 cb_param = shm_malloc(sizeof(*cb_param));
225 if(cb_param == NULL) {
226 LM_ERR("no more shm for building callback parameter\n");
229 memset(cb_param, 0, sizeof(*cb_param));
230 cb_param->resp_cback = *resp_cback;
231 cb_param->node = shm_dup_node(node);
232 if(cb_param->node == NULL) {
233 LM_ERR("error building callback parameter\n");
237 if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
238 LM_ERR("error building from string [username %.*s]\n",
239 STR_FMT(&peer->peer_id));
242 if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
243 LM_ERR("error building to string\n");
247 set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL,
248 TMCB_LOCAL_COMPLETED, dmq_tm_callback, (void *)cb_param);
249 uac_r.ssock = &dmq_server_socket;
251 result = tmb.t_request(&uac_r, &to, &to, &from, NULL);
253 LM_ERR("error in tmb.t_request_within\n");
268 destroy_dmq_node(cb_param->node, 1);
275 * @brief kemi function for sending dmq message
277 int ki_dmq_send_message(sip_msg_t *msg, str *peer_str, str *to_str,
278 str *body_str, str *ct_str)
280 LM_DBG("cfg_dmq_send_message: %.*s - %.*s - %.*s - %.*s\n", peer_str->len,
281 peer_str->s, to_str->len, to_str->s, body_str->len, body_str->s,
282 ct_str->len, ct_str->s);
284 dmq_peer_t *destination_peer = find_peer(*peer_str);
285 if(!destination_peer) {
286 LM_INFO("cannot find peer %.*s\n", peer_str->len, peer_str->s);
288 new_peer.callback = empty_peer_callback;
289 new_peer.description.s = "";
290 new_peer.description.len = 0;
291 new_peer.peer_id = *peer_str;
292 destination_peer = register_dmq_peer(&new_peer);
293 if(!destination_peer) {
294 LM_ERR("error in register_dmq_peer\n");
298 dmq_node_t *to_dmq_node = find_dmq_node_uri(node_list, to_str);
300 LM_ERR("cannot find dmq_node: %.*s\n", to_str->len, to_str->s);
303 if(dmq_send_message(destination_peer, body_str, to_dmq_node,
304 ¬ification_callback, 1, ct_str)
306 LM_ERR("cannot send dmq message\n");
315 * @brief config file function for sending dmq message
317 int cfg_dmq_send_message(struct sip_msg *msg, char *peer, char *to, char *body,
325 if(get_str_fparam(&peer_str, msg, (fparam_t *)peer) < 0) {
326 LM_ERR("cannot get peer value\n");
329 if(get_str_fparam(&to_str, msg, (fparam_t *)to) < 0) {
330 LM_ERR("cannot get dst value\n");
333 if(get_str_fparam(&body_str, msg, (fparam_t *)body) < 0) {
334 LM_ERR("cannot get body value\n");
337 if(get_str_fparam(&ct_str, msg, (fparam_t *)content_type) < 0) {
338 LM_ERR("cannot get content-type value\n");
342 return ki_dmq_send_message(msg, &peer_str, &to_str, &body_str, &ct_str);
346 * @brief config file function for broadcasting dmq message
348 int ki_dmq_bcast_message(sip_msg_t *msg, str *peer_str, str *body_str,
351 LM_DBG("cfg_dmq_bcast_message: %.*s - %.*s - %.*s\n", peer_str->len,
352 peer_str->s, body_str->len, body_str->s, ct_str->len, ct_str->s);
354 dmq_peer_t *destination_peer = find_peer(*peer_str);
355 if(!destination_peer) {
356 LM_INFO("cannot find peer %.*s - adding it.\n", peer_str->len,
359 new_peer.callback = empty_peer_callback;
360 new_peer.description.s = "";
361 new_peer.description.len = 0;
362 new_peer.peer_id = *peer_str;
363 destination_peer = register_dmq_peer(&new_peer);
364 if(!destination_peer) {
365 LM_ERR("error in register_dmq_peer\n");
369 if(bcast_dmq_message(destination_peer, body_str, 0, ¬ification_callback,
371 LM_ERR("cannot send dmq message\n");
380 * @brief config file function for broadcasting dmq message
382 int cfg_dmq_bcast_message(sip_msg_t *msg, char *peer, char *body,
389 if(get_str_fparam(&peer_str, msg, (fparam_t *)peer) < 0) {
390 LM_ERR("cannot get peer value\n");
393 if(get_str_fparam(&body_str, msg, (fparam_t *)body) < 0) {
394 LM_ERR("cannot get body value\n");
397 if(get_str_fparam(&ct_str, msg, (fparam_t *)content_type) < 0) {
398 LM_ERR("cannot get content-type value\n");
402 return ki_dmq_bcast_message(msg, &peer_str, &body_str, &ct_str);
406 * @brief config file function for replicating SIP message to all nodes (wraps t_replicate)
408 int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
411 struct socket_info *sock;
414 /* avoid loops - do not replicate if message has come from another node
415 * (override if optional parameter is set)
417 if(mode==0 && is_from_remote_node(msg) > 0) {
418 LM_DBG("message is from another node - skipping replication\n");
422 /* TODO - backup/restore original send socket */
423 sock = lookup_local_socket(&dmq_server_socket);
425 set_force_socket(msg, sock);
428 lock_get(&node_list->lock);
429 node = node_list->nodes;
431 /* we do not send the message to the following:
433 * - any inactive nodes
435 if(node->local || node->status != DMQ_NODE_ACTIVE) {
436 LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
442 if(append_branch(msg, 0, 0, 0, Q_UNSPECIFIED, 0, sock, 0, 0, 0, 0)
444 LM_ERR("failed to append a branch\n");
452 if(tmb.t_replicate(msg, &node->orig_uri) < 0) {
453 LM_ERR("error calling t_replicate\n");
459 lock_release(&node_list->lock);
462 lock_release(&node_list->lock);
466 int ki_dmq_t_replicate(sip_msg_t *msg)
468 return ki_dmq_t_replicate_mode(msg, 0);
472 * @brief config file function for replicating SIP message to all nodes (wraps t_replicate)
474 int cfg_dmq_t_replicate(struct sip_msg *msg, char *s, char *p2)
477 if(s!=NULL && get_int_fparam(&i, msg, (fparam_t *)s) < 0) {
478 LM_ERR("failed to get parameter value\n");
481 return ki_dmq_t_replicate_mode(msg, i);
485 * @brief config file function to check if received message is from another DMQ node based on source IP
487 int cfg_dmq_is_from_node(struct sip_msg *msg, char *p1, char *p2)
489 return is_from_remote_node(msg);
493 * @brief pings the servers in the nodelist
495 * if the server does not reply to the ping, it is removed from the list
496 * the ping messages are actualy notification requests
497 * this way the ping will have two uses:
498 * - checks if the servers in the list are up and running
499 * - updates the list of servers from the other nodes
501 void ping_servers(unsigned int ticks, void *param)
505 LM_DBG("ping_servers\n");
508 || (node_list->nodes->local && !node_list->nodes->next)) {
509 LM_DBG("node list is empty - attempt to rebuild from notification "
511 *dmq_init_callback_done = 0;
512 if(dmq_notification_address.s) {
514 add_server_and_notify(&dmq_notification_address);
515 if(!notification_node) {
516 LM_ERR("cannot retrieve initial nodelist from %.*s\n",
517 STR_FMT(&dmq_notification_address));
520 LM_ERR("no notification address");
525 body = build_notification_body();
527 LM_ERR("could not build notification body\n");
530 ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
531 ¬ification_callback, 1, ¬ification_content_type, 1);
535 LM_ERR("error broadcasting message\n");