ndb_redis: add pipeline suppport for REDIS commands
authorClaudiu Boriga <paul.boriga@1and1.ro>
Mon, 24 Apr 2017 19:02:10 +0000 (21:02 +0200)
committerDaniel-Constantin Mierla <miconda@gmail.com>
Mon, 24 Apr 2017 19:02:10 +0000 (21:02 +0200)
src/modules/ndb_redis/doc/ndb_redis_admin.xml
src/modules/ndb_redis/ndb_redis_mod.c
src/modules/ndb_redis/redis_client.c
src/modules/ndb_redis/redis_client.h

index 50bd033..6b74f85 100644 (file)
@@ -265,6 +265,93 @@ if(redis_cmd("srvN", "HMGET foo_key field1 field3", "r")) {
 </programlisting>
                </example>
        </section>
+       <section id="ndb_redis.f.redis_pipe_cmd">
+               <title>
+               <function moreinfo="none">redis_pipe_cmd(srvname, command, ..., replyid)</function>
+               </title>
+               <para>
+                       Add a command to be sent to REDIS server identified by srvname. 
+                       All the commands will be stored in a buffer until a call to 
+                       redis_execute is made. When calling redis_execute the stored commands 
+                       are sent using the pipelining functionality of redis. The replies
+                       will be stored in local containers identified by the replyid of each 
+                       added command. All the parameters can be strings with pseudo-variables
+                       that are evaluated at runtime.
+               </para>
+               <para>
+                       This command is similar in syntax with redis_cmd, the only difference 
+                       is that it does not send the command but instead appends it to a buffer.
+               </para>
+               <para>
+                       See examples from redis_execute.
+               </para>
+               <para>
+                       Note: Pipelining feature is incompatible with the clustering feature. 
+                       If cluster parameter is set to 1, this function will log an error and do nothing.
+               </para>
+       </section>
+       <section id="ndb_redis.f.redis_execute">
+       <title>
+               <function moreinfo="none">redis_execute([srvname])</function>
+       </title>
+       <para>
+               Sends commands to REDIS server identified by srvname. Commands are added 
+               with redis_pipe_cmd function, and will be stored for an existing REDIS server.
+               When this function is called all the commands will be sent in a single message
+               to the REDIS server.
+
+       </para>
+       <para>
+               If this function is called without any parameters, it will iterate through all 
+               existing servers and send the existing pipelined commands for them.
+       </para>
+       <para>
+               When using redis_cmd together with redis_pipe_cmd it is recommended that a call to 
+               redis_execute is made before calling redis_cmd in case there are pipelined commands,
+               otherwise when calling redis_cmd, if pipelined messages exist, a call to redis_execute
+               is made automatically and a warning message is logged. 
+       </para>
+       <para>
+               Note: Pipelining feature is incompatible with the clustering feature. 
+               If cluster parameter is set to 1, this function will log an error and do nothing.
+       </para>
+       <example>
+               <title><function>redis_execute</function> usage</title>
+               <programlisting format="linespecific">
+...
+After several redis command calls:
+       redis_pipe_cmd("srvA", "SET foo bar", "r1");
+
+       redis_pipe_cmd("srvB", "SET ruri $ru", "r2");
+
+       redis_pipe_cmd("srvC", "GET foo", "r3");
+
+Send the data and retrieve the results:
+       redis_execute("srvA"); //send command to srvA and wait for reply. Store the reply in r1
+
+       redis_execute(); //send remaining commands (the set to srvB and get to srvC), wait for replies, and store the replies in r2 and r3
+
+Using both redis_cmd and redis_pipe_cmd:
+       redis_pipe_cmd("srvA", "SET foo bar", "r1");
+
+       redis_pipe_cmd("srvA", "SET ruri $ru", "r2");
+
+       redis_execute("srvA"); //send commands to srvA and wait for reply. Store replies in r1 and r2 
+
+       redis_cmd("srvA", "GET foo", "r3"); //send command, wait for reply and store it in r3
+
+
+       redis_pipe_cmd("srvA", "SET foo bar", "r1");
+
+       redis_pipe_cmd("srvA", "SET ruri $ru", "r2");
+
+       redis_cmd("srvA", "GET foo", "r3"); //first call redis execute (replies are stored in r1 and r2), log warning and execute redis_cmd
+
+       redis_execute("srvA"); //this does nothing as there are no more pipelined commands. The call is not necessary
+...
+               </programlisting>
+       </example>
+       </section>
        <section id="ndb_redis.f.redis_free">
        <title>
                <function moreinfo="none">redis_free(replyid)</function>
index 862bda0..d224782 100644 (file)
@@ -57,7 +57,17 @@ static int w_redis_cmd5(struct sip_msg* msg, char* ssrv, char* scmd,
                char *sargv1, char *sargv2, char* sres);
 static int w_redis_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
                char *sargv1, char *sargv2, char *sargv3, char* sres);
