ndb_redis: fix connection problems with pipelining
authorClaudiu Boriga <paul.boriga@1and1.ro>
Thu, 27 Apr 2017 14:05:25 +0000 (17:05 +0300)
committerClaudiu Boriga <paul.boriga@1and1.ro>
Tue, 2 May 2017 14:55:01 +0000 (17:55 +0300)
  -fix problem when a connection with a REDIS server
   fails and the pipelined command line is lost,
   while a new connection will not be established

src/modules/ndb_redis/redis_client.c
src/modules/ndb_redis/redis_client.h

index df2b981..4998e65 100644 (file)
@@ -50,6 +50,15 @@ extern int redis_connect_timeout_param;
 extern int redis_cmd_timeout_param;
 extern int redis_cluster_param;
 
+/* backwards compatibility with hiredis < 0.12 */
+#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
+typedef char *sds;
+sds sdscatlen(sds s, const void *t, size_t len);
+int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len);
+#else
+#define redis_append_formatted_command redisAppendFormattedCommand
+#endif
+
 /**
  *
  */
@@ -421,7 +430,7 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
                LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
                goto error_cmd;
        }
-       if (rsrv->pendingReplies >= MAXIMUM_PIPELINED_COMMANDS)
+       if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS)
        {
                LM_ERR("Too many pipelined commands, maximum is %d\n",MAXIMUM_PIPELINED_COMMANDS);
                goto error_cmd;
@@ -435,13 +444,17 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
 
        c = cmd->s[cmd->len];
        cmd->s[cmd->len] = '\0';
-       if (redisvAppendCommand(rsrv->ctxRedis,cmd->s,ap) != REDIS_OK)
+       rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand(
+                       &rsrv->piped.commands[rsrv->piped.pending_commands].s,
+                       cmd->s,
+                       ap);
+       if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0)
        {
                LM_ERR("Invalid redis command : %s\n",cmd->s);
                goto error_cmd;
        }
-       rsrv->pipelinedReplies[rsrv->pendingReplies]=rpl;
-       rsrv->pendingReplies++;
+       rsrv->piped.replies[rsrv->piped.pending_commands]=rpl;
+       rsrv->piped.pending_commands++;
 
        cmd->s[cmd->len] = c;
        va_end(ap);
@@ -488,19 +501,66 @@ int redisc_exec_pipelined_cmd(str *srv)
 /**
  *
  */
