Fix ndb_redis Cluster implementation
authorCarsten Bock <carsten@ng-voice.com>
Mon, 24 Apr 2017 12:23:47 +0000 (14:23 +0200)
committerCarsten Bock <carsten@ng-voice.com>
Mon, 24 Apr 2017 12:23:47 +0000 (14:23 +0200)
src/modules/ndb_redis/doc/ndb_redis_admin.xml
src/modules/ndb_redis/ndb_redis_mod.c
src/modules/ndb_redis/redis_client.c
src/modules/ndb_redis/redis_client.h

index cad2893..50bd033 100644 (file)
@@ -157,6 +157,12 @@ modparam("ndb_redis", "cmd_timeout", 500)
                <para>
                        If set to 1, the module will connect to servers indicated in the "MOVED" reply.
                </para>
                <para>
                        If set to 1, the module will connect to servers indicated in the "MOVED" reply.
                </para>
+               <para>
+                       The module needs to know all existing REDIS-Nodes at startup.
+                       The nodes are searched by the name "ip:port", e.g. if REDIS
+                       replies with "MOVED 127.0.0.1:4711", ndb_redis needs to know
+                       the databases "127.0.0.1:4711".
+               </para>
                <para>
                <emphasis>
                        Default value is <quote>0</quote> (disabled).
                <para>
                <emphasis>
                        Default value is <quote>0</quote> (disabled).
@@ -166,6 +172,10 @@ modparam("ndb_redis", "cmd_timeout", 500)
                        <title>Set <varname>cluster</varname> parameter</title>
                        <programlisting format="linespecific">
 ...
                        <title>Set <varname>cluster</varname> parameter</title>
                        <programlisting format="linespecific">
 ...
+modparam("ndb_redis", "server", "name=127.0.0.1:26001;addr=127.0.0.1;port=26001")
+modparam("ndb_redis", "server", "name=127.0.0.1:26004;addr=127.0.0.1;port=26004")
+modparam("ndb_redis", "server", "name=127.0.0.1:26008;addr=127.0.0.1;port=26008")
+...
 modparam("ndb_redis", "cluster", 1)
 ...
                        </programlisting>
 modparam("ndb_redis", "cluster", 1)
 ...
                        </programlisting>
index 6bb74d9..862bda0 100644 (file)
@@ -61,7 +61,6 @@ static int fixup_redis_cmd6(void** param, int param_no);
 
 static int w_redis_free_reply(struct sip_msg* msg, char* res);
 
 
 static int w_redis_free_reply(struct sip_msg* msg, char* res);
 
-static int mod_init(void);
 static void mod_destroy(void);
 static int  child_init(int rank);
 
 static void mod_destroy(void);
 static int  child_init(int rank);
 
@@ -114,7 +113,7 @@ struct module_exports exports = {
        0,              /* exported MI functions */
        mod_pvs,        /* exported pseudo-variables */
        0,              /* extra processes */
        0,              /* exported MI functions */
        mod_pvs,        /* exported pseudo-variables */
        0,              /* extra processes */
-       mod_init,       /* module initialization function */
+       0,              /* module initialization function */
        0,              /* response function */
        mod_destroy,    /* destroy function */
        child_init      /* per child init function */
        0,              /* response function */
        mod_destroy,    /* destroy function */
        child_init      /* per child init function */
@@ -136,18 +135,6 @@ static int child_init(int rank)
        return 0;
 }
 
        return 0;
 }
 
-/**
- *
- */
-static int mod_init(void)
-{
-       if (init_list() < 0) {
-               LM_ERR("failed to initialize redis connections\n");
-               return -1;
-       }
-       return 0;
-}
-
 /**
  *
  */
 /**
  *
  */
index 407967a..26f86da 100644 (file)
@@ -41,7 +41,7 @@
 
 #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;})
 
 
 #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;})
 
-static redisc_server_t ** _redisc_srv_list=NULL;
+static redisc_server_t * _redisc_srv_list=NULL;
 
 static redisc_reply_t *_redisc_rpl_list=NULL;
 
 
 static redisc_reply_t *_redisc_rpl_list=NULL;
 