+static int w_redis_pipe_cmd3(struct sip_msg* msg, char* ssrv, char* scmd,
+               char* sres);
+static int w_redis_pipe_cmd4(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char* sres);
+static int w_redis_pipe_cmd5(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char *sargv2, char* sres);
+static int w_redis_pipe_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char *sargv2, char *sargv3, char* sres);
 static int fixup_redis_cmd6(void** param, int param_no);
+static int w_redis_execute_noargs(struct sip_msg* msg);
+static int w_redis_execute(struct sip_msg* msg, char* ssrv);
 
 static int w_redis_free_reply(struct sip_msg* msg, char* res);
 
@@ -86,6 +96,18 @@ static cmd_export_t cmds[]={
                0, ANY_ROUTE},
        {"redis_cmd", (cmd_function)w_redis_cmd6, 6, fixup_redis_cmd6,
                0, ANY_ROUTE},
+       {"redis_pipe_cmd", (cmd_function)w_redis_pipe_cmd3, 3, fixup_redis_cmd6,
+               0, ANY_ROUTE},
+       {"redis_pipe_cmd", (cmd_function)w_redis_pipe_cmd4, 4, fixup_redis_cmd6,
+               0, ANY_ROUTE},
+       {"redis_pipe_cmd", (cmd_function)w_redis_pipe_cmd5, 5, fixup_redis_cmd6,
+               0, ANY_ROUTE},
+       {"redis_pipe_cmd", (cmd_function)w_redis_pipe_cmd6, 6, fixup_redis_cmd6,
+               0, ANY_ROUTE},
+       {"redis_execute", (cmd_function)w_redis_execute, 1, fixup_redis_cmd6,
+               0, ANY_ROUTE},
+       {"redis_execute", (cmd_function)w_redis_execute_noargs, 0, 0,
+               0, ANY_ROUTE},
        {"redis_free", (cmd_function)w_redis_free_reply, 1, fixup_spve_null,
                0, ANY_ROUTE},
 
@@ -326,6 +348,239 @@ static int w_redis_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
 /**
  *
  */