+int redisc_create_pipelined_message(redisc_server_t *rsrv)
+{
+       int i;
+
+       if (rsrv->ctxRedis->err)
+       {
+               LM_DBG("Reconnecting server because of error %d: \"%s\"",rsrv->ctxRedis->err,rsrv->ctxRedis->errstr);
+               if (redisc_reconnect_server(rsrv))
+               {
+                       LM_ERR("unable to reconnect to REDIS server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+                       return -1;
+               }
+       }
+
+       for (i=0;i<rsrv->piped.pending_commands;i++)
+       {
+               if (redis_append_formatted_command(rsrv->ctxRedis,rsrv->piped.commands[i].s,rsrv->piped.commands[i].len) != REDIS_OK)
+               {
+                       LM_ERR("Error while appending command %d",i);
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+/**
+ *
+ */
+void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
+{
+       int i;
+       for (i=0;i<rsrv->piped.pending_commands;i++)
+       {
+               free(rsrv->piped.commands[i].s);
+               rsrv->piped.commands[i].len=0;
+       }
+       rsrv->piped.pending_commands=0;
+}
+
+/**
+ *
+ */
 int redisc_exec_pipelined(redisc_server_t *rsrv)
 {
        redisc_reply_t *rpl;
        int i;
        LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
-       if (rsrv->pendingReplies == 0)
+       if (rsrv->piped.pending_commands == 0)
        {
-               LM_ERR("call for redis_cmd without any pipelined commands\n");
+               LM_WARN("call for redis_cmd without any pipelined commands\n");
                return -1;
        }
+       if(rsrv->ctxRedis==NULL)
+       {
+               LM_ERR("no redis context for server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+               goto error_exec;
+       }
 
-       /* send the first command and wait for the replies */
-       rpl=rsrv->pipelinedReplies[0];
+       /* send the commands and retrieve the first reply */
+       rpl=rsrv->piped.replies[0];
 
        if(rpl->rplRedis!=NULL)
        {
@@ -509,7 +569,9 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
                rpl->rplRedis = NULL;
        }
 
+       redisc_create_pipelined_message(rsrv);
        redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
+
        if (rpl->rplRedis == NULL)
        {
                /* null reply, reconnect and try again */
@@ -517,7 +579,7 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
                {
                        LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
                }
-               if (redisc_reconnect_server(rsrv) == 0)
+               if (redisc_create_pipelined_message(rsrv) == 0)
                {
                        redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
                        if (rpl->rplRedis == NULL)
@@ -528,28 +590,27 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
                }
                else
                {
-                       LM_ERR("unable to reconnect to redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
                        goto error_exec;
                }
        }
        LM_DBG("reply is [%s]",rpl->rplRedis->str);
 
        /* replies are received just retrieve them */
-       for (i=1;i<rsrv->pendingReplies;i++)
+       for (i=1;i<rsrv->piped.pending_commands;i++)
        {
-               rpl=rsrv->pipelinedReplies[i];
+               rpl=rsrv->piped.replies[i];
                if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK)
                {
                        LM_ERR("Unable to read reply\n");
-                       goto error_exec;
+                       continue;
                }
                LM_DBG("reply is [%s]",rpl->rplRedis->str);
        }
-       rsrv->pendingReplies = 0;
+       redisc_free_pipelined_cmds(rsrv);
        return 0;
 
 error_exec:
-       rsrv->pendingReplies = 0;
+       redisc_free_pipelined_cmds(rsrv);
        return -1;
 }
 
@@ -639,7 +700,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
        }
        LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
   
-       if (rsrv->pendingReplies != 0)
+       if (rsrv->piped.pending_commands != 0)
        {
                LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute");
                redisc_exec_pipelined(rsrv);
@@ -899,3 +960,20 @@ int redisc_check_auth(redisc_server_t *rsrv, char *pass)
        freeReplyObject(reply);
        return retval;
 }
+
+/* backwards compatibility with hiredis < 0.12 */
+#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
+int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
+{
+       sds newbuf;
+
+       newbuf = sdscatlen(c->obuf,cmd,len);
+       if (newbuf == NULL) {
+               c->err = REDIS_ERR_OOM;
+               strcpy(c->errstr,"Out of memory");
+               return REDIS_ERR;
+       }
+       c->obuf = newbuf;
+       return REDIS_OK;
+}
+#endif
index 958925c..f0dcf94 100644 (file)
@@ -47,14 +47,19 @@ typedef struct redisc_reply {
        struct redisc_reply *next;
 } redisc_reply_t;
 
+typedef struct redisc_piped_cmds {
+       str commands[MAXIMUM_PIPELINED_COMMANDS];
+       redisc_reply_t *replies[MAXIMUM_PIPELINED_COMMANDS];
+       int pending_commands;
+} redisc_piped_cmds_t;
+
 typedef struct redisc_server {
        str *sname;
        unsigned int hname;
        param_t *attrs;
        redisContext *ctxRedis;
        struct redisc_server *next;
-       redisc_reply_t *pipelinedReplies[MAXIMUM_PIPELINED_COMMANDS];
-       int pendingReplies;
+       redisc_piped_cmds_t piped;
 } redisc_server_t;
 
 typedef struct redisc_pv {
@@ -74,6 +79,8 @@ int redisc_exec(str *srv, str *res, str *cmd, ...);
 int redisc_append_cmd(str *srv, str *res, str *cmd, ...);
 int redisc_exec_pipelined_cmd(str *srv);
 int redisc_exec_pipelined(redisc_server_t *rsrv);
+int redisc_create_pipelined_message(redisc_server_t *rsrv);
+void redisc_free_pipelined_cmds(redisc_server_t *rsrv);
 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
                const size_t *argvlen);
 redisc_reply_t *redisc_get_reply(str *name);