ndb_redis: add disable server on failure feature
authorClaudiu Boriga <paul.boriga@1and1.ro>
Wed, 3 May 2017 13:19:04 +0000 (16:19 +0300)
committerClaudiu Boriga <paul.boriga@1and1.ro>
Wed, 3 May 2017 13:19:04 +0000 (16:19 +0300)
  - if a server fails multiple consecutive times
    it is disabled temporarily and commands to it
    will not do anything.

src/modules/ndb_redis/doc/ndb_redis_admin.xml
src/modules/ndb_redis/ndb_redis_mod.c
src/modules/ndb_redis/redis_client.c
src/modules/ndb_redis/redis_client.h

index f7cc944..e7dc381 100644 (file)
@@ -181,6 +181,64 @@ modparam("ndb_redis", "cluster", 1)
                        </programlisting>
                </example>
        </section>
+       <section id="ndb_redis.p.allowed_timeouts">
+               <title><varname>allowed_timeouts</varname> (integer)</title>
+               <para>
+                       If this is set to a non-negative value, it sets the number
+                       of consecutive REDIS commands that can fail before temporarily
+                       disabling the REDIS server. This is similar to the rtpengine_disable_tout
+                       parameter of the rtpengine module.
+               </para>
+               <para>
+                       When communicating with a REDIS server, if redis_cmd or redis_execute
+                       fails more than <quote>allowed_timeouts</quote> consecutive
+                       times, the server will be temporarily disabled for the number of seconds
+                       configured by the <quote>disable_time</quote> parameter.
+               </para>
+               <para>
+                       Disabling a server means that further redis_cmd and redis_execute commands
+                       will not do anything and return a negative value <quote>-2</quote>. 
+                       Messages are also logged when disabling and re-enabling a server.
+               </para>
+               <para>
+                       Consecutive failures are counted separately by each Kamailio process,
+                       so a server is disabled only for the process that counted them, not globally.
+               </para>
+               <para>
+               <emphasis>
+                       Default value is <quote>-1</quote> (disabled).
+               </emphasis>
+               </para>
+               <example>
+                       <title>Set <varname>allowed_timeouts</varname> parameter</title>
+                       <programlisting format="linespecific">
+...
+modparam("ndb_redis", "allowed_timeouts", 3)
+...
+                       </programlisting>
+               </example>
+       </section>
+       <section id="ndb_redis.p.disable_time">
+               <title><varname>disable_time</varname> (integer)</title>
+               <para>
+                       If allowed_timeouts is set to a non-negative value, this determines the
+                       number of seconds the REDIS server will be disabled.
+               </para>
+               <para>
+               <emphasis>
+                       Default value is <quote>0</quote>.
+               </emphasis>
+               </para>
+               <example>
+                       <title>Set <varname>disable_time</varname> parameter</title>
+                       <programlisting format="linespecific">
+...
+modparam("ndb_redis", "allowed_timeouts", 0)
+modparam("ndb_redis", "disable_time", 30)
+...
+                       </programlisting>
+               </example>
+       </section>
        </section>
 
        <section>
index 36b4efd..0906aab 100644 (file)
@@ -48,6 +48,8 @@ int init_without_redis = 0;
 int redis_connect_timeout_param = 1000;
 int redis_cmd_timeout_param = 1000;
 int redis_cluster_param = 0;
+int disable_time=0; /* "disable_time" modparam: seconds added to the current tick when a server gets disabled */
+int allowed_timeouts=-1; /* "allowed_timeouts" modparam: consecutive failures tolerated; negative turns the feature off */
 
 static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd,
                char* sres);
@@ -120,6 +122,8 @@ static param_export_t params[]={
        {"connect_timeout", INT_PARAM, &redis_connect_timeout_param},
        {"cmd_timeout", INT_PARAM, &redis_cmd_timeout_param},
        {"cluster", INT_PARAM, &redis_cluster_param},
+       {"disable_time", INT_PARAM, &disable_time},
+       {"allowed_timeouts", INT_PARAM, &allowed_timeouts},
        {0, 0, 0}
 };
 
index 4ebdeb1..728a93d 100644 (file)
@@ -49,6 +49,8 @@ extern int init_without_redis;
 extern int redis_connect_timeout_param;
 extern int redis_cmd_timeout_param;
 extern int redis_cluster_param;
+extern int disable_time;
+extern int allowed_timeouts;
 
 /* backwards compatibility with hiredis < 0.12 */
 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
@@ -547,7 +549,15 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
 {
        redisc_reply_t *rpl;
        int i;
+
        LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
+
+       /* if server is disabled do nothing unless the disable time has passed */
+       if (redis_check_server(rsrv))
+       {
+               goto srv_disabled;
+       }
+
        if (rsrv->piped.pending_commands == 0)
        {
                LM_WARN("call for redis_cmd without any pipelined commands\n");
@@ -584,12 +594,14 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
                        redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
                        if (rpl->rplRedis == NULL)
                        {
+                               redis_count_err_and_disable(rsrv);
                                LM_ERR("Unable to read reply\n");
                                goto error_exec;
                        }
                }
                else
                {
+                       redis_count_err_and_disable(rsrv);
                        goto error_exec;
                }
        }
@@ -613,11 +625,16 @@ int redisc_exec_pipelined(redisc_server_t *rsrv)
                LM_DBG("reply is [%s]",rpl->rplRedis->str);
        }
        redisc_free_pipelined_cmds(rsrv);