+static int w_redis_pipe_cmd3(struct sip_msg* msg, char* ssrv, char* scmd, char* sres)
+{
+       str s[3];
+
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+       {
+               LM_ERR("no redis server name\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+       {
+               LM_ERR("no redis command\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sres, &s[2])!=0)
+       {
+               LM_ERR("no redis reply name\n");
+               return -1;
+       }
+
+       if(redisc_append_cmd(&s[0], &s[2], &s[1])<0)
+               return -1;
+       return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_pipe_cmd4(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char* sres)
+{
+       str s[3];
+       str arg1;
+       char c1;
+
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+       {
+               LM_ERR("no redis server name\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+       {
+               LM_ERR("no redis command\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv1, &arg1)!=0)
+       {
+               LM_ERR("no argument 1\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sres, &s[2])!=0)
+       {
+               LM_ERR("no redis reply name\n");
+               return -1;
+       }
+
+       c1 = arg1.s[arg1.len];
+       arg1.s[arg1.len] = '\0';
+       if(redisc_append_cmd(&s[0], &s[2], &s[1], arg1.s)<0) {
+               arg1.s[arg1.len] = c1;
+               return -1;
+       }
+       arg1.s[arg1.len] = c1;
+       return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_pipe_cmd5(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char *sargv2, char* sres)
+{
+       str s[3];
+       str arg1, arg2;
+       char c1, c2;
+
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+       {
+               LM_ERR("no redis server name\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+       {
+               LM_ERR("no redis command\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv1, &arg1)!=0)
+       {
+               LM_ERR("no argument 1\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv2, &arg2)!=0)
+       {
+               LM_ERR("no argument 2\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sres, &s[2])!=0)
+       {
+               LM_ERR("no redis reply name\n");
+               return -1;
+       }
+
+       c1 = arg1.s[arg1.len];
+       c2 = arg2.s[arg2.len];
+       arg1.s[arg1.len] = '\0';
+       arg2.s[arg2.len] = '\0';
+       if(redisc_append_cmd(&s[0], &s[2], &s[1], arg1.s, arg2.s)<0) {
+               arg1.s[arg1.len] = c1;
+               arg2.s[arg2.len] = c2;
+               return -1;
+       }
+       arg1.s[arg1.len] = c1;
+       arg2.s[arg2.len] = c2;
+       return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_pipe_cmd6(struct sip_msg* msg, char* ssrv, char* scmd,
+               char *sargv1, char *sargv2, char *sargv3, char* sres)
+{
+       str s[3];
+       str arg1, arg2, arg3;
+       char c1, c2, c3;
+
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s[0])!=0)
+       {
+               LM_ERR("no redis server name\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)scmd, &s[1])!=0)
+       {
+               LM_ERR("no redis command\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv1, &arg1)!=0)
+       {
+               LM_ERR("no argument 1\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv2, &arg2)!=0)
+       {
+               LM_ERR("no argument 2\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sargv3, &arg3)!=0)
+       {
+               LM_ERR("no argument 3\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)sres, &s[2])!=0)
+       {
+               LM_ERR("no redis reply name\n");
+               return -1;
+       }
+
+       c1 = arg1.s[arg1.len];
+       c2 = arg2.s[arg2.len];
+       c3 = arg3.s[arg3.len];
+       arg1.s[arg1.len] = '\0';
+       arg2.s[arg2.len] = '\0';
+       arg3.s[arg3.len] = '\0';
+       if(redisc_append_cmd(&s[0], &s[2], &s[1], arg1.s, arg2.s, arg3.s)<0) {
+               arg1.s[arg1.len] = c1;
+               arg2.s[arg2.len] = c2;
+               arg3.s[arg3.len] = c3;
+               return -1;
+       }
+       arg1.s[arg1.len] = c1;
+       arg2.s[arg2.len] = c2;
+       arg3.s[arg3.len] = c3;
+       return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_execute_noargs(struct sip_msg* msg)
+{
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       redisc_exec_pipelined_cmd_all();
+       return 1;
+}
+
+/**
+ *
+ */
+static int w_redis_execute(struct sip_msg* msg, char* ssrv)
+{
+       str s;
+
+       if (redis_cluster_param) 
+       {
+               LM_ERR("Pipelining is not supported if cluster parameter is enabled\n");
+               return -1;
+       }
+       if(fixup_get_svalue(msg, (gparam_t*)ssrv, &s)!=0)
+       {
+               LM_ERR("no redis server name\n");
+               return -1;
+       }
+       redisc_exec_pipelined_cmd(&s);
+       return 1;
+}
+
+/**
+ *
+ */
 static int fixup_redis_cmd6(void** param, int param_no)
 {
        return fixup_spve_null(param, 1);
index 26f86da..c4fee91 100644 (file)
@@ -388,6 +388,191 @@ err:
        return -1;
 }
 
+/**
+ *
+ */
+int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
+{
+       redisc_server_t *rsrv=NULL;
+       redisc_reply_t *rpl;
+       char c;
+       va_list ap;
+
+       va_start(ap, cmd);
+
+       if(srv==NULL || cmd==NULL || res==NULL)
+       {
+               LM_ERR("invalid parameters");
+               goto error_cmd;
+       }
+       if(srv->len==0 || res->len==0 || cmd->len==0)
+       {
+               LM_ERR("invalid parameters");
+               goto error_cmd;
+       }
+       rsrv = redisc_get_server(srv);
+       if(rsrv==NULL)
+       {
+               LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
+               goto error_cmd;
+       }
+       if(rsrv->ctxRedis==NULL)
+       {
+               LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
+               goto error_cmd;
+       }
+       if (rsrv->pendingReplies >= MAXIMUM_PIPELINED_COMMANDS)
+       {
+               LM_ERR("Too many pipelined commands, maximum is %d\n",MAXIMUM_PIPELINED_COMMANDS);
+               goto error_cmd;
+       }
+       rpl = redisc_get_reply(res);
+       if(rpl==NULL)
+       {
+               LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
+               goto error_cmd;
+       }
+
+       c = cmd->s[cmd->len];
+       cmd->s[cmd->len] = '\0';
+       if (redisvAppendCommand(rsrv->ctxRedis,cmd->s,ap) != REDIS_OK)
+       {
+               LM_ERR("Invalid redis command : %s\n",cmd->s);
+               goto error_cmd;
+       }
+       rsrv->pipelinedReplies[rsrv->pendingReplies]=rpl;
+       rsrv->pendingReplies++;
+
+       cmd->s[cmd->len] = c;
+       va_end(ap);
+       return 0;
+
+error_cmd:
+       va_end(ap);
+       return -1;
+
+}
+
+
+/**
+ *
+ */
+int redisc_exec_pipelined_cmd(str *srv)
+{
+       redisc_server_t *rsrv=NULL;
+
+       if (srv == NULL)
+       {
+               LM_ERR("invalid parameters");
+               return -1;
+       }
+       if (srv->len == 0)
+       {
+               LM_ERR("invalid parameters");
+               return -1;
+       }
+       rsrv = redisc_get_server(srv);
+       if (rsrv == NULL)
+       {
+               LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
+               return -1;
+       }
+       if (rsrv->ctxRedis == NULL)
+       {
+               LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
+               return -1;
+       }
+       return redisc_exec_pipelined(rsrv);
+}
+
+/**
+ *
+ */
+int redisc_exec_pipelined_cmd_all()
+{
+       redisc_server_t *rsrv=NULL;
+
+       rsrv=_redisc_srv_list;
+       while(rsrv!=NULL)
+       {
+               if ((rsrv->ctxRedis != NULL) && (rsrv->pendingReplies != 0))
+               {
+                       redisc_exec_pipelined(rsrv);
+               }
+               rsrv=rsrv->next;
+       }
+
+       return 0;
+}
+
+/**
+ *
+ */
+int redisc_exec_pipelined(redisc_server_t *rsrv)
+{
+       redisc_reply_t *rpl;
+       int i;
+       LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+       if (rsrv->pendingReplies == 0)
+       {
+               LM_ERR("call for redis_cmd without any pipelined commands\n");
+               return -1;
+       }
+
+       /* send the first command and wait for the replies */
+       rpl=rsrv->pipelinedReplies[0];
+
+       if(rpl->rplRedis!=NULL)
+       {
+               /* clean up previous redis reply */
+               freeReplyObject(rpl->rplRedis);
+               rpl->rplRedis = NULL;
+       }
+
+       redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
+       if (rpl->rplRedis == NULL)
+       {
+               /* null reply, reconnect and try again */
+               if (rsrv->ctxRedis->err)
+               {
+                       LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
+               }
+               if (redisc_reconnect_server(rsrv) == 0)
+               {
+                       redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
+                       if (rpl->rplRedis == NULL)
+                       {
+                               LM_ERR("Unable to read reply\n");
+                               goto error_exec;
+                       }
+               }
+               else
+               {
+                       LM_ERR("unable to reconnect to redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+                       goto error_exec;
+               }
+       }
+       LM_DBG("reply is [%s]",rpl->rplRedis->str);
+
+       /* replies are received just retrieve them */
+       for (i=1;i<rsrv->pendingReplies;i++)
+       {
+               rpl=rsrv->pipelinedReplies[i];
+               if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK)
+               {
+                       LM_ERR("Unable to read reply\n");
+                       goto error_exec;
+               }
+               LM_DBG("reply is [%s]",rpl->rplRedis->str);
+       }
+       rsrv->pendingReplies = 0;
+       return 0;
+
+error_exec:
+       rsrv->pendingReplies = 0;
+       return -1;
+}
+
 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
        redisc_server_t *rsrv_new;
        char buffername[100];
@@ -473,7 +658,13 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
                goto error_exec;
        }
        LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
-
+  
+       if (rsrv->pendingReplies != 0)
+       {
+               LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute");
+               redisc_exec_pipelined(rsrv);
+       }
+  
        rpl = redisc_get_reply(res);
        if(rpl==NULL)
        {
index ce334df..b99da28 100644 (file)
 #include "../../core/parser/parse_param.h"
 #include "../../core/mod_fix.h"
 
+#define MAXIMUM_PIPELINED_COMMANDS 1000
+
 int redisc_init(void);
 int redisc_destroy(void);
 int redisc_add_server(char *spec);
-int redisc_exec(str *srv, str *res, str *cmd, ...);
+
+typedef struct redisc_reply {
+       str rname;
+       unsigned int hname;
+       redisReply *rplRedis;
+       struct redisc_reply *next;
+} redisc_reply_t;
 
 typedef struct redisc_server {
        str *sname;
@@ -45,15 +53,10 @@ typedef struct redisc_server {
        param_t *attrs;
        redisContext *ctxRedis;
        struct redisc_server *next;
+       redisc_reply_t *pipelinedReplies[MAXIMUM_PIPELINED_COMMANDS];
+       int pendingReplies;
 } redisc_server_t;
 
-typedef struct redisc_reply {
-       str rname;
-       unsigned int hname;
-       redisReply *rplRedis;
-       struct redisc_reply *next;
-} redisc_reply_t;
-
 typedef struct redisc_pv {
        str rname;
        redisc_reply_t *reply;
@@ -68,6 +71,10 @@ int redisc_reconnect_server(redisc_server_t *rsrv);
 
 /* Command related functions */
 int redisc_exec(str *srv, str *res, str *cmd, ...);
+int redisc_append_cmd(str *srv, str *res, str *cmd, ...);
+int redisc_exec_pipelined_cmd(str *srv);
+int redisc_exec_pipelined_cmd_all();
+int redisc_exec_pipelined(redisc_server_t *rsrv);
 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
                const size_t *argvlen);
 redisc_reply_t *redisc_get_reply(str *name);