dmq: many safety checks for mem mallocs and function return codes
authorDaniel-Constantin Mierla <miconda@gmail.com>
Sun, 26 May 2013 08:43:24 +0000 (10:43 +0200)
committerDaniel-Constantin Mierla <miconda@gmail.com>
Sun, 26 May 2013 08:43:24 +0000 (10:43 +0200)
- added license header in the files

16 files changed:
modules/dmq/bind_dmq.c
modules/dmq/bind_dmq.h
modules/dmq/dmq.c
modules/dmq/dmq.h
modules/dmq/dmq_funcs.c
modules/dmq/dmq_funcs.h
modules/dmq/dmqnode.c
modules/dmq/dmqnode.h
modules/dmq/message.c
modules/dmq/message.h
modules/dmq/notification_peer.c
modules/dmq/notification_peer.h
modules/dmq/peer.c
modules/dmq/peer.h
modules/dmq/worker.c
modules/dmq/worker.h

index 573cf82..9032119 100644 (file)
@@ -1,8 +1,35 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
 #include "dmq.h"
 #include "bind_dmq.h"
 #include "peer.h"
 #include "dmq_funcs.h"
 
+/**
+ * @brief bind dmq module api
+ */
 int bind_dmq(dmq_api_t* api) {
        api->register_dmq_peer = register_dmq_peer;
        api->send_message = dmq_send_message;
index d89b534..d0330d4 100644 (file)
@@ -1,12 +1,39 @@
-#ifndef BIND_DMQ_H
-#define BIND_DMQ_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _BIND_DMQ_H_
+#define _BIND_DMQ_H_
 
 #include "peer.h"
 #include "dmqnode.h"
 #include "dmq_funcs.h"
 
-typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
-typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*bcast_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* except,
+               dmq_resp_cback_t* resp_cback, int max_forwards);
+typedef int (*send_message_t)(dmq_peer_t* peer, str* body, dmq_node_t* node,
+               dmq_resp_cback_t* resp_cback, int max_forwards);
 
 typedef struct dmq_api {
        register_dmq_peer_t register_dmq_peer;
index afd1e7c..4ed75bf 100644 (file)
  * along with this program; if not, write to the Free Software 
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
- * History:
- * --------
- *  2010-03-29  initial version (mariusbucur)
  */
+
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
@@ -135,7 +133,8 @@ struct module_exports exports = {
 /**
  * init module function
  */
-static int mod_init(void) {
+static int mod_init(void)
+{
        
        if(register_mi_mod(exports.name, mi_cmds)!=0) {
                LM_ERR("failed to register MI commands\n");
@@ -153,12 +152,20 @@ static int mod_init(void) {
                LM_ERR("can't load tm functions. TM module probably not loaded\n");
                return -1;
        }
+
        /* load peer list - the list containing the module callbacks for dmq */
-       
        peer_list = init_peer_list();
+       if(peer_list==NULL) {
+               LM_ERR("cannot initialize peer list\n");
+               return -1;
+       }
        
        /* load the dmq node list - the list containing the dmq servers */
        node_list = init_dmq_node_list();
+       if(node_list==NULL) {
+               LM_ERR("cannot initialize node list\n");
+               return -1;
+       }
        
        /* register worker processes - add one because of the ping process */
        register_procs(num_workers);
@@ -169,7 +176,8 @@ static int mod_init(void) {
                return -1;
        }
        
-       if(parse_server_address(&dmq_notification_address, &dmq_notification_uri) < 0) {
+       if(parse_server_address(&dmq_notification_address,
+                               &dmq_notification_uri) < 0) {
                LM_ERR("notification address invalid\n");
                return -1;
        }
@@ -182,10 +190,13 @@ static int mod_init(void) {
        }
        
        /**
-         * add the dmq notification peer.
+        * add the dmq notification peer.
         * the dmq is a peer itself so that it can receive node notifications
         */
-       add_notification_peer();
+       if(add_notification_peer()<0) {
+               LM_ERR("cannot add notification peer\n");
+               return -1;
+       }
        
        startup_time = (int) time(NULL);
        
@@ -196,7 +207,10 @@ static int mod_init(void) {
        if(ping_interval < MIN_PING_INTERVAL) {
                ping_interval = MIN_PING_INTERVAL;
        }
-       register_timer(ping_servers, 0, ping_interval);
+       if(register_timer(ping_servers, 0, ping_interval)<0) {
+               LM_ERR("cannot register timer callback\n");
+               return -1;
+       }
        
        return 0;
 }
@@ -204,7 +218,8 @@ static int mod_init(void) {
 /**
  * initialize children
  */
-static int child_init(int rank) {
+static int child_init(int rank)
+{
        int i, newpid;
        if (rank == PROC_MAIN) {
                /* fork worker processes */
@@ -216,7 +231,7 @@ static int child_init(int rank) {
                                LM_ERR("failed to form process\n");
                                return -1;
                        } else if(newpid == 0) {
-                               // child - this will loop forever
+                               /* child - this will loop forever */
                                worker_loop(i);
                        } else {
                                workers[i].pid = newpid;
@@ -259,15 +274,18 @@ static void destroy(void) {
        }
 }
 
-static int handle_dmq_fixup(void** param, int param_no) {
+static int handle_dmq_fixup(void** param, int param_no)
+{
        return 0;
 }
 
-static int send_dmq_fixup(void** param, int param_no) {
+static int send_dmq_fixup(void** param, int param_no)
+{
        return fixup_spve_null(param, 1);
 }
 
-static int parse_server_address(str* uri, struct sip_uri* parsed_uri) {
+static int parse_server_address(str* uri, struct sip_uri* parsed_uri)
+{
        if(!uri->s) {
                goto empty;
        }
index 88dabb0..02c1fcc 100644 (file)
@@ -1,5 +1,30 @@
-#ifndef DMQ_H
-#define DMQ_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQ_H_
+#define _DMQ_H_
 
 #include "../../dprint.h"
 #include "../../error.h"
index 6d56a1e..33c4ab1 100644 (file)
@@ -1,7 +1,35 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
 #include "dmq_funcs.h"
 #include "notification_peer.h"
 
-dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
+/**
+ * @brief register a DMQ peer
+ */
+dmq_peer_t* register_dmq_peer(dmq_peer_t* peer)
+{
        dmq_peer_t* new_peer;
        lock_get(&peer_list->lock);
        if(search_peer_list(peer_list, peer)) {
@@ -15,8 +43,18 @@ dmq_peer_t* register_dmq_peer(dmq_peer_t* peer) {
        return new_peer;
 }
 
-void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
-       dmq_cback_param_t* cb_param = (dmq_cback_param_t*)(*ps->param);
+/**
+ * @brief dmq tm callback
+ */
+void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
+{
+       dmq_cback_param_t* cb_param;
+       
+       cb_param = (dmq_cback_param_t*)(*ps->param);
+
+       if(cb_param==NULL)
+               return;
+
        LM_DBG("dmq_tm_callback start\n");
        if(cb_param->resp_cback.f) {
                if(cb_param->resp_cback.f(ps->rpl, ps->code, cb_param->node, cb_param->resp_cback.param) < 0) {
@@ -26,11 +64,15 @@ void dmq_tm_callback(struct cell *t, int type, struct tmcb_params *ps) {
        LM_DBG("dmq_tm_callback done\n");
        shm_free_node(cb_param->node);
        shm_free(cb_param);
+       *ps->param = NULL;
 }
 
-int build_uri_str(str* username, struct sip_uri* uri, str* from) {
+int build_uri_str(str* username, struct sip_uri* uri, str* from)
+{
        /* sip:user@host:port */
-       int from_len = username->len + uri->host.len + uri->port.len + 10;
+       int from_len;
+       
+       from_len = username->len + uri->host.len + uri->port.len + 10;
        if(!uri->host.s || !uri->host.len) {
                LM_ERR("no host in uri\n");
                return -1;
@@ -40,6 +82,10 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) {
                return -1;
        }
        from->s = pkg_malloc(from_len);
+       if(from->s==NULL) {
+               LM_ERR("no more pkg\n");
+               return -1;
+       }
        from->len = 0;
        
        memcpy(from->s, "sip:", 4);
@@ -63,13 +109,17 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) {
        return 0;
 }
 
-/* broadcast a dmq message
+/**
+ * @brief broadcast a dmq message
+ *
  * peer - the peer structure on behalf of which we are sending
  * body - the body of the message
  * except - we do not send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards) {
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except,
+               dmq_resp_cback_t* resp_cback, int max_forwards)
+{
        dmq_node_t* node;
        
        lock_get(&node_list->lock);
@@ -80,7 +130,8 @@ int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_
                 *   - itself
                 *   - any inactive nodes
                 */
-               if((except && cmp_dmq_node(node, except)) || node->local || node->status != DMQ_NODE_ACTIVE) {
+               if((except && cmp_dmq_node(node, except)) || node->local
+                               || node->status != DMQ_NODE_ACTIVE) {
                        LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri));
                        node = node->next;
                        continue;
@@ -98,13 +149,17 @@ error:
        return -1;
 }
 
-/* send a dmq message
+/**
+ * @brief send a dmq message
+ *
  * peer - the peer structure on behalf of which we are sending
  * body - the body of the message
  * node - we send the message to this node
  * resp_cback - a response callback that gets called when the transaction is complete
  */
-int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards) {
+int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
+               dmq_resp_cback_t* resp_cback, int max_forwards)
+{
        uac_req_t uac_r;
        str str_hdr = {0, 0};
        str from, to, req_uri;
@@ -113,8 +168,12 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
        int len = 0;
        
        /* Max-Forwards */
-       str_hdr.len = 18 + CRLF_LEN;
+       str_hdr.len = 20 + CRLF_LEN;
        str_hdr.s = pkg_malloc(str_hdr.len);
+       if(str_hdr.s==NULL) {
+               LM_ERR("no more pkg\n");
+               return -1;
+       }
        len += sprintf(str_hdr.s, "Max-Forwards: %d%s", max_forwards, CRLF);
        str_hdr.len = len;
        
@@ -124,7 +183,8 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
        cb_param->node = shm_dup_node(node);
        
        if(build_uri_str(&peer->peer_id, &dmq_server_uri, &from) < 0) {
-               LM_ERR("error building from string [username %.*s]\n", STR_FMT(&peer->peer_id));
+               LM_ERR("error building from string [username %.*s]\n",
+                               STR_FMT(&peer->peer_id));
                goto error;
        }
        if(build_uri_str(&peer->peer_id, &node->uri, &to) < 0) {
@@ -133,8 +193,8 @@ int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cba
        }
        req_uri = to;
        
-       set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL, TMCB_LOCAL_COMPLETED,
-                       dmq_tm_callback, (void*)cb_param);
+       set_uac_req(&uac_r, &dmq_request_method, &str_hdr, body, NULL,
+                       TMCB_LOCAL_COMPLETED, dmq_tm_callback, (void*)cb_param);
        result = tmb.t_request(&uac_r, &req_uri,
                               &to, &from,
                               NULL);
@@ -149,13 +209,28 @@ error:
        return -1;
 }
 
-int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body) {
+/**
+ * @brief config file function for sending dmq message
+ */
+int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body)
+{
        str peer_str;
-       get_str_fparam(&peer_str, msg, (fparam_t*)peer);
        str to_str;
-       get_str_fparam(&to_str, msg, (fparam_t*)to);
        str body_str;
-       get_str_fparam(&body_str, msg, (fparam_t*)body);
+       
+       if(get_str_fparam(&peer_str, msg, (fparam_t*)peer)<0) {
+               LM_ERR("cannot get peer value\n");
+               return -1;
+       }
+       if(get_str_fparam(&to_str, msg, (fparam_t*)to)<0) {
+               LM_ERR("cannot get dst value\n");
+               return -1;
+       }
+       if(get_str_fparam(&body_str, msg, (fparam_t*)body)<0) {
+               LM_ERR("cannot get body value\n");
+               return -1;
+       }
+       
        LM_INFO("cfg_dmq_send_message: %.*s - %.*s - %.*s\n",
                peer_str.len, peer_str.s,
                to_str.len, to_str.s,
@@ -180,27 +255,32 @@ int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body)
                LM_ERR("cannot find dmq_node: %.*s\n", to_str.len, to_str.s);
                goto error;
        }
-       if(dmq_send_message(destination_peer, &body_str, to_dmq_node, &notification_callback, 1) < 0) {
+       if(dmq_send_message(destination_peer, &body_str, to_dmq_node,
+                               &notification_callback, 1) < 0) {
                LM_ERR("cannot send dmq message\n");
                goto error;
        }
-       return 0;
+       return 1;
 error:
        return -1;
 }
 
-/* pings the servers in the nodelist
+/**
+ * @brief pings the servers in the nodelist
+ *
  * if the server does not reply to the ping, it is removed from the list
  * the ping messages are actualy notification requests
  * this way the ping will have two uses:
  *   - checks if the servers in the list are up and running
  *   - updates the list of servers from the other nodes
  */
-void ping_servers(unsigned int ticks,void *param) {
-       str* body = build_notification_body();
+void ping_servers(unsigned int ticks, void *param) {
+       str* body;
        int ret;
        LM_DBG("ping_servers\n");
-       ret = bcast_dmq_message(dmq_notification_peer, body, notification_node, &notification_callback, 1);
+       body = build_notification_body();
+       ret = bcast_dmq_message(dmq_notification_peer, body, notification_node,
+                       &notification_callback, 1);
        pkg_free(body->s);
        pkg_free(body);
        if(ret < 0) {
index 9cce2fe..e0f615a 100644 (file)
@@ -1,5 +1,30 @@
-#ifndef DMQ_FUNCS_H
-#define DMQ_FUNCS_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQ_FUNCS_H_
+#define _DMQ_FUNCS_H_
 
 #include "../../str.h"
 #include "../../modules/tm/dlg.h"
@@ -21,10 +46,13 @@ typedef struct dmq_cback_param {
        dmq_node_t* node;
 } dmq_cback_param_t;
 
-int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body);
+int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to,
+               char* body);
 dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
-int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards);
-int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except, dmq_resp_cback_t* resp_cback, int max_forwards);
+int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
+               dmq_resp_cback_t* resp_cback, int max_forwards);
+int bcast_dmq_message(dmq_peer_t* peer, str* body, dmq_node_t* except,
+               dmq_resp_cback_t* resp_cback, int max_forwards);
 
 #endif
 
index 36971e9..5bf0fc3 100644 (file)
@@ -1,3 +1,28 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "../../ut.h"
 #include "dmqnode.h"
 #include "dmq.h"
@@ -12,7 +37,11 @@ str dmq_node_active_str = str_init("active");
 str dmq_node_disabled_str = str_init("disabled");
 str dmq_node_timeout_str = str_init("timeout");
 
-str* get_status_str(int status) {
+/**
+ * @brief get the string status of the node
+ */
+str* get_status_str(int status)
+{
        switch(status) {
                case DMQ_NODE_ACTIVE: {
                        return &dmq_node_active_str;
@@ -29,14 +58,27 @@ str* get_status_str(int status) {
        }
 }
 
-dmq_node_list_t* init_dmq_node_list() {
-       dmq_node_list_t* node_list = shm_malloc(sizeof(dmq_node_list_t));
+/**
+ * @brief initialize dmg node list
+ */
+dmq_node_list_t* init_dmq_node_list()
+{
+       dmq_node_list_t* node_list;
+       node_list = shm_malloc(sizeof(dmq_node_list_t));
+       if(node_list==NULL) {
+               LM_ERR("no more shm\n");
+               return NULL;
+       }
        memset(node_list, 0, sizeof(dmq_node_list_t));
        lock_init(&node_list->lock);
        return node_list;
 }
 
-int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
+/**
+ * @brief compare dmq node addresses
+ */
+int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode)
+{
        if(!node || !cmpnode) {
                LM_ERR("cmp_dmq_node - null node received\n");
                return -1;
@@ -45,7 +87,11 @@ int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode) {
               STR_EQ(node->uri.port, cmpnode->uri.port);
 }
 
-str* get_param_value(param_t* params, str* param) {
+/**
+ * @brief get the value of a parameter
+ */
+str* get_param_value(param_t* params, str* param)
+{
        while (params) {
                if ((params->name.len == param->len) &&
                    (strncmp(params->name.s, param->s, param->len) == 0)) {
@@ -56,7 +102,11 @@ str* get_param_value(param_t* params, str* param) {
        return NULL;
 }
 
-int set_dmq_node_params(dmq_node_t* node, param_t* params) {
+/**
+ * @brief set the parameters for the node
+ */
+int set_dmq_node_params(dmq_node_t* node, param_t* params)
+{
        str* status;
        if(!params) {
                LM_DBG("no parameters given\n");
@@ -80,26 +130,45 @@ error:
        return -1;
 }
 
-int set_default_dmq_node_params(dmq_node_t* node) {
+/**
+ * @brief set default node params
+ */
+int set_default_dmq_node_params(dmq_node_t* node)
+{
        node->status = DMQ_NODE_ACTIVE;
        return 0;
 }
 
+/**
+ * @brief build a dmq node
+ */
 dmq_node_t* build_dmq_node(str* uri, int shm) {
-       dmq_node_t* ret;
+       dmq_node_t* ret = NULL;
        param_hooks_t hooks;
        param_t* params;
        
        LM_DBG("build_dmq_node %.*s with %s memory\n", STR_FMT(uri), shm?"shm":"private");
        
        if(shm) {
-               ret = shm_malloc(sizeof(*ret));
-               memset(ret, 0, sizeof(*ret));
-               shm_str_dup(&ret->orig_uri, uri);
+               ret = shm_malloc(sizeof(dmq_node_t));
+               if(ret==NULL) {
+                       LM_ERR("no more shm\n");
+                       goto error;
+               }
+               memset(ret, 0, sizeof(dmq_node_t));
+               if(shm_str_dup(&ret->orig_uri, uri)<0) {
+                       goto error;
+               }
        } else {
-               ret = pkg_malloc(sizeof(*ret));
-               memset(ret, 0, sizeof(*ret));
-               pkg_str_dup(&ret->orig_uri, uri);
+               ret = pkg_malloc(sizeof(dmq_node_t));
+               if(ret==NULL) {
+                       LM_ERR("no more pkg\n");
+                       goto error;
+               }
+               memset(ret, 0, sizeof(dmq_node_t));
+               if(pkg_str_dup(&ret->orig_uri, uri)<0) {
+                       goto error;
+               }
        }
        set_default_dmq_node_params(ret);
        if(parse_uri(ret->orig_uri.s, ret->orig_uri.len, &ret->uri) < 0) {
@@ -131,19 +200,39 @@ dmq_node_t* build_dmq_node(str* uri, int shm) {
                LM_DBG("no dmqnode params found\n");            
        }
        return ret;
+
 error:
+       if(ret!=NULL) {
+               /* tbd: free uri and params */
+               if(shm) {
+                       shm_free(ret);
+               } else {
+                       pkg_free(ret);
+               }
+       }
        return NULL;
 }
 
-dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri) {
+/**
+ * @brief find dmq node by uri
+ */
+dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri)
+{
        dmq_node_t *ret, *find;
        find =  build_dmq_node(uri, 0);
+       if(find==NULL)
+               return NULL;
        ret = find_dmq_node(list, find);
        destroy_dmq_node(find, 0);
        return ret;
 }
 
-void destroy_dmq_node(dmq_node_t* node, int shm) {
+/**
+ * @brief destroy dmq node
+ */
+void destroy_dmq_node(dmq_node_t* node, int shm)
+{
+       /* tbd: check inner fields */
        if(shm) {
                shm_free_node(node);
        } else {
@@ -151,7 +240,11 @@ void destroy_dmq_node(dmq_node_t* node, int shm) {
        }
 }
 
-dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+/**
+ * @brief find dmq node
+ */
+dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
+{
        dmq_node_t* cur = list->nodes;
        while(cur) {
                if(cmp_dmq_node(cur, node)) {
@@ -162,32 +255,58 @@ dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
        return NULL;
 }
 
-dmq_node_t* shm_dup_node(dmq_node_t* node) {
-       dmq_node_t* newnode = shm_malloc(sizeof(dmq_node_t));
+/**
+ * @brief duplicate dmq node
+ */
+dmq_node_t* shm_dup_node(dmq_node_t* node)
+{
+       dmq_node_t* newnode;
+       newnode = shm_malloc(sizeof(dmq_node_t));
+       if(newnode==NULL) {
+               LM_ERR("no more shm\n");
+               return NULL;
+       }
        memcpy(newnode, node, sizeof(dmq_node_t));
-       shm_str_dup(&newnode->orig_uri, &node->orig_uri);
-       if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len, &newnode->uri) < 0) {
+       newnode->orig_uri.s = NULL;
+       if(shm_str_dup(&newnode->orig_uri, &node->orig_uri)<0) {
+               goto error;
+       }
+       if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len,
+                               &newnode->uri) < 0) {
                LM_ERR("error in parsing node uri\n");
                goto error;
        }
        return newnode;
 error:
-       shm_free(newnode->orig_uri.s);
+       if(newnode->orig_uri.s!=NULL)
+               shm_free(newnode->orig_uri.s);
        shm_free(newnode);
        return NULL;
 }
 
-void shm_free_node(dmq_node_t* node) {
+/**
+ * @brief free shm dmq node
+ */
+void shm_free_node(dmq_node_t* node)
+{
        shm_free(node->orig_uri.s);
        shm_free(node);
 }
 
-void pkg_free_node(dmq_node_t* node) {
+/**
+ * @brief free pkg dmq node
+ */
+void pkg_free_node(dmq_node_t* node)
+{
        pkg_free(node->orig_uri.s);
        pkg_free(node);
 }
 
-int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
+/**
+ * @brief delete dmq node
+ */
+int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
+{
        dmq_node_t *cur, **prev;
        lock_get(&list->lock);
        cur = list->nodes;
@@ -206,8 +325,14 @@ int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node) {
        return 0;
 }
 
-dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri) {
-       dmq_node_t* newnode = build_dmq_node(uri, 1);
+/**
+ * @brief add dmq node
+ */
+dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri)
+{
+       dmq_node_t* newnode;
+       
+       newnode = build_dmq_node(uri, 1);
        if(!newnode) {
                LM_ERR("error creating node\n");
                goto error;
@@ -223,6 +348,9 @@ error:
        return NULL;
 }
 
+/**
+ * @brief build dmq node string
+ */
 int build_node_str(dmq_node_t* node, char* buf, int buflen) {
        /* sip:host:port;status=[status] */
        int len = 0;
@@ -242,7 +370,8 @@ int build_node_str(dmq_node_t* node, char* buf, int buflen) {
        len += 1;
        memcpy(buf + len, "status=", 7);
        len += 7;
-       memcpy(buf + len, get_status_str(node->status)->s, get_status_str(node->status)->len);
+       memcpy(buf + len, get_status_str(node->status)->s,
+                       get_status_str(node->status)->len);
        len += get_status_str(node->status)->len;
        return len;
 }
index 43b4820..33e1262 100644 (file)
@@ -1,5 +1,30 @@
-#ifndef DMQNODE_H
-#define DMQNODE_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _DMQNODE_H_
+#define _DMQNODE_H_
 
 #include <string.h>
 #include <stdlib.h>
index 494bd5f..a6ce2d4 100644 (file)
@@ -1,6 +1,32 @@
 #include "../../parser/parse_to.h"
 #include "../../parser/parse_uri.h"
 #include "../../sip_msg_clone.h"
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
 #include "../../parser/parse_content.h"
 #include "../../parser/parse_from.h"
 #include "../../ut.h"
@@ -14,7 +40,11 @@ str dmq_400_rpl  = str_init("Bad request");
 str dmq_500_rpl  = str_init("Server Internal Error");
 str dmq_404_rpl  = str_init("User Not Found");
 
-int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2) {
+/**
+ * @brief config function to handle dmq messages
+ */
+int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2)
+{
        dmq_peer_t* peer;
        struct sip_msg* cloned_msg = NULL;
        int cloned_msg_len;
@@ -43,7 +73,10 @@ int dmq_handle_message(struct sip_msg* msg, char* str1, char* str2) {
                LM_ERR("error cloning sip message\n");
                goto error;
        }
-       add_dmq_job(cloned_msg, peer);
+       if(add_dmq_job(cloned_msg, peer)<0) {
+               LM_ERR("failed to add dmq job\n");
+               goto error;
+       }
        return 0;
 error:
        return -1;
index 03aa2a6..6763960 100644 (file)
@@ -1,3 +1,31 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _MESSAGE_H_
+#define _MESSAGE_H_
 
 int dmq_handle_message(struct sip_msg*, char*, char*);
 
+#endif
index 4ba6225..3ea0239 100644 (file)
@@ -1,9 +1,39 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
 #include "notification_peer.h"
 
 static str notification_content_type = str_init("text/plain");
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
-int add_notification_peer() {
+/**
+ * @brief add notification peer
+ */
+int add_notification_peer()
+{
        dmq_peer_t not_peer;
        not_peer.callback = dmq_notification_callback;
        not_peer.description.s = "notification_peer";
@@ -28,9 +58,15 @@ error:
        return -1;
 }
 
-dmq_node_t* add_server_and_notify(str* server_address) {
+/**
+ * @brief add a server node and notify it
+ */
+dmq_node_t* add_server_and_notify(str* server_address)
+{
        /* add the notification server to the node list - if any */
-       dmq_node_t* node = add_dmq_node(node_list, server_address);
+       dmq_node_t* node;
+       
+       node = add_dmq_node(node_list, server_address);
        if(!node) {
                LM_ERR("error adding notification node\n");
                goto error;
@@ -58,7 +94,8 @@ error:
  *     sip:host2:port2;param2=value2
  *     ...
  */
-int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg) {
+int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg)
+{
        int content_length, total_nodes = 0;
        str body;
        str tmp_uri;
@@ -115,7 +152,11 @@ error:
        return -1;
 }
 
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
+/**
+ * @brief dmq notification callback
+ */
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
+{
        int nodes_recv;
        str* response_body = NULL;
        unsigned int maxforwards = 1;
@@ -137,6 +178,10 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
        nodes_recv = extract_node_list(node_list, msg);
        LM_DBG("received %d new nodes\n", nodes_recv);
        response_body = build_notification_body();
+       if(response_body==NULL) {
+               LM_ERR("no response body\n");
+               goto error;
+       }
        resp->content_type = notification_content_type;
        resp->reason = dmq_200_rpl;
        resp->body = *response_body;
@@ -145,7 +190,8 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) {
        /* if we received any new nodes tell about them to the others */
        if(nodes_recv > 0 && maxforwards > 0) {
                /* maxforwards is set to 0 so that the message is will not be in a spiral */
-               bcast_dmq_message(dmq_notification_peer, response_body, 0, &notification_callback, maxforwards);
+               bcast_dmq_message(dmq_notification_peer, response_body, 0,
+                               &notification_callback, maxforwards);
        }
        LM_DBG("broadcasted message\n");
        pkg_free(response_body);
@@ -161,7 +207,8 @@ error:
  * sip:host2:port2;param2=value2\r\n
  * sip:host3:port3;param3=value3
  */
-str* build_notification_body() {
+str* build_notification_body()
+{
        /* the length of the current line describing the server */
        int slen;
        /* the current length of the body */
@@ -169,10 +216,19 @@ str* build_notification_body() {
        dmq_node_t* cur_node = NULL;
        str* body;
        body = pkg_malloc(sizeof(str));
+       if(body==NULL) {
+               LM_ERR("no more pkg\n");
+               return NULL;
+       }
        memset(body, 0, sizeof(str));
        /* we allocate a chunk of data for the body */
        body->len = NBODY_LEN;
        body->s = pkg_malloc(body->len);
+       if(body->s==NULL) {
+               LM_ERR("no more pkg\n");
+               pkg_free(body);
+               return NULL;
+       }
        /* we add each server to the body - each on a different line */
        lock_get(&node_list->lock);
        cur_node = node_list->nodes;
@@ -198,16 +254,31 @@ error:
        return NULL;
 }
 
-int request_nodelist(dmq_node_t* node, int forward) {
-       str* body = build_notification_body();
+/**
+ * @brief request node list
+ */
+int request_nodelist(dmq_node_t* node, int forward)
+{
+       str* body;
        int ret;
-       ret = dmq_send_message(dmq_notification_peer, body, node, &notification_callback, forward);
+       body = build_notification_body();
+       if(body==NULL) {
+               LM_ERR("no notification body\n");
+               return -1;
+       }
+       ret = dmq_send_message(dmq_notification_peer, body, node,
+                       &notification_callback, forward);
        pkg_free(body->s);
        pkg_free(body);
        return ret;
 }
 
-int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) {
+/**
+ * @brief notification response callback
+ */
+int notification_resp_callback_f(struct sip_msg* msg, int code,
+               dmq_node_t* node, void* param)
+{
        int ret;
        LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
        if(code == 408) {
index 001b530..a25371a 100644 (file)
@@ -1,3 +1,30 @@
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _NOTIFICATION_PEER_H_
+#define _NOTIFICATION_PEER_H_
+
 #include "../../parser/msg_parser.h"
 #include "../../parser/parse_content.h"
 #include "../../ut.h"
@@ -22,6 +49,8 @@ int request_nodelist(dmq_node_t* node, int forward);
 dmq_node_t* add_server_and_notify(str* server_address);
 
 /* helper functions */
-extern int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+extern int notification_resp_callback_f(struct sip_msg* msg, int code,
+               dmq_node_t* node, void* param);
 extern dmq_resp_cback_t notification_callback;
 
+#endif
index 3fa91df..62f1274 100644 (file)
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "peer.h"
 #include "dmq.h"
 
-dmq_peer_list_t* init_peer_list() {
-       dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+/**
+ * @brief init peer list
+ */
+dmq_peer_list_t* init_peer_list()
+{
+       dmq_peer_list_t* peer_list;
+       peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+       if(peer_list==NULL) {
+               LM_ERR("no more shm\n");
+               return NULL;
+       }
        memset(peer_list, 0, sizeof(dmq_peer_list_t));
        lock_init(&peer_list->lock);
        return peer_list;
 }
 
-dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
-       dmq_peer_t* cur = peer_list->peers;
+/**
+ * @brief search peer list
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer)
+{
+       dmq_peer_t* crt;
        int len;
-       while(cur) {
+       crt = peer_list->peers;
+       while(crt) {
                /* len - the minimum length of the two strings */
-               len = cur->peer_id.len < peer->peer_id.len ? cur->peer_id.len:peer->peer_id.len;
-               if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
-                       return cur;
+               len = (crt->peer_id.len < peer->peer_id.len)
+                       ? crt->peer_id.len:peer->peer_id.len;
+               if(strncasecmp(crt->peer_id.s, peer->peer_id.s, len) == 0) {
+                       return crt;
                }
-               cur = cur->next;
+               crt = crt->next;
        }
        return 0;
 }
 
-dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
-       dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+/**
+ * @brief add peer
+ */
+dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer)
+{
+       dmq_peer_t* new_peer = NULL;
+       
+       new_peer = shm_malloc(sizeof(dmq_peer_t));
+       if(new_peer==NULL) {
+               LM_ERR("no more shm\n");
+               return NULL;
+       }
        *new_peer = *peer;
        
        /* copy the str's */
        new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+       if(new_peer->peer_id.s==NULL) {
+               LM_ERR("no more shm\n");
+               shm_free(new_peer);
+               return NULL;
+       }
        memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+       new_peer->peer_id.len = peer->peer_id.len;
+
        new_peer->description.s = shm_malloc(peer->description.len);
+       if(new_peer->description.s==NULL) {
+               LM_ERR("no more shm\n");
+               shm_free(new_peer->peer_id.s);
+               shm_free(new_peer);
+               return NULL;
+       }
        memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+       new_peer->peer_id.len = peer->peer_id.len;
        
        new_peer->next = peer_list->peers;
        peer_list->peers = new_peer;
        return new_peer;
 }
 
-dmq_peer_t* find_peer(str peer_id) {
+/**
+ * @brief find peer by id
+ */
+dmq_peer_t* find_peer(str peer_id)
+{
        dmq_peer_t foo_peer;
        foo_peer.peer_id = peer_id;
        return search_peer_list(peer_list, &foo_peer);
 }
 
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp) {
+/**
+ * @empty callback
+ */
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
+{
        return 0;
 }
 
index 5ccb980..b7b425b 100644 (file)
@@ -1,5 +1,30 @@
-#ifndef PEER_H
-#define PEER_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+
+#ifndef _PEER_H_
+#define _PEER_H_
 
 #include <string.h>
 #include <stdlib.h>
index 9da3b9f..a265712 100644 (file)
@@ -1,10 +1,37 @@
+/*
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
 #include "dmq.h"
 #include "peer.h"
 #include "worker.h"
 #include "../../data_lump_rpl.h"
 #include "../../mod_fix.h"
 
-/* set the body of a response */
+/**
+ * @brief set the body of a response
+ */
 static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
 {
        char* buf;
@@ -41,16 +68,23 @@ static int set_reply_body(struct sip_msg* msg, str* body, str* content_type)
        return 1;
 }
 
-void worker_loop(int id) {
-       dmq_worker_t* worker = &workers[id];
+/**
+ * @brief dmq worker loop
+ */
+void worker_loop(int id)
+{
+       dmq_worker_t* worker;
        dmq_job_t* current_job;
        peer_reponse_t peer_response;
        int ret_value;
+
+       worker = &workers[id];
        for(;;) {
                LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
                lock_get(&worker->lock);
                LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
-               /* multiple lock_release calls might be performed, so remove from queue until empty */
+               /* multiple lock_release calls might be performed, so remove
+                * from queue until empty */
                do {
                        /* fill the response with 0's */
                        memset(&peer_response, 0, sizeof(peer_response));
@@ -64,13 +98,15 @@ void worker_loop(int id) {
                                }
                                /* add the body to the reply */
                                if(peer_response.body.s) {
-                                       if(set_reply_body(current_job->msg, &peer_response.body, &peer_response.content_type) < 0) {
+                                       if(set_reply_body(current_job->msg, &peer_response.body,
+                                                               &peer_response.content_type) < 0) {
                                                LM_ERR("error adding lumps\n");
                                                continue;
                                        }
                                }
                                /* send the reply */
-                               if(slb.freply(current_job->msg, peer_response.resp_code, &peer_response.reason) < 0)
+                               if(slb.freply(current_job->msg, peer_response.resp_code,
+                                                       &peer_response.reason) < 0)
                                {
                                        LM_ERR("error sending reply\n");
                                }
@@ -89,10 +125,17 @@ void worker_loop(int id) {
        }
 }
 
-int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+/**
+ * @brief add a dmq job
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer)
+{
        int i, found_available = 0;
+       int ret;
        dmq_job_t new_job = { 0 };
        dmq_worker_t* worker;
+
+       ret = 0;
        new_job.f = peer->callback;
        new_job.msg = msg;
        new_job.orig_peer = peer;
@@ -102,26 +145,33 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
        }
        /* initialize the worker with the first one */
        worker = workers;
-       /* search for an available worker, or, if not possible, for the least busy one */
+       /* search for an available worker, or, if not possible,
+        * for the least busy one */
        for(i = 0; i < num_workers; i++) {
                if(job_queue_size(workers[i].queue) == 0) {
                        worker = &workers[i];
                        found_available = 1;
                        break;
-               } else if(job_queue_size(workers[i].queue) < job_queue_size(worker->queue)) {
+               } else if(job_queue_size(workers[i].queue)
+                               < job_queue_size(worker->queue)) {
                        worker = &workers[i];
                }
        }
        if(!found_available) {
-               LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
-                      worker->pid, job_queue_size(worker->queue));
+               LM_DBG("no available worker found, passing job"
+                               " to the least busy one [%d %d]\n",
+                               worker->pid, job_queue_size(worker->queue));
        }
-       job_queue_push(worker->queue, &new_job);
+       ret = job_queue_push(worker->queue, &new_job);
        lock_release(&worker->lock);
-       return 0;
+       return ret;
 }
 
-void init_worker(dmq_worker_t* worker) {
+/**
+ * @brief init dmq worker
+ */
+void init_worker(dmq_worker_t* worker)
+{
        memset(worker, 0, sizeof(*worker));
        lock_init(&worker->lock);
        // acquire the lock for the first time - so that dmq_worker_loop blocks
@@ -129,26 +179,55 @@ void init_worker(dmq_worker_t* worker) {
        worker->queue = alloc_job_queue();
 }
 
-job_queue_t* alloc_job_queue() {
-       job_queue_t* queue = shm_malloc(sizeof(job_queue_t));
+/**
+ * @brief allog dmq job queue
+ */
+job_queue_t* alloc_job_queue()
+{
+       job_queue_t* queue;
+       
+       queue = shm_malloc(sizeof(job_queue_t));
+       if(queue==NULL) {
+               LM_ERR("no more shm\n");
+               return NULL;
+       }
+       memset(queue, 0, sizeof(job_queue_t));
        atomic_set(&queue->count, 0);
-       queue->front = NULL;
-       queue->back = NULL;
        lock_init(&queue->lock);
        return queue;
 }
 
-void destroy_job_queue(job_queue_t* queue) {
-       shm_free(queue);
+/**
+ * @ brief destroy job queue
+ */
+void destroy_job_queue(job_queue_t* queue)
+{
+       if(queue!=NULL)
+               shm_free(queue);
 }
 
-int job_queue_size(job_queue_t* queue) {
+/**
+ * @brief return job queue size
+ */
+int job_queue_size(job_queue_t* queue)
+{
        return atomic_get(&queue->count);
 }
 
-void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+/**
+ * @brief push to job queue
+ */
+int job_queue_push(job_queue_t* queue, dmq_job_t* job)
+{
        /* we need to copy the dmq_job into a newly created dmq_job in shm */
-       dmq_job_t* newjob = shm_malloc(sizeof(dmq_job_t));
+       dmq_job_t* newjob;
+       
+       newjob = shm_malloc(sizeof(dmq_job_t));
+       if(newjob==NULL) {
+               LM_ERR("no more shm\n");
+               return -1;
+       }
+
        *newjob = *job;
        
        lock_get(&queue->lock);
@@ -163,8 +242,14 @@ void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
        }
        atomic_inc(&queue->count);
        lock_release(&queue->lock);
+       return 0;
 }
-dmq_job_t* job_queue_pop(job_queue_t* queue) {
+
+/**
+ * @brief pop from job queue
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue)
+{
        dmq_job_t* front;
        lock_get(&queue->lock);
        if(!queue->front) {
index d6f8dbd..bda80b4 100644 (file)
@@ -1,5 +1,29 @@
-#ifndef DMQ_WORKER_H
-#define DMQ_WORKER_H
+/**
+ * $Id$
+ *
+ * dmq module - distributed message queue
+ *
+ * Copyright (C) 2011 Bucur Marius - Ovidiu
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#ifndef _DMQ_WORKER_H_
+#define _DMQ_WORKER_H_
 
 #include "peer.h"
 #include "../../locking.h"
@@ -36,7 +60,7 @@ void worker_loop(int id);
 
 job_queue_t* alloc_job_queue();
 void destroy_job_queue(job_queue_t* queue);
-void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+int job_queue_push(job_queue_t* queue, dmq_job_t* job);
 dmq_job_t* job_queue_pop(job_queue_t* queue);
 int job_queue_size(job_queue_t* queue);