@@ -55,8 +55,9 @@ extern int redis_cluster_param;
  */
 int redisc_init(void)
 {
  */
 int redisc_init(void)
 {
-       char *addr, *pass, *unix_sock_path = NULL;
-       unsigned int port, db;
+       char addr[256], pass[256], unix_sock_path[256];
+
+       unsigned int port, db, sock = 0, haspass = 0;
        redisc_server_t *rsrv=NULL;
        param_t *pit = NULL;
        struct timeval tv_conn;
        redisc_server_t *rsrv=NULL;
        param_t *pit = NULL;
        struct timeval tv_conn;
@@ -68,27 +69,30 @@ int redisc_init(void)
        tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
        tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
 
        tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
        tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
 
-       if(*_redisc_srv_list==NULL)
+       if(_redisc_srv_list==NULL)
        {
                LM_ERR("no redis servers defined\n");
                return -1;
        }
 
        {
                LM_ERR("no redis servers defined\n");
                return -1;
        }
 
-       for(rsrv=*_redisc_srv_list; rsrv; rsrv=rsrv->next)
+       for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
        {
        {
-               addr = "127.0.0.1";
                port = 6379;
                db = 0;
                port = 6379;
                db = 0;
-               pass = NULL;
+               haspass = 0;
+               sock = 0;
+
+               memset(addr, 0, sizeof(addr));
+               memset(pass, 0, sizeof(pass));
+               memset(unix_sock_path, 0, sizeof(unix_sock_path));
 
                for (pit = rsrv->attrs; pit; pit=pit->next)
                {
                        if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
 
                for (pit = rsrv->attrs; pit; pit=pit->next)
                {
                        if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
-                               unix_sock_path = pit->body.s;
-                               unix_sock_path[pit->body.len] = '\0';
+                               snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s);
+                               sock = 1;
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
-                               addr = pit->body.s;
-                               addr[pit->body.len] = '\0';
+                               snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s);
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
                                if(str2int(&pit->body, &port) < 0)
                                        port = 6379;
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
                                if(str2int(&pit->body, &port) < 0)
                                        port = 6379;
@@ -96,38 +100,52 @@ int redisc_init(void)
                                if(str2int(&pit->body, &db) < 0)
                                        db = 0;
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
                                if(str2int(&pit->body, &db) < 0)
                                        db = 0;
                        } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
-                               pass = pit->body.s;
-                               pass[pit->body.len] = '\0';
+                               snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s);
+                               haspass = 1;
                        }
                }
 
                        }
                }
 
-               if(unix_sock_path != NULL) {
+               if(sock != 0) {
                        LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
                        rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path,
                                        tv_conn);
                } else {
                        LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
                        rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path,
                                        tv_conn);
                } else {
+                       LM_DBG("Connecting to %s:%d\n", addr, port);
                        rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
                }
 
                        rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
                }
 
-               if(!rsrv->ctxRedis)
+               LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+
+               if(!rsrv->ctxRedis) {
+                       LM_ERR("Failed to create REDIS-Context.\n");
                        goto err;
                        goto err;
-               if (rsrv->ctxRedis->err)
+               }
+               if (rsrv->ctxRedis->err) {
+                       LM_ERR("Failed to create REDIS returned an error: %s\n", rsrv->ctxRedis->errstr);
                        goto err2;
                        goto err2;
-               if ((pass != NULL) && redisc_check_auth(rsrv, pass))
+               }
+               if ((haspass != 0) && redisc_check_auth(rsrv, pass)) {
+                       LM_ERR("Authentication failed.\n");
                        goto err2;
                        goto err2;
-               if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
+               }
+               if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) {
+                       LM_ERR("Failed to set timeout.\n");
                        goto err2;
                        goto err2;
-               if (redisCommandNR(rsrv->ctxRedis, "PING"))
+               }
+               if (redisCommandNR(rsrv->ctxRedis, "PING")) {
+                       LM_ERR("Failed to send PING (REDIS returned %s).\n", rsrv->ctxRedis->errstr);
                        goto err2;
                        goto err2;
-               if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db))
+               }
+               if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) {
+                       LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\", and not in cluster mode).\n", db, rsrv->ctxRedis->errstr);
                        goto err2;
                        goto err2;
-
+               }
        }
 
        return 0;
 
 err2:
        }
 
        return 0;
 
 err2:
-       if (unix_sock_path != NULL) {
+       if (sock != 0) {
                LM_ERR("error communicating with redis server [%.*s]"
                                " (unix:%s db:%d): %s\n",
                                rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
                LM_ERR("error communicating with redis server [%.*s]"
                                " (unix:%s db:%d): %s\n",
                                rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
@@ -190,38 +208,21 @@ int redisc_destroy(void)
 
        if(_redisc_srv_list==NULL)
                return -1;
 
        if(_redisc_srv_list==NULL)
                return -1;
-       if(*_redisc_srv_list==NULL)
-               return -1;
-       rsrv=*_redisc_srv_list;
+       rsrv=_redisc_srv_list;
        while(rsrv!=NULL)
        {
                rsrv1 = rsrv;
                rsrv=rsrv->next;
        while(rsrv!=NULL)
        {
                rsrv1 = rsrv;
                rsrv=rsrv->next;
-               if (rsrv1->settings != NULL)
-                       shm_free(rsrv1->settings);
                if (rsrv1->ctxRedis!=NULL)
                        redisFree(rsrv1->ctxRedis);
                free_params(rsrv1->attrs);
                if (rsrv1->ctxRedis!=NULL)
                        redisFree(rsrv1->ctxRedis);
                free_params(rsrv1->attrs);
-               shm_free(rsrv1);
+               pkg_free(rsrv1);
        }
        }
-       shm_free(*_redisc_srv_list);
-       *_redisc_srv_list = NULL;
        _redisc_srv_list = NULL;
 
        return 0;
 }
 
        _redisc_srv_list = NULL;
 
        return 0;
 }
 
-int init_list(void) {
-       if (_redisc_srv_list == NULL) {
-               _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*));
-               if(!_redisc_srv_list) {
-                       LM_ERR("Out of memory\n");
-                       return -1;
-               }
-       }
-       return 0;
-}
-
 /**
  *
  */
 /**
  *
  */
@@ -241,17 +242,10 @@ int redisc_add_server(char *spec)
                LM_ERR("failed parsing params value\n");
                goto error;
        }
                LM_ERR("failed parsing params value\n");
                goto error;
        }
-       if (_redisc_srv_list == NULL) {
-               _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*));
-               if(!_redisc_srv_list) {
-                       LM_ERR("Out of memory\n");
-                       return -1;
-               }
-       }
-       rsrv = (redisc_server_t*)shm_malloc(sizeof(redisc_server_t));
+       rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
        if(rsrv==NULL)
        {
        if(rsrv==NULL)
        {
-               LM_ERR("no more shm\n");
+               LM_ERR("no more pkg\n");
                goto error;
        }
        memset(rsrv, 0, sizeof(redisc_server_t));
                goto error;
        }
        memset(rsrv, 0, sizeof(redisc_server_t));
@@ -269,15 +263,15 @@ int redisc_add_server(char *spec)
                LM_ERR("no server name\n");
                goto error;
        }
                LM_ERR("no server name\n");
                goto error;
        }
-       rsrv->next = *_redisc_srv_list;
-       *_redisc_srv_list = rsrv;
+       rsrv->next = _redisc_srv_list;
+       _redisc_srv_list = rsrv;
 
        return 0;
 error:
        if(pit!=NULL)
                free_params(pit);
        if(rsrv!=NULL)
 
        return 0;
 error:
        if(pit!=NULL)
                free_params(pit);
        if(rsrv!=NULL)
-               shm_free(rsrv);
+               pkg_free(rsrv);
        return -1;
 }
 
        return -1;
 }
 
@@ -291,7 +285,7 @@ redisc_server_t *redisc_get_server(str *name)
 
        hname = get_hash1_raw(name->s, name->len);
        LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s);
 
        hname = get_hash1_raw(name->s, name->len);
        LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s);
-       rsrv=*_redisc_srv_list;
+       rsrv=_redisc_srv_list;
        while(rsrv!=NULL)
        {
                LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len, rsrv->sname->s);
        while(rsrv!=NULL)
        {
                LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len, rsrv->sname->s);