+       rsrv->disable.consecutive_errors = 0;
        return 0;
 
 error_exec:
        redisc_free_pipelined_cmds(rsrv);
        return -1;
+
+srv_disabled:
+       redisc_free_pipelined_cmds(rsrv);
+       return -2;
 }
 
 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) {
@@ -711,6 +728,11 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
                LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute");
                redisc_exec_pipelined(rsrv);
        }
+       /* if server is disabled do nothing unless the disable time has passed */
+       if (redis_check_server(rsrv))
+       {
+               goto srv_disabled;
+       }
   
        rpl = redisc_get_reply(res);
        if(rpl==NULL)
@@ -738,7 +760,15 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
                if(redisc_reconnect_server(rsrv)==0)
                {
                        rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
-               } else {
+                       if (rpl->rplRedis ==NULL)
+                       {
+                               redis_count_err_and_disable(rsrv);
+                               goto error_exec;
+                       }
+               }
+               else
+               {
+                       redis_count_err_and_disable(rsrv);
                        LM_ERR("unable to reconnect to redis server: %.*s\n",
                                        srv->len, srv->s);
                        cmd->s[cmd->len] = c;
@@ -781,6 +811,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...)
                }
        }
        cmd->s[cmd->len] = c;
+       rsrv->disable.consecutive_errors = 0;
        va_end(ap);
        va_end(ap2);
        va_end(ap3);
@@ -797,6 +828,13 @@ error_exec:
        va_end(ap4);
        return -1;
 
+srv_disabled:
+       va_end(ap);
+       va_end(ap2);
+       va_end(ap3);
+       va_end(ap4);
+       return -2;
+
 }
 
 /**
@@ -983,3 +1021,40 @@ int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
        return REDIS_OK;
 }
 #endif
+
+/**
+ * Check whether a REDIS server can be used right now.
+ *
+ * When the server is flagged as disabled and the current tick value is
+ * already past disable.restore_tick, the server is re-enabled: the
+ * disabled flag and the consecutive error counter are both cleared and
+ * an info message is logged.
+ *
+ * @param rsrv server to check
+ * @return 1 if the server is still disabled (caller should skip it),
+ *         0 if the server can be used
+ */
+int redis_check_server(redisc_server_t *rsrv)
+{
+
+       if (rsrv->disable.disabled)
+       {
+               if (get_ticks() > rsrv->disable.restore_tick)
+               {
+                       /* log messages must carry their own line terminator */
+                       LM_INFO("REDIS server %.*s re-enabled\n",rsrv->sname->len,rsrv->sname->s);
+                       rsrv->disable.disabled = 0;
+                       rsrv->disable.consecutive_errors = 0;
+               }
+               else
+               {
+                       return 1;
+               }
+       }
+       return 0;
+}
+
+/**
+ * Record one failed command for a REDIS server and disable the server
+ * once too many failures happened in a row.
+ *
+ * Does nothing when allowed_timeouts is negative. Otherwise the
+ * consecutive error counter is incremented; when it exceeds
+ * allowed_timeouts the server is flagged as disabled and
+ * disable.restore_tick is set to the current tick plus disable_time.
+ *
+ * @param rsrv server on which the command failed
+ * @return 1 if this call disabled the server, 0 otherwise
+ */
+int redis_count_err_and_disable(redisc_server_t *rsrv)
+{
+       if (allowed_timeouts < 0)
+       {
+               return 0;
+       }
+
+       rsrv->disable.consecutive_errors++;
+       if (rsrv->disable.consecutive_errors > allowed_timeouts)
+       {
+               rsrv->disable.disabled=1;
+               rsrv->disable.restore_tick=get_ticks() + disable_time;
+               /* log messages must carry their own line terminator */
+               LM_WARN("REDIS server %.*s disabled for %d seconds\n",rsrv->sname->len,rsrv->sname->s,disable_time);
+               return 1;
+       }
+       return 0;
+}
index f0dcf94..de52d33 100644 (file)
@@ -53,6 +53,12 @@ typedef struct redisc_piped_cmds {
        int pending_commands;
 } redisc_piped_cmds_t;
 
+typedef struct redisc_srv_disable { /* failure tracking used to temporarily disable a server */
+       int disabled; /* set to 1 when the server is disabled, back to 0 when re-enabled */
+       int consecutive_errors; /* failed commands in a row; reset to 0 on success or re-enable */
+       time_t restore_tick; /* get_ticks() + disable_time at disable time; server is re-enabled once get_ticks() exceeds it */
+} redisc_srv_disable_t;
+
 typedef struct redisc_server {
        str *sname;
        unsigned int hname;
@@ -60,6 +66,7 @@ typedef struct redisc_server {
        redisContext *ctxRedis;
        struct redisc_server *next;
        redisc_piped_cmds_t piped;
+       redisc_srv_disable_t disable;
 } redisc_server_t;
 
 typedef struct redisc_pv {
@@ -86,4 +93,6 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
 redisc_reply_t *redisc_get_reply(str *name);
 int redisc_free_reply(str *name);
 int redisc_check_auth(redisc_server_t *rsrv, char *pass);
+int redis_check_server(redisc_server_t *rsrv);
+int redis_count_err_and_disable(redisc_server_t *rsrv);
 #endif