support vhost in connection
authorlazedo <luis.azedo@factorlusitano.com>
Thu, 11 Sep 2014 13:47:33 +0000 (14:47 +0100)
committerLuis Azedo <luis@2600hz.com>
Thu, 11 Sep 2014 13:48:57 +0000 (14:48 +0100)
modules/kazoo/kz_amqp.c
modules/kazoo/kz_amqp.h

index 1e3c0d2..3931d27 100644 (file)
@@ -127,6 +127,17 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
        shm_free(bind);
 }
 
+void kz_amqp_free_connection(kz_amqp_conn_ptr conn)
+{
+       if(!conn)
+               return;
+
+       if(conn->url)
+               shm_free(conn->url);
+       shm_free(conn);
+}
+
+
 void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
 {
        if(cmd == NULL)
@@ -269,9 +280,9 @@ void kz_amqp_destroy() {
        if(kz_pool != NULL) {
                kz_amqp_conn_ptr conn = kz_pool->head;
                while(conn != NULL) {
-                       kz_amqp_conn_ptr free = conn;
+                       kz_amqp_conn_ptr tofree = conn;
                        conn = conn->next;
-                       shm_free(free);
+                       kz_amqp_free_connection(tofree);
                }
                shm_free(kz_pool);
        }
@@ -279,13 +290,45 @@ void kz_amqp_destroy() {
 
 }
 
+#define KZ_URL_MAX_SIZE 50
+static char* KZ_URL_ROOT = "/";
+
 int kz_amqp_add_connection(modparam_t type, void* val)
 {
        kz_amqp_init_connection_pool(); // find a better way
 
+       char* url = (char*) val;
+       int len = strlen(url);
+       if(len > KZ_URL_MAX_SIZE) {
+               LM_ERR("connection url exceeds max size %d\n", KZ_URL_MAX_SIZE);
+               return -1;
+       }
+
        kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
        memset(newConn, 0, sizeof(kz_amqp_conn));
 
+       newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) );
+       memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char));
+       // maintain compatibility
+       if (!strncmp((char*)val, "kazoo://", 8)) {
+               sprintf(newConn->url, "amqp://%s", (char*)(url+(8*sizeof(char))) );
+       } else {
+               strcpy(newConn->url, url);
+               newConn->url[len] = '\0';
+       }
+
+    if(amqp_parse_url(newConn->url, &newConn->info) == AMQP_STATUS_BAD_URL) {
+        LM_ERR("ERROR PARSING URL \"%s\"\n", newConn->url);
+       goto error;
+    }
+
+
+    if(newConn->info.vhost == NULL) {
+       newConn->info.vhost = KZ_URL_ROOT;
+    } else if(newConn->info.vhost[0] == '/' && strlen(newConn->info.vhost) == 1) { // bug in amqp_parse_url ?
+       newConn->info.vhost++;
+    }
+
        if(kz_pool->head == NULL)
                kz_pool->head = newConn;
 
@@ -294,9 +337,12 @@ int kz_amqp_add_connection(modparam_t type, void* val)
 
        kz_pool->tail = newConn;
 
-    amqp_parse_url((char*)val, &newConn->info);
-
        return 0;
+
+error:
+       kz_amqp_free_connection(newConn);
+       return -1;
+
 }
 
 void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
@@ -313,9 +359,6 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
                rmq->conn = NULL;
                rmq->socket = NULL;
                rmq->channel_count = 0;
-
-//             lock_release(&kz_pool->lock);
-
     }
 
 }
@@ -348,7 +391,7 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
     }
 
     if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
-                                          "/", //rmq->info.vhost,
+                                          rmq->info.vhost,
                                           0,
                                           131072,
                                           0,
index ce4aa5d..10b9095 100644 (file)
@@ -30,11 +30,11 @@ extern int dbk_consumer_processes;
 
 typedef struct kz_amqp_conn_t {
        kz_amqp_connection_info info;
+       char* url;
        amqp_connection_state_t conn;
        amqp_socket_t *socket;
        amqp_channel_t channel_count;
        amqp_channel_t channel_counter;
-//    gen_lock_t lock;
     struct kz_amqp_conn_t* next;
 } kz_amqp_conn, *kz_amqp_conn_ptr;
 
@@ -42,7 +42,6 @@ typedef struct {
        kz_amqp_conn_ptr current;
        kz_amqp_conn_ptr head;
        kz_amqp_conn_ptr tail;
-//    gen_lock_t lock;
 } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;