@@ -309,8 +303,8 @@ redisc_server_t *redisc_get_server(str *name)
  */
 int redisc_reconnect_server(redisc_server_t *rsrv)
 {
  */
 int redisc_reconnect_server(redisc_server_t *rsrv)
 {
-       char *addr, *pass, *unix_sock_path = NULL;
-       unsigned int port, db;
+       char addr[256], pass[256], unix_sock_path[256];
+       unsigned int port, db, sock = 0, haspass = 0;
        param_t *pit = NULL;
        struct timeval tv_conn;
        struct timeval tv_cmd;
        param_t *pit = NULL;
        struct timeval tv_conn;
        struct timeval tv_cmd;
@@ -321,18 +315,18 @@ int redisc_reconnect_server(redisc_server_t *rsrv)
        tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
        tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
 
        tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
        tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
 
-       addr = "127.0.0.1";
+       memset(addr, 0, sizeof(addr));
        port = 6379;
        db = 0;
        port = 6379;
        db = 0;
-       pass = NULL;
+       memset(pass, 0, sizeof(pass));
+       memset(unix_sock_path, 0, sizeof(unix_sock_path));
        for (pit = rsrv->attrs; pit; pit=pit->next)
        {
                if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
        for (pit = rsrv->attrs; pit; pit=pit->next)
        {
                if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
-                       unix_sock_path = pit->body.s;
-                       unix_sock_path[pit->body.len] = '\0';
+                       snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s);
+                       sock = 1;
                } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
                } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
-                       addr = pit->body.s;
-                       addr[pit->body.len] = '\0';
+                       snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s);
                } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
                        if(str2int(&pit->body, &port) < 0)
                                port = 6379;
                } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
                        if(str2int(&pit->body, &port) < 0)
                                port = 6379;
@@ -340,25 +334,28 @@ int redisc_reconnect_server(redisc_server_t *rsrv)
                        if(str2int(&pit->body, &db) < 0)
                                db = 0;
                } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
                        if(str2int(&pit->body, &db) < 0)
                                db = 0;
                } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
-                       pass = pit->body.s;
-                       pass[pit->body.len] = '\0';
+                       snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s);
+                       haspass = 1;
                }
        }
                }
        }
+
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
        if(rsrv->ctxRedis!=NULL) {
                redisFree(rsrv->ctxRedis);
                rsrv->ctxRedis = NULL;
        }
 
        if(rsrv->ctxRedis!=NULL) {
                redisFree(rsrv->ctxRedis);
                rsrv->ctxRedis = NULL;
        }
 
-       if(unix_sock_path != NULL) {
+       if(sock != 0) {
                rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
        } else {
                rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
        }
                rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
        } else {
                rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
        }
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
        if(!rsrv->ctxRedis)
                goto err;
        if (rsrv->ctxRedis->err)
                goto err2;
        if(!rsrv->ctxRedis)
                goto err;
        if (rsrv->ctxRedis->err)
                goto err2;
-       if ((pass != NULL) && redisc_check_auth(rsrv, pass))
+       if ((haspass) && redisc_check_auth(rsrv, pass))
                goto err2;
        if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
                goto err2;
                goto err2;
        if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
                goto err2;
@@ -393,27 +390,13 @@ err:
 
 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
        redisc_server_t *rsrv_new;
 
 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
        redisc_server_t *rsrv_new;
-       char *pass;
-       char buffer[100], buffername[100];
-       unsigned int port, db;
+       char buffername[100];
+       unsigned int port;
        str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
        str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
-       param_t *pit = NULL;
        if (redis_cluster_param) {
                LM_DBG("Redis replied: \"%.*s\"\n", reply->len, reply->str);
                if ((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
                        port = 6379;
        if (redis_cluster_param) {
                LM_DBG("Redis replied: \"%.*s\"\n", reply->len, reply->str);
                if ((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
                        port = 6379;
-                       db = 0;
-                       pass = NULL;
-                       // Copy DB and password from current server:
-                       for (pit = (*rsrv)->attrs; pit; pit=pit->next) {
-                               if (pit->name.len==2 && strncmp(pit->name.s, "db", 2) == 0) {
-                                       if (str2int(&pit->body, &db) < 0)
-                                               db = 0;
-                               } else if (pit->name.len==4 && strncmp(pit->name.s, "pass", 4) == 0) {
-                                       pass = pit->body.s;
-                                       pass[pit->body.len] = '\0';
-                               }
-                       }
                        if (strchr(reply->str, ':') > 0) {
                                tmpstr.s = strchr(reply->str, ':') + 1;
                                tmpstr.len = reply->len - (tmpstr.s - reply->str);
                        if (strchr(reply->str, ':') > 0) {
                                tmpstr.s = strchr(reply->str, ':') + 1;
                                tmpstr.len = reply->len - (tmpstr.s - reply->str);
@@ -433,7 +416,8 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
                                return 0;
                        }
 
                                return 0;
                        }
 
-                       name.len = snprintf(buffername, sizeof(buffername), "%.*s-%i-%i", addr.len, addr.s, port, db);
+                       memset(buffername, 0, sizeof(buffername));
+                       name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i", addr.len, addr.s, port);
                        name.s = buffername;
                        LM_DBG("Name of new connection: %.*s\n", name.len, name.s);
                        rsrv_new = redisc_get_server(&name);
                        name.s = buffername;
                        LM_DBG("Name of new connection: %.*s\n", name.len, name.s);
                        rsrv_new = redisc_get_server(&name);
@@ -442,30 +426,7 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
                                *rsrv = rsrv_new;
                                return 1;
                        } else {
                                *rsrv = rsrv_new;
                                return 1;
                        } else {
-                               LM_DBG("New Connection\n");
-                               if (pass) {
-                                       tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i;pass=%s", name.len, name.s, addr.len, addr.s, port, db, pass);
-                                       tmpstr.s = shm_malloc(tmpstr.len + 1);
-                                       memcpy(tmpstr.s, buffer, tmpstr.len);
-                               } else {
-                                       tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i", name.len, name.s, addr.len, addr.s, port, db);
-                                       tmpstr.s = shm_malloc(tmpstr.len + 1);
-                                       memcpy(tmpstr.s, buffer, tmpstr.len);
-                               }
-                               tmpstr.s[tmpstr.len] = '\0';
-                               LM_DBG("Connection setup: %.*s\n", tmpstr.len, tmpstr.s);
-                               if (redisc_add_server(tmpstr.s) == 0) {
-                                       rsrv_new = redisc_get_server(&name);
-                                       if (rsrv_new) {
-                                               rsrv_new->settings = tmpstr.s;
-                                               redisc_reconnect_server(rsrv_new);
-                                               LM_DBG("Connection successful\n");
-                                               *rsrv = rsrv_new;
-                                               return 1;
-                                       }
-                               } else {
-                                       LM_ERR("Failed to add Connection (%.*s)\n", tmpstr.len, tmpstr.s);
-                               }
+                               LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
                        }
                }
        }
                        }
                }
        }
@@ -480,7 +441,12 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
        redisc_server_t *rsrv=NULL;
        redisc_reply_t *rpl;
        char c;
        redisc_server_t *rsrv=NULL;
        redisc_reply_t *rpl;
        char c;
-       va_list ap;
+       va_list ap, ap2, ap3, ap4;
+
+       va_start(ap, cmd);
+       va_copy(ap2, ap);
+       va_copy(ap3, ap);
+       va_copy(ap4, ap);
 
        if(srv==NULL || cmd==NULL || res==NULL)
        {
 
        if(srv==NULL || cmd==NULL || res==NULL)
        {
@@ -498,28 +464,31 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
                LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
                goto error_exec;
        }
                LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
                goto error_exec;
        }
+
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+
        if(rsrv->ctxRedis==NULL)
        {
                LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
                goto error_exec;
        }
        if(rsrv->ctxRedis==NULL)
        {
                LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
                goto error_exec;
        }
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+
        rpl = redisc_get_reply(res);
        if(rpl==NULL)
        {
                LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
                goto error_exec;
        }
        rpl = redisc_get_reply(res);
        if(rpl==NULL)
        {
                LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
                goto error_exec;
        }
-query:
-       va_start(ap, cmd);
-
+       c = cmd->s[cmd->len];
+       cmd->s[cmd->len] = '\0';
        if(rpl->rplRedis!=NULL)
        {
                /* clean up previous redis reply */
                freeReplyObject(rpl->rplRedis);
                rpl->rplRedis = NULL;
        }
        if(rpl->rplRedis!=NULL)
        {
                /* clean up previous redis reply */
                freeReplyObject(rpl->rplRedis);
                rpl->rplRedis = NULL;
        }
-       c = cmd->s[cmd->len];
-       cmd->s[cmd->len] = '\0';
+
        rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
        if(rpl->rplRedis == NULL)
        {
        rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
        if(rpl->rplRedis == NULL)
        {
@@ -530,8 +499,7 @@ query:
                }
                if(redisc_reconnect_server(rsrv)==0)
                {
                }
                if(redisc_reconnect_server(rsrv)==0)
                {
-                       va_end(ap);
-                       rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap);
+                       rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
                } else {
                        LM_ERR("unable to reconnect to redis server: %.*s\n",
                                        srv->len, srv->s);
                } else {
                        LM_ERR("unable to reconnect to redis server: %.*s\n",
                                        srv->len, srv->s);
@@ -540,15 +508,55 @@ query:
                }
        }
        if (check_cluster_reply(rpl->rplRedis, &rsrv)) {
                }
        }
        if (check_cluster_reply(rpl->rplRedis, &rsrv)) {
-               va_end(ap);
-               goto query;     
+               LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+               if(rsrv->ctxRedis==NULL)
+               {
+                       LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
+                       goto error_exec;
+               }
+
+               LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+
+               if(rpl->rplRedis!=NULL)
+               {
+                       /* clean up previous redis reply */
+                       freeReplyObject(rpl->rplRedis);
+                       rpl->rplRedis = NULL;
+               }
+               rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 );
+               if(rpl->rplRedis == NULL)
+               {
+                       /* null reply, reconnect and try again */
+                       if(rsrv->ctxRedis->err)
+                       {
+                               LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
+                       }
+                       if(redisc_reconnect_server(rsrv)==0)
+                       {
+                               rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4);
+                       } else {
+                               LM_ERR("unable to reconnect to redis server: %.*s\n",
+                                               srv->len, srv->s);
+                               cmd->s[cmd->len] = c;
+                               goto error_exec;
+                       }
+               }
        }
        cmd->s[cmd->len] = c;
        va_end(ap);
        }
        cmd->s[cmd->len] = c;
        va_end(ap);
+       va_end(ap2);
+       va_end(ap3);
+       va_end(ap4);
+
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+
        return 0;
 
 error_exec:
        va_end(ap);
        return 0;
 
 error_exec:
        va_end(ap);
+       va_end(ap2);
+       va_end(ap3);
+       va_end(ap4);
        return -1;
 
 }
        return -1;
 
 }
@@ -568,13 +576,23 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
 {
        redisReply *res=NULL;
 
 {
        redisReply *res=NULL;
 
-       if(rsrv==NULL || rsrv->ctxRedis==NULL)
+       if(rsrv==NULL)
        {
                LM_ERR("no redis context found for server %.*s\n",
                                (rsrv)?rsrv->sname->len:0,
                                (rsrv)?rsrv->sname->s:"");
                return NULL;
        }
        {
                LM_ERR("no redis context found for server %.*s\n",
                                (rsrv)?rsrv->sname->len:0,
                                (rsrv)?rsrv->sname->s:"");
                return NULL;
        }
+
+       LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
+       if(rsrv->ctxRedis==NULL)
+       {
+               LM_ERR("no redis context found for server %.*s\n",
+                       (rsrv)?rsrv->sname->len:0,
+                       (rsrv)?rsrv->sname->s:"");
+               return NULL;
+       }
+
        if(argc<=0)
        {
                LM_ERR("invalid parameters\n");
        if(argc<=0)
        {
                LM_ERR("invalid parameters\n");
@@ -587,6 +605,13 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
        }
 again:
        res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
        }
 again:
        res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
+
+       /* null reply, reconnect and try again */
+       if(rsrv->ctxRedis->err)
+       {
+               LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
+       }
+
        if(res)
        {
                if (check_cluster_reply(res, &rsrv)) {
        if(res)
        {
                if (check_cluster_reply(res, &rsrv)) {
@@ -595,11 +620,6 @@ again:
                return res;
        }
 
                return res;
        }
 
-       /* null reply, reconnect and try again */
-       if(rsrv->ctxRedis->err)
-       {
-               LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
-       }
        if(redisc_reconnect_server(rsrv)==0)
        {
                res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
        if(redisc_reconnect_server(rsrv)==0)
        {
                res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
index e8b0b5b..ce334df 100644 (file)
@@ -34,7 +34,6 @@
 #include "../../core/parser/parse_param.h"
 #include "../../core/mod_fix.h"
 
 #include "../../core/parser/parse_param.h"
 #include "../../core/mod_fix.h"
 
-int init_list(void);
 int redisc_init(void);
 int redisc_destroy(void);
 int redisc_add_server(char *spec);
 int redisc_init(void);
 int redisc_destroy(void);
 int redisc_add_server(char *spec);
@@ -46,7 +45,6 @@ typedef struct redisc_server {
        param_t *attrs;
        redisContext *ctxRedis;
        struct redisc_server *next;
        param_t *attrs;
        redisContext *ctxRedis;
        struct redisc_server *next;
-       char * settings;
 } redisc_server_t;
 
 typedef struct redisc_reply {
 } redisc_server_t;
 
 typedef struct redisc_reply {