dmq: use module prefix for global variables
authorDaniel-Constantin Mierla <miconda@gmail.com>
Mon, 18 May 2020 10:06:05 +0000 (12:06 +0200)
committerDaniel-Constantin Mierla <miconda@gmail.com>
Mon, 18 May 2020 10:06:05 +0000 (12:06 +0200)
- avoid potential conflicts with other globals, given that this module
is used by other modules to perform replication

14 files changed:
src/modules/dmq/bind_dmq.h
src/modules/dmq/dmq.c
src/modules/dmq/dmq.h
src/modules/dmq/dmq_funcs.c
src/modules/dmq/dmq_funcs.h
src/modules/dmq/dmqnode.c
src/modules/dmq/dmqnode.h
src/modules/dmq/message.h
src/modules/dmq/notification_peer.c
src/modules/dmq/notification_peer.h
src/modules/dmq/peer.c
src/modules/dmq/peer.h
src/modules/dmq/worker.c
src/modules/dmq/worker.h

index 8b4694e..78a09cb 100644 (file)
@@ -16,8 +16,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
index 4fa61d9..509c42c 100644 (file)
 #include "notification_peer.h"
 #include "dmqnode.h"
 
-static int mod_init(void);
-static int child_init(int);
-static void destroy(void);
-
 MODULE_VERSION
 
-int startup_time = 0;
-int pid = 0;
+int dmq_startup_time = 0;
+int dmq_pid = 0;
 
 /* module parameters */
-int num_workers = DEFAULT_NUM_WORKERS;
-int worker_usleep = 0;
+int dmq_num_workers = DEFAULT_NUM_WORKERS;
+int dmq_worker_usleep = 0;
 str dmq_server_address = {0, 0};
 str dmq_server_socket = {0, 0};
-struct sip_uri dmq_server_uri;
+sip_uri_t dmq_server_uri = {0};
 
 str dmq_notification_address = {0, 0};
-int multi_notify = 0;
-struct sip_uri dmq_notification_uri;
-int ping_interval = 60;
+int dmq_multi_notify = 0;
+sip_uri_t dmq_notification_uri = {0};
+int dmq_ping_interval = 60;
 
 /* TM bind */
-struct tm_binds tmb;
+struct tm_binds tmb = {0};
 /* SL API structure */
-sl_api_t slb;
+sl_api_t slb = {0};
 
 /** module variables */
 str dmq_request_method = str_init("KDMQ");
-dmq_worker_t *workers = NULL;
-dmq_peer_list_t *peer_list = 0;
+dmq_worker_t *dmq_workers = NULL;
+dmq_peer_list_t *dmq_peer_list = 0;
 /* the list of dmq servers */
-dmq_node_list_t *node_list = NULL;
-// the dmq module is a peer itself for receiving notifications regarding nodes
+dmq_node_list_t *dmq_node_list = NULL;
+/* dmq module is a peer itself for receiving notifications regarding nodes */
 dmq_peer_t *dmq_notification_peer = NULL;
 
 /** module functions */
@@ -109,12 +105,12 @@ static cmd_export_t cmds[] = {
 };
 
 static param_export_t params[] = {
-       {"num_workers", INT_PARAM, &num_workers},
-       {"ping_interval", INT_PARAM, &ping_interval},
+       {"num_workers", INT_PARAM, &dmq_num_workers},
+       {"ping_interval", INT_PARAM, &dmq_ping_interval},
        {"server_address", PARAM_STR, &dmq_server_address},
        {"notification_address", PARAM_STR, &dmq_notification_address},
-       {"multi_notify", INT_PARAM, &multi_notify},
-       {"worker_usleep", INT_PARAM, &worker_usleep},
+       {"multi_notify", INT_PARAM, &dmq_multi_notify},
+       {"worker_usleep", INT_PARAM, &dmq_worker_usleep},
        {0, 0, 0}
 };
 
@@ -184,15 +180,15 @@ static int mod_init(void)
        }
 
        /* load peer list - the list containing the module callbacks for dmq */
-       peer_list = init_peer_list();
-       if(peer_list == NULL) {
+       dmq_peer_list = init_peer_list();
+       if(dmq_peer_list == NULL) {
                LM_ERR("cannot initialize peer list\n");
                return -1;
        }
 
        /* load the dmq node list - the list containing the dmq servers */
-       node_list = init_dmq_node_list();
-       if(node_list == NULL) {
+       dmq_node_list = init_dmq_node_list();
+       if(dmq_node_list == NULL) {
                LM_ERR("cannot initialize node list\n");
                return -1;
        }
@@ -203,7 +199,7 @@ static int mod_init(void)
        }
 
        /* register worker processes - add one because of the ping process */
-       register_procs(num_workers);
+       register_procs(dmq_num_workers);
 
        /* check server_address and notification_address are not empty and correct */
        if(parse_uri(dmq_server_address.s, dmq_server_address.len, &dmq_server_uri)
@@ -230,12 +226,12 @@ static int mod_init(void)
        }
 
        /* allocate workers array */
-       workers = shm_malloc(num_workers * sizeof(dmq_worker_t));
-       if(workers == NULL) {
+       dmq_workers = shm_malloc(dmq_num_workers * sizeof(dmq_worker_t));
+       if(dmq_workers == NULL) {
                LM_ERR("error in shm_malloc\n");
                return -1;
        }
-       memset(workers, 0, num_workers * sizeof(dmq_worker_t));
+       memset(dmq_workers, 0, dmq_num_workers * sizeof(dmq_worker_t));
 
        dmq_init_callback_done = shm_malloc(sizeof(int));
        if(!dmq_init_callback_done) {
@@ -253,16 +249,16 @@ static int mod_init(void)
                return -1;
        }
 
-       startup_time = (int)time(NULL);
+       dmq_startup_time = (int)time(NULL);
 
        /**
         * add the ping timer
         * it pings the servers once in a while so that we know which failed
         */
-       if(ping_interval < MIN_PING_INTERVAL) {
-               ping_interval = MIN_PING_INTERVAL;
+       if(dmq_ping_interval < MIN_PING_INTERVAL) {
+               dmq_ping_interval = MIN_PING_INTERVAL;
        }
-       if(register_timer(ping_servers, 0, ping_interval) < 0) {
+       if(register_timer(ping_servers, 0, dmq_ping_interval) < 0) {
                LM_ERR("cannot register timer callback\n");
                return -1;
        }
@@ -278,8 +274,8 @@ static int child_init(int rank)
        int i, newpid;
 
        if(rank == PROC_INIT) {
-               for(i = 0; i < num_workers; i++) {
-                       if (init_worker(&workers[i]) < 0) {
+               for(i = 0; i < dmq_num_workers; i++) {
+                       if (init_worker(&dmq_workers[i]) < 0) {
                                LM_ERR("failed to init struct for worker[%d]\n", i);
                                return -1;
                        }
@@ -289,7 +285,7 @@ static int child_init(int rank)
 
        if(rank == PROC_MAIN) {
                /* fork worker processes */
-               for(i = 0; i < num_workers; i++) {
+               for(i = 0; i < dmq_num_workers; i++) {
                        LM_DBG("starting worker process %d\n", i);
                        newpid = fork_process(PROC_RPC, "DMQ WORKER", 0);
                        if(newpid < 0) {
@@ -299,7 +295,7 @@ static int child_init(int rank)
                                /* child - this will loop forever */
                                worker_loop(i);
                        } else {
-                               workers[i].pid = newpid;
+                               dmq_workers[i].pid = newpid;
                        }
                }
                /* notification_node - the node from which the Kamailio instance
@@ -309,9 +305,9 @@ static int child_init(int rank)
                 * a master in this architecture
                 */
                if(dmq_notification_address.s) {
-                       notification_node =
+                       dmq_notification_node =
                                        add_server_and_notify(&dmq_notification_address);
-                       if(!notification_node) {
+                       if(!dmq_notification_node) {
                                LM_WARN("cannot retrieve initial nodelist from %.*s\n",
                                                STR_FMT(&dmq_notification_address));
                        }
@@ -323,7 +319,7 @@ static int child_init(int rank)
                return 0;
        }
 
-       pid = my_pid();
+       dmq_pid = my_pid();
        return 0;
 }
 
@@ -333,10 +329,10 @@ static int child_init(int rank)
 static void destroy(void)
 {
        /* TODO unregister dmq node, free resources */
-       if(dmq_notification_address.s && notification_node && self_node) {
-               LM_DBG("unregistering node %.*s\n", STR_FMT(&self_node->orig_uri));
-               self_node->status = DMQ_NODE_DISABLED;
-               request_nodelist(notification_node, 1);
+       if(dmq_notification_address.s && dmq_notification_node && dmq_self_node) {
+               LM_DBG("unregistering node %.*s\n", STR_FMT(&dmq_self_node->orig_uri));
+               dmq_self_node->status = DMQ_NODE_DISABLED;
+               request_nodelist(dmq_notification_node, 1);
        }
        if(dmq_server_socket.s) {
                pkg_free(dmq_server_socket.s);
@@ -349,7 +345,7 @@ static void destroy(void)
 static void dmq_rpc_list_nodes(rpc_t *rpc, void *c)
 {
        void *h;
-       dmq_node_t *cur = node_list->nodes;
+       dmq_node_t *cur = dmq_node_list->nodes;
        char ip[IP6_MAX_STR_SIZE + 1];
 
        while(cur) {
index 411e41a..dbf5cfb 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #define DEFAULT_NUM_WORKERS 2
 #define MIN_PING_INTERVAL 5
 
-extern int num_workers;
-extern int worker_usleep;
-extern dmq_worker_t *workers;
+extern int dmq_num_workers;
+extern int dmq_worker_usleep;
+extern dmq_worker_t *dmq_workers;
 extern dmq_peer_t *dmq_notification_peer;
 extern str dmq_server_address;
-extern dmq_peer_list_t *peer_list;
+extern dmq_peer_list_t *dmq_peer_list;
 extern str dmq_request_method;
 extern str dmq_server_socket;
-extern struct sip_uri dmq_server_uri;
+extern sip_uri_t dmq_server_uri;
 extern str dmq_notification_address;
-extern int multi_notify;
-extern struct sip_uri dmq_notification_uri;
+extern int dmq_multi_notify;
+extern sip_uri_t dmq_notification_uri;
 /* sl and tm */
 extern struct tm_binds tmb;
 extern sl_api_t slb;
index 27afbc4..f9fb5c7 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 dmq_peer_t *register_dmq_peer(dmq_peer_t *peer)
 {
        dmq_peer_t *new_peer;
-       if(!peer_list) {
+       if(!dmq_peer_list) {
                LM_ERR("peer list not initialized\n");
                return NULL;
        }
-       lock_get(&peer_list->lock);
-       if(search_peer_list(peer_list, peer)) {
+       lock_get(&dmq_peer_list->lock);
+       if(search_peer_list(dmq_peer_list, peer)) {
                LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len,
                                peer->peer_id.s, peer->description.len, peer->description.s);
-               lock_release(&peer_list->lock);
+               lock_release(&dmq_peer_list->lock);
                return NULL;
        }
-       new_peer = add_peer(peer_list, peer);
-       lock_release(&peer_list->lock);
+       new_peer = add_peer(dmq_peer_list, peer);
+       lock_release(&dmq_peer_list->lock);
        return new_peer;
 }
 
@@ -124,8 +124,8 @@ int is_from_remote_node(sip_msg_t *msg)
 
        ip = &msg->rcv.src_ip;
 
-       lock_get(&node_list->lock);
-       node = node_list->nodes;
+       lock_get(&dmq_node_list->lock);
+       node = dmq_node_list->nodes;
 
        while(node) {
                if(!node->local && ip_addr_cmp(ip, &node->ip_address)) {
@@ -135,7 +135,7 @@ int is_from_remote_node(sip_msg_t *msg)
                node = node->next;
        }
 done:
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        return result;
 }
 
@@ -153,8 +153,8 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
 {
        dmq_node_t *node;
 
-       lock_get(&node_list->lock);
-       node = node_list->nodes;
+       lock_get(&dmq_node_list->lock);
+       node = dmq_node_list->nodes;
        while(node) {
                /* we do not send the message to the following:
                 *   - the except node
@@ -175,10 +175,10 @@ int bcast_dmq_message1(dmq_peer_t *peer, str *body, dmq_node_t *except,
                }
                node = node->next;
        }
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        return 0;
 error:
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        return -1;
 }
 
@@ -295,13 +295,13 @@ int ki_dmq_send_message(sip_msg_t *msg, str *peer_str, str *to_str,
                        goto error;
                }
        }
-       dmq_node_t *to_dmq_node = find_dmq_node_uri(node_list, to_str);
+       dmq_node_t *to_dmq_node = find_dmq_node_uri(dmq_node_list, to_str);
        if(!to_dmq_node) {
                LM_ERR("cannot find dmq_node: %.*s\n", to_str->len, to_str->s);
                goto error;
        }
        if(dmq_send_message(destination_peer, body_str, to_dmq_node,
-                          &notification_callback, 1, ct_str)
+                          &dmq_notification_resp_callback, 1, ct_str)
                        < 0) {
                LM_ERR("cannot send dmq message\n");
                goto error;
@@ -366,7 +366,7 @@ int ki_dmq_bcast_message(sip_msg_t *msg, str *peer_str, str *body_str,
                        goto error;
                }
        }
-       if(bcast_dmq_message(destination_peer, body_str, 0, &notification_callback,
+       if(bcast_dmq_message(destination_peer, body_str, 0, &dmq_notification_resp_callback,
                           1, ct_str) < 0) {
                LM_ERR("cannot send dmq message\n");
                goto error;
@@ -425,8 +425,8 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
                set_force_socket(msg, sock);
        }
 
-       lock_get(&node_list->lock);
-       node = node_list->nodes;
+       lock_get(&dmq_node_list->lock);
+       node = dmq_node_list->nodes;
        while(node) {
                /* we do not send the message to the following:
                 *   - ourself
@@ -456,10 +456,10 @@ int ki_dmq_t_replicate_mode(struct sip_msg *msg, int mode)
 
                node = node->next;
        }
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        return 0;
 error:
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        return -1;
 }
 
@@ -504,15 +504,15 @@ void ping_servers(unsigned int ticks, void *param)
        int ret;
        LM_DBG("ping_servers\n");
 
-       if(!node_list->nodes
-                       || (node_list->nodes->local && !node_list->nodes->next)) {
+       if(!dmq_node_list->nodes
+                       || (dmq_node_list->nodes->local && !dmq_node_list->nodes->next)) {
                LM_DBG("node list is empty - attempt to rebuild from notification "
                           "address\n");
                *dmq_init_callback_done = 0;
                if(dmq_notification_address.s) {
-                       notification_node =
+                       dmq_notification_node =
                                        add_server_and_notify(&dmq_notification_address);
-                       if(!notification_node) {
+                       if(!dmq_notification_node) {
                                LM_ERR("cannot retrieve initial nodelist from %.*s\n",
                                                STR_FMT(&dmq_notification_address));
                        }
@@ -528,7 +528,7 @@ void ping_servers(unsigned int ticks, void *param)
                return;
        }
        ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
-                       &notification_callback, 1, &notification_content_type, 1);
+                       &dmq_notification_resp_callback, 1, &dmq_notification_content_type, 1);
        pkg_free(body->s);
        pkg_free(body);
        if(ret < 0) {
index a9c6da5..4cec59d 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
index 4ed926f..2eb3b1c 100644 (file)
@@ -26,8 +26,8 @@
 #include "dmqnode.h"
 #include "dmq.h"
 
-dmq_node_t *self_node;
-dmq_node_t *notification_node;
+dmq_node_t *dmq_self_node;
+dmq_node_t *dmq_notification_node;
 
 /* name */
 str dmq_node_status_str = str_init("status");
@@ -240,7 +240,7 @@ dmq_node_t *find_dmq_node_uri(dmq_node_list_t *list, str *uri)
 
 dmq_node_t *find_dmq_node_uri2(str *uri)
 {
-       return find_dmq_node_uri(node_list, uri);
+       return find_dmq_node_uri(dmq_node_list, uri);
 }
 
 /**
index fb0a9dc..45cac60 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
@@ -59,7 +59,7 @@ typedef struct dmq_node_list
 } dmq_node_list_t;
 
 extern str dmq_node_status_str;
-extern dmq_node_list_t *node_list;
+extern dmq_node_list_t *dmq_node_list;
 
 dmq_node_list_t *init_dmq_node_list();
 dmq_node_t *build_dmq_node(str *uri, int shm);
@@ -80,7 +80,7 @@ int set_dmq_node_params(dmq_node_t *node, param_t *params);
 str *dmq_get_status_str(int status);
 int build_node_str(dmq_node_t *node, char *buf, int buflen);
 
-extern dmq_node_t *self_node;
-extern dmq_node_t *notification_node;
+extern dmq_node_t *dmq_self_node;
+extern dmq_node_t *dmq_notification_node;
 
 #endif
index d47e421..d31386c 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
index 769b0e3..4cb44c0 100644 (file)
@@ -27,8 +27,8 @@
 #define MAXDMQURILEN 255
 #define MAXDMQHOSTS 30
 
-str notification_content_type = str_init("text/plain");
-dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
+str dmq_notification_content_type = str_init("text/plain");
+dmq_resp_cback_t dmq_notification_resp_callback = {&notification_resp_callback_f, 0};
 
 int *dmq_init_callback_done = 0;
 
@@ -41,7 +41,7 @@ int add_notification_peer()
        dmq_peer_t not_peer;
 
        memset(&not_peer, 0, sizeof(dmq_peer_t));
-       not_peer.callback = dmq_notification_callback;
+       not_peer.callback = dmq_notification_callback_f;
        not_peer.init_callback = NULL;
        not_peer.description.s = "notification_peer";
        not_peer.description.len = 17;
@@ -53,14 +53,14 @@ int add_notification_peer()
                goto error;
        }
        /* add itself to the node list */
-       self_node = add_dmq_node(node_list, &dmq_server_address);
-       if(!self_node) {
+       dmq_self_node = add_dmq_node(dmq_node_list, &dmq_server_address);
+       if(!dmq_self_node) {
                LM_ERR("error adding self node\n");
                goto error;
        }
        /* local node - only for self */
-       self_node->local = 1;
-       self_node->status = DMQ_NODE_ACTIVE;
+       dmq_self_node->local = 1;
+       dmq_self_node->status = DMQ_NODE_ACTIVE;
        return 0;
 error:
        return -1;
@@ -296,8 +296,8 @@ dmq_node_t *add_server_and_notify(str *paddr)
        * o process list
        **********/
 
-       if(!multi_notify) {
-               pfirst = add_dmq_node(node_list, paddr);
+       if(!dmq_multi_notify) {
+               pfirst = add_dmq_node(dmq_node_list, paddr);
        } else {
                /**********
                * o init data area
@@ -319,8 +319,8 @@ dmq_node_t *add_server_and_notify(str *paddr)
                for(index = 0; index < host_cnt; index++) {
                        pstr->s = puri_list[index];
                        pstr->len = strlen(puri_list[index]);
-                       if(!find_dmq_node_uri(node_list, pstr)) { // check for duplicates
-                               pnode = add_dmq_node(node_list, pstr);
+                       if(!find_dmq_node_uri(dmq_node_list, pstr)) { // check for duplicates
+                               pnode = add_dmq_node(dmq_node_list, pstr);
                                if(pnode && !pfirst) {
                                        pfirst = pnode;
                                }
@@ -436,11 +436,11 @@ int run_init_callbacks()
 {
        dmq_peer_t *crt;
 
-       if(peer_list == 0) {
+       if(dmq_peer_list == 0) {
                LM_WARN("peer list is null\n");
                return 0;
        }
-       crt = peer_list->peers;
+       crt = dmq_peer_list->peers;
        while(crt) {
                if(crt->init_callback) {
                        crt->init_callback();
@@ -454,7 +454,7 @@ int run_init_callbacks()
 /**
  * @brief dmq notification callback
  */
-int dmq_notification_callback(
+int dmq_notification_callback_f(
                struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node)
 {
        int nodes_recv;
@@ -474,14 +474,14 @@ int dmq_notification_callback(
                        maxforwards--;
                }
        }
-       nodes_recv = extract_node_list(node_list, msg);
+       nodes_recv = extract_node_list(dmq_node_list, msg);
        LM_DBG("received %d new or changed nodes\n", nodes_recv);
        response_body = build_notification_body();
        if(response_body == NULL) {
                LM_ERR("no response body\n");
                goto error;
        }
-       resp->content_type = notification_content_type;
+       resp->content_type = dmq_notification_content_type;
        resp->reason = dmq_200_rpl;
        resp->body = *response_body;
        resp->resp_code = 200;
@@ -490,8 +490,8 @@ int dmq_notification_callback(
        if(nodes_recv > 0 && maxforwards > 0) {
                /* maxforwards is set to 0 so that the message is will not be in a spiral */
                bcast_dmq_message(dmq_notification_peer, response_body, 0,
-                               &notification_callback, maxforwards,
-                               &notification_content_type);
+                               &dmq_notification_resp_callback, maxforwards,
+                               &dmq_notification_content_type);
        }
        pkg_free(response_body);
        if(dmq_init_callback_done && !*dmq_init_callback_done) {
@@ -533,8 +533,8 @@ str *build_notification_body()
                return NULL;
        }
        /* we add each server to the body - each on a different line */
-       lock_get(&node_list->lock);
-       cur_node = node_list->nodes;
+       lock_get(&dmq_node_list->lock);
+       cur_node = dmq_node_list->nodes;
        while(cur_node) {
                if (cur_node->local || cur_node->status == DMQ_NODE_ACTIVE) {
                        LM_DBG("body_len = %d - clen = %d\n", body->len, clen);
@@ -550,11 +550,11 @@ str *build_notification_body()
                }
                cur_node = cur_node->next;
        }
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        body->len = clen;
        return body;
 error:
-       lock_release(&node_list->lock);
+       lock_release(&dmq_node_list->lock);
        pkg_free(body->s);
        pkg_free(body);
        return NULL;
@@ -573,7 +573,8 @@ int request_nodelist(dmq_node_t *node, int forward)
                return -1;
        }
        ret = bcast_dmq_message1(dmq_notification_peer, body, NULL,
-                       &notification_callback, forward, &notification_content_type, 1);
+                       &dmq_notification_resp_callback, forward,
+                       &dmq_notification_content_type, 1);
        pkg_free(body->s);
        pkg_free(body);
        return ret;
@@ -591,8 +592,8 @@ int notification_resp_callback_f(
        LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
        if(code == 200) {
                /* be sure that the node that answered is in active state */
-               update_dmq_node_status(node_list, node, DMQ_NODE_ACTIVE);
-               nodes_recv = extract_node_list(node_list, msg);
+               update_dmq_node_status(dmq_node_list, node, DMQ_NODE_ACTIVE);
+               nodes_recv = extract_node_list(dmq_node_list, msg);
                LM_DBG("received %d new or changed nodes\n", nodes_recv);
                if(dmq_init_callback_done && !*dmq_init_callback_done) {
                        *dmq_init_callback_done = 1;
@@ -601,18 +602,18 @@ int notification_resp_callback_f(
        } else if(code == 408) {
                if(STR_EQ(node->orig_uri, dmq_notification_address)) {
                        LM_ERR("not deleting notification_peer\n");
-                       update_dmq_node_status(node_list, node, DMQ_NODE_PENDING);      
+                       update_dmq_node_status(dmq_node_list, node, DMQ_NODE_PENDING);  
                        return 0;
                }
                if (node->status == DMQ_NODE_DISABLED) {
                        /* deleting node - the server did not respond */
                        LM_ERR("deleting server %.*s because of failed request\n",
                                STR_FMT(&node->orig_uri));
-                       ret = del_dmq_node(node_list, node);
+                       ret = del_dmq_node(dmq_node_list, node);
                        LM_DBG("del_dmq_node returned %d\n", ret);
                } else {
                        /* put the node in disabled state and wait for the next ping before deleting it */
-                       update_dmq_node_status(node_list, node, DMQ_NODE_DISABLED);
+                       update_dmq_node_status(dmq_node_list, node, DMQ_NODE_DISABLED);
                }
        }
        return 0;
index 2f0ac42..e913445 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */
 
 #include "peer.h"
 #include "dmq_funcs.h"
 
-extern str notification_content_type;
+extern str dmq_notification_content_type;
 extern int *dmq_init_callback_done;
 
 int add_notification_peer();
-int dmq_notification_callback(
+int dmq_notification_callback_f(
                struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *dmq_node);
 int extract_node_list(dmq_node_list_t *update_list, struct sip_msg *msg);
 str *build_notification_body();
@@ -44,7 +44,7 @@ int build_node_str(dmq_node_t *node, char *buf, int buflen);
  * this is acomplished by a KDMQ request
  * KDMQ notification@server:port
  * node - the node to send to
- * forward - flag that tells if the node receiving the message is allowed to 
+ * forward - flag that tells if the node receiving the message is allowed to
  *           forward the request to its own list
  */
 int request_nodelist(dmq_node_t *node, int forward);
@@ -53,6 +53,6 @@ dmq_node_t *add_server_and_notify(str *server_address);
 /* helper functions */
 extern int notification_resp_callback_f(
                struct sip_msg *msg, int code, dmq_node_t *node, void *param);
-extern dmq_resp_cback_t notification_callback;
+extern dmq_resp_cback_t dmq_notification_resp_callback;
 
 #endif
index 7c2bf29..8311d7a 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  *
  */
  */
 dmq_peer_list_t *init_peer_list()
 {
-       dmq_peer_list_t *peer_list;
-       peer_list = shm_malloc(sizeof(dmq_peer_list_t));
-       if(peer_list == NULL) {
+       dmq_peer_list_t *dmq_peer_list;
+       dmq_peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+       if(dmq_peer_list == NULL) {
                LM_ERR("no more shm\n");
                return NULL;
        }
-       memset(peer_list, 0, sizeof(dmq_peer_list_t));
-       lock_init(&peer_list->lock);
-       return peer_list;
+       memset(dmq_peer_list, 0, sizeof(dmq_peer_list_t));
+       lock_init(&dmq_peer_list->lock);
+       return dmq_peer_list;
 }
 
 /**
@@ -90,7 +90,7 @@ dmq_peer_t *find_peer(str peer_id)
 {
        dmq_peer_t foo_peer;
        foo_peer.peer_id = peer_id;
-       return search_peer_list(peer_list, &foo_peer);
+       return search_peer_list(dmq_peer_list, &foo_peer);
 }
 
 /**
index d4feba7..40a0c82 100644 (file)
@@ -61,7 +61,7 @@ typedef struct dmq_peer_list
        int count;
 } dmq_peer_list_t;
 
-extern dmq_peer_list_t *peer_list;
+extern dmq_peer_list_t *dmq_peer_list;
 
 dmq_peer_list_t *init_peer_list();
 dmq_peer_t *search_peer_list(dmq_peer_list_t *peer_list, dmq_peer_t *peer);
index a2ccc63..43d76da 100644 (file)
@@ -79,14 +79,14 @@ void worker_loop(int id)
        int not_parsed;
        dmq_node_t *dmq_node = NULL;
 
-       worker = &workers[id];
+       worker = &dmq_workers[id];
        for(;;) {
-               if(worker_usleep <= 0) {
+               if(dmq_worker_usleep <= 0) {
                        LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
                        lock_get(&worker->lock);
                        LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
                } else {
-                       sleep_us(worker_usleep);
+                       sleep_us(dmq_worker_usleep);
                }
 
                /* remove from queue until empty */
@@ -105,7 +105,7 @@ void worker_loop(int id)
                                if(parse_from_header(current_job->msg) < 0) {
                                        LM_ERR("bad sip message or missing From hdr\n");
                                } else {
-                                       dmq_node = find_dmq_node_uri(node_list,
+                                       dmq_node = find_dmq_node_uri(dmq_node_list,
                                                        &((struct to_body *)current_job->msg->from->parsed)
                                                                         ->uri);
                                }
@@ -185,26 +185,26 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer)
        new_job.f = peer->callback;
        new_job.msg = cloned_msg;
        new_job.orig_peer = peer;
-       if(!num_workers) {
+       if(!dmq_num_workers) {
                LM_ERR("error in add_dmq_job: no workers spawned\n");
                goto error;
        }
-       if(!workers[0].queue) {
+       if(!dmq_workers[0].queue) {
                LM_ERR("workers not (yet) initialized\n");
                goto error;
        }
        /* initialize the worker with the first one */
-       worker = workers;
+       worker = dmq_workers;
        /* search for an available worker, or, if not possible,
         * for the least busy one */
-       for(i = 0; i < num_workers; i++) {
-               if(job_queue_size(workers[i].queue) == 0) {
-                       worker = &workers[i];
+       for(i = 0; i < dmq_num_workers; i++) {
+               if(job_queue_size(dmq_workers[i].queue) == 0) {
+                       worker = &dmq_workers[i];
                        found_available = 1;
                        break;
-               } else if(job_queue_size(workers[i].queue)
+               } else if(job_queue_size(dmq_workers[i].queue)
                                  < job_queue_size(worker->queue)) {
-                       worker = &workers[i];
+                       worker = &dmq_workers[i];
                }
        }
        if(!found_available) {
@@ -215,7 +215,7 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer)
        if(job_queue_push(worker->queue, &new_job) < 0) {
                goto error;
        }
-       if(worker_usleep <= 0) {
+       if(dmq_worker_usleep <= 0) {
                lock_release(&worker->lock);
        }
        return 0;
@@ -232,7 +232,7 @@ error:
 int init_worker(dmq_worker_t *worker)
 {
        memset(worker, 0, sizeof(*worker));
-       if(worker_usleep <= 0) {
+       if(dmq_worker_usleep <= 0) {
                lock_init(&worker->lock);
                // acquire the lock for the first time - so that dmq_worker_loop blocks
                lock_get(&worker->lock);
index cb4fce8..bca4c35 100644 (file)
@@ -15,8 +15,8 @@
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  *
- * You should have received a copy of the GNU General Public License 
- * along with this program; if not, write to the Free Software 
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
  */