2 * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
4 * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com)
5 * - for: redis array reply support
7 * Copyright (C) 2017 Carsten Bock (ng-voice GmbH)
8 * - for: Cluster support
10 * This file is part of Kamailio, a free SIP server.
12 * Kamailio is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version
17 * Kamailio is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
35 #include "../../core/mem/mem.h"
36 #include "../../core/dprint.h"
37 #include "../../core/hashes.h"
38 #include "../../core/ut.h"
40 #include "redis_client.h"
42 #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); \
43 if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;})
45 static redisc_server_t * _redisc_srv_list=NULL;
47 static redisc_reply_t *_redisc_rpl_list=NULL;
49 extern int init_without_redis;
50 extern int redis_connect_timeout_param;
51 extern int redis_cmd_timeout_param;
52 extern int redis_cluster_param;
53 extern int redis_disable_time_param;
54 extern int redis_allowed_timeouts_param;
55 extern int redis_flush_on_reconnect_param;
56 extern int redis_allow_dynamic_nodes_param;
58 /* backwards compatibility with hiredis < 0.12 */
59 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
61 sds sdscatlen(sds s, const void *t, size_t len);
62 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len);
64 #define redis_append_formatted_command redisAppendFormattedCommand
72 char addr[256], pass[256], unix_sock_path[256], sentinel_group[256];
74 unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1;
76 redisc_server_t *rsrv=NULL;
78 struct timeval tv_conn;
79 struct timeval tv_cmd;
81 tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
82 tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
84 tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
85 tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
87 if(_redisc_srv_list==NULL)
89 LM_ERR("no redis servers defined\n");
93 for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
95 char sentinels[MAXIMUM_SENTINELS][256];
96 uint8_t sentinels_count = 0;
103 memset(addr, 0, sizeof(addr));
104 memset(pass, 0, sizeof(pass));
105 memset(unix_sock_path, 0, sizeof(unix_sock_path));
107 for (pit = rsrv->attrs; pit; pit=pit->next)
109 if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
110 snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
111 pit->body.len, pit->body.s);
113 } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
114 snprintf(addr, sizeof(addr)-1, "%.*s",
115 pit->body.len, pit->body.s);
116 } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
117 if(str2int(&pit->body, &port) < 0)
119 } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
120 if(str2int(&pit->body, &db) < 0)
122 } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
123 snprintf(pass, sizeof(pass)-1, "%.*s",
124 pit->body.len, pit->body.s);
126 } else if(pit->name.len==14 && strncmp(pit->name.s,
127 "sentinel_group", 14)==0) {
128 snprintf(sentinel_group, sizeof(sentinel_group)-1, "%.*s",
129 pit->body.len, pit->body.s);
130 } else if(pit->name.len==15 && strncmp(pit->name.s,
131 "sentinel_master", 15)==0) {
132 if(str2int(&pit->body, &sentinel_master) < 0)
134 } else if(pit->name.len==8 && strncmp(pit->name.s,
136 if( sentinels_count < MAXIMUM_SENTINELS ){
137 snprintf(sentinels[sentinels_count],
138 sizeof(sentinels[sentinels_count])-1, "%.*s",
139 pit->body.len, pit->body.s);
143 LM_ERR("too many sentinels, maximum %d supported.\n",
150 // if sentinels are provided, we need to connect to them and retrieve the redis server
152 if(sentinels_count > 0) {
153 for(i= 0; i< sentinels_count; i++) {
154 char *sentinelAddr = sentinels[i];
157 redisReply *res, *res2;
160 if( (pos = strchr(sentinelAddr, ':')) != NULL ) {
165 redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn);
167 if(sentinel_master != 0) {
168 res = redisCommand(redis,
169 "SENTINEL get-master-addr-by-name %s",
171 if( res && (res->type == REDIS_REPLY_ARRAY)
172 && (res->elements == 2) ) {
173 strncpy(addr, res->element[0]->str,
174 res->element[0]->len + 1);
175 port = atoi(res->element[1]->str);
176 LM_DBG("sentinel replied: %s:%d\n", addr, port);
180 res = redisCommand(redis, "SENTINEL slaves %s",
182 if( res && (res->type == REDIS_REPLY_ARRAY) ) {
183 for(row = 0; row< res->elements; row++){
184 res2 = res->element[row];
185 for(i= 0; i< res2->elements; i+= 2) {
186 if( strncmp(res2->element[i]->str,
188 strncpy(addr, res2->element[i+1]->str,
189 res2->element[i+1]->len);
190 addr[res2->element[i+1]->len] = '\0';
192 else if( strncmp(res2->element[i]->str,
194 port = atoi(res2->element[i+1]->str);
199 LM_DBG("slave for %s: %s:%d\n", sentinel_group,
208 LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
209 rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path,
212 LM_DBG("Connecting to %s:%d\n", addr, port);
213 rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
216 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
218 if(!rsrv->ctxRedis) {
219 LM_ERR("Failed to create REDIS-Context.\n");
222 if (rsrv->ctxRedis->err) {
223 LM_ERR("Failed to create REDIS returned an error: %s\n",
224 rsrv->ctxRedis->errstr);
227 if ((haspass != 0) && redisc_check_auth(rsrv, pass)) {
228 LM_ERR("Authentication failed.\n");
231 if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) {
232 LM_ERR("Failed to set timeout.\n");
235 if (redisCommandNR(rsrv->ctxRedis, "PING")) {
236 LM_ERR("Failed to send PING (REDIS returned %s).\n",
237 rsrv->ctxRedis->errstr);
240 if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
242 LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\","
243 " and not in cluster mode).\n", db, rsrv->ctxRedis->errstr);
252 LM_ERR("error communicating with redis server [%.*s]"
253 " (unix:%s db:%d): %s\n",
254 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
255 rsrv->ctxRedis->errstr);
257 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
258 rsrv->sname->len, rsrv->sname->s, addr, port, db,
259 rsrv->ctxRedis->errstr);
261 if (init_without_redis==1)
263 LM_WARN("failed to initialize redis connections, but initializing"
264 " module anyway.\n");
271 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
272 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
274 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
275 rsrv->sname->len, rsrv->sname->s, addr, port, db);
277 if (init_without_redis==1)
279 LM_WARN("failed to initialize redis connections, but initializing"
280 " module anyway.\n");
290 int redisc_destroy(void)
292 redisc_reply_t *rpl, *next_rpl;
294 redisc_server_t *rsrv=NULL;
295 redisc_server_t *rsrv1=NULL;
297 rpl = _redisc_rpl_list;
300 next_rpl = rpl->next;
302 freeReplyObject(rpl->rplRedis);
304 if(rpl->rname.s != NULL)
305 pkg_free(rpl->rname.s);
310 _redisc_rpl_list = NULL;
312 if(_redisc_srv_list==NULL)
314 rsrv=_redisc_srv_list;
319 if (rsrv1->ctxRedis!=NULL)
320 redisFree(rsrv1->ctxRedis);
321 free_params(rsrv1->attrs);
324 _redisc_srv_list = NULL;
332 int redisc_add_server(char *spec)
335 param_hooks_t phooks;
336 redisc_server_t *rsrv=NULL;
340 s.len = strlen(spec);
341 if(s.s[s.len-1]==';')
343 if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0)
345 LM_ERR("failed parsing params value\n");
348 rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
351 LM_ERR("no more pkg\n");
354 memset(rsrv, 0, sizeof(redisc_server_t));
357 for (pit = rsrv->attrs; pit; pit=pit->next)
359 if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
360 rsrv->sname = &pit->body;
361 rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
365 if(rsrv->sname==NULL)
367 LM_ERR("no server name\n");
370 rsrv->next = _redisc_srv_list;
371 _redisc_srv_list = rsrv;
385 redisc_server_t *redisc_get_server(str *name)
387 redisc_server_t *rsrv=NULL;
390 hname = get_hash1_raw(name->s, name->len);
391 LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s);
392 rsrv=_redisc_srv_list;
395 LM_DBG("Entry %u (%.*s)\n", rsrv->hname,
396 rsrv->sname->len, rsrv->sname->s);
397 if(rsrv->hname==hname && rsrv->sname->len==name->len
398 && strncmp(rsrv->sname->s, name->s, name->len)==0)
402 LM_DBG("No entry found.\n");
409 int redisc_reconnect_server(redisc_server_t *rsrv)
411 char addr[256], pass[256], unix_sock_path[256];
412 unsigned int port, db, sock = 0, haspass = 0;
414 struct timeval tv_conn;
415 struct timeval tv_cmd;
417 tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
418 tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
420 tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
421 tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
423 memset(addr, 0, sizeof(addr));
426 memset(pass, 0, sizeof(pass));
427 memset(unix_sock_path, 0, sizeof(unix_sock_path));
428 for (pit = rsrv->attrs; pit; pit=pit->next)
430 if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
431 snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
432 pit->body.len, pit->body.s);
434 } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
435 snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s);
436 } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
437 if(str2int(&pit->body, &port) < 0)
439 } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
440 if(str2int(&pit->body, &db) < 0)
442 } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
443 snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s);
448 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
449 if(rsrv->ctxRedis!=NULL) {
450 redisFree(rsrv->ctxRedis);
451 rsrv->ctxRedis = NULL;
455 rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
457 rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
459 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
462 if (rsrv->ctxRedis->err)
464 if ((haspass) && redisc_check_auth(rsrv, pass))
466 if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
468 if (redisCommandNR(rsrv->ctxRedis, "PING"))
470 if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
473 if (redis_flush_on_reconnect_param)
474 if (redisCommandNR(rsrv->ctxRedis, "FLUSHALL"))
480 LM_ERR("error communicating with redis server [%.*s]"
481 " (unix:%s db:%d): %s\n",
482 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
483 rsrv->ctxRedis->errstr);
485 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
486 rsrv->sname->len, rsrv->sname->s, addr, port, db,
487 rsrv->ctxRedis->errstr);
491 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
492 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
494 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
495 rsrv->sname->len, rsrv->sname->s, addr, port, db);
503 int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
505 redisc_server_t *rsrv=NULL;
512 if(srv==NULL || cmd==NULL || res==NULL)
514 LM_ERR("invalid parameters");
517 if(srv->len==0 || res->len==0 || cmd->len==0)
519 LM_ERR("invalid parameters");
522 rsrv = redisc_get_server(srv);
525 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
528 if(rsrv->ctxRedis==NULL)
530 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
533 if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS)
535 LM_ERR("Too many pipelined commands, maximum is %d\n",
536 MAXIMUM_PIPELINED_COMMANDS);
539 rpl = redisc_get_reply(res);
542 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
546 c = cmd->s[cmd->len];
547 cmd->s[cmd->len] = '\0';
548 rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand(
549 &rsrv->piped.commands[rsrv->piped.pending_commands].s,
552 if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0)
554 LM_ERR("Invalid redis command : %s\n",cmd->s);
557 rsrv->piped.replies[rsrv->piped.pending_commands]=rpl;
558 rsrv->piped.pending_commands++;
560 cmd->s[cmd->len] = c;
574 int redisc_exec_pipelined_cmd(str *srv)
576 redisc_server_t *rsrv=NULL;
580 LM_ERR("invalid parameters");
585 LM_ERR("invalid parameters");
588 rsrv = redisc_get_server(srv);
591 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
594 if (rsrv->ctxRedis == NULL)
596 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
599 return redisc_exec_pipelined(rsrv);
605 int redisc_create_pipelined_message(redisc_server_t *rsrv)
609 if (rsrv->ctxRedis->err)
611 LM_DBG("Reconnecting server because of error %d: \"%s\"",
612 rsrv->ctxRedis->err,rsrv->ctxRedis->errstr);
613 if (redisc_reconnect_server(rsrv))
615 LM_ERR("unable to reconnect to REDIS server: %.*s\n",
616 rsrv->sname->len,rsrv->sname->s);
621 for (i=0;i<rsrv->piped.pending_commands;i++)
623 if (redis_append_formatted_command(rsrv->ctxRedis,
624 rsrv->piped.commands[i].s,rsrv->piped.commands[i].len)
627 LM_ERR("Error while appending command %d",i);
637 void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
640 for (i=0;i<rsrv->piped.pending_commands;i++)
642 free(rsrv->piped.commands[i].s);
643 rsrv->piped.commands[i].len=0;
645 rsrv->piped.pending_commands=0;
651 int redisc_exec_pipelined(redisc_server_t *rsrv)
656 LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
658 /* if server is disabled do nothing unless the disable time has passed */
659 if (redis_check_server(rsrv))
664 if (rsrv->piped.pending_commands == 0)
666 LM_WARN("call for redis_cmd without any pipelined commands\n");
669 if(rsrv->ctxRedis==NULL)
671 LM_ERR("no redis context for server: %.*s\n",
672 rsrv->sname->len,rsrv->sname->s);
676 /* send the commands and retrieve the first reply */
677 rpl=rsrv->piped.replies[0];
679 if(rpl->rplRedis!=NULL)
681 /* clean up previous redis reply */
682 freeReplyObject(rpl->rplRedis);
683 rpl->rplRedis = NULL;
686 redisc_create_pipelined_message(rsrv);
687 redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
689 if (rpl->rplRedis == NULL)
691 /* null reply, reconnect and try again */
692 if (rsrv->ctxRedis->err)
694 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
696 if (redisc_create_pipelined_message(rsrv) == 0)
698 redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
699 if (rpl->rplRedis == NULL)
701 redis_count_err_and_disable(rsrv);
702 LM_ERR("Unable to read reply\n");
708 redis_count_err_and_disable(rsrv);
712 LM_DBG_redis_reply(rpl->rplRedis);
714 /* replies are received just retrieve them */
715 for (i=1;i<rsrv->piped.pending_commands;i++)
717 rpl=rsrv->piped.replies[i];
718 if(rpl->rplRedis!=NULL)
720 /* clean up previous redis reply */
721 freeReplyObject(rpl->rplRedis);
722 rpl->rplRedis = NULL;
724 if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis)
727 LM_ERR("Unable to read reply\n");
730 if (rpl->rplRedis == NULL)
732 LM_ERR("Trying to read reply for command %.*s but nothing in buffer!",
733 rsrv->piped.commands[i].len,rsrv->piped.commands[i].s);
736 LM_DBG_redis_reply(rpl->rplRedis);
738 redisc_free_pipelined_cmds(rsrv);
739 rsrv->disable.consecutive_errors = 0;
743 redisc_free_pipelined_cmds(rsrv);
747 redisc_free_pipelined_cmds(rsrv);
751 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv)
753 redisc_server_t *rsrv_new;
754 char buffername[100];
756 str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
760 if(redis_cluster_param) {
761 LM_DBG("Redis replied: \"%.*s\"\n", (int)reply->len, reply->str);
762 if((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
764 if(strchr(reply->str, ':') > 0) {
765 tmpstr.s = strchr(reply->str, ':') + 1;
766 tmpstr.len = reply->len - (tmpstr.s - reply->str);
767 if(str2int(&tmpstr, &port) < 0)
769 LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s,
772 LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
776 if(strchr(reply->str + 6, ' ') > 0) {
777 addr.len = tmpstr.s - strchr(reply->str + 6, ' ') - 2;
778 addr.s = strchr(reply->str + 6, ' ') + 1;
779 LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len);
781 LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
786 memset(buffername, 0, sizeof(buffername));
787 name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i",
788 addr.len, addr.s, port);
790 LM_DBG("Name of new connection: %.*s\n", name.len, name.s);
791 rsrv_new = redisc_get_server(&name);
793 LM_DBG("Reusing Connection\n");
796 } else if(redis_allow_dynamic_nodes_param) {
797 /* New param redis_allow_dynamic_nodes_param:
798 * if set, we allow ndb_redis to add nodes that were
799 * not defined explicitly in the module configuration */
802 memset(spec_new, 0, sizeof(spec_new));
803 /* For now the only way this can work is if
804 * the new node is accessible with default
805 * parameters for sock and db */
806 server_len = snprintf(spec_new, sizeof(spec_new) - 1,
807 "name=%.*s;addr=%.*s;port=%i", name.len, name.s,
808 addr.len, addr.s, port);
810 if(server_len<0 || server_len>sizeof(spec_new) - 1) {
811 LM_ERR("failed to print server spec string\n");
814 server_new = (char *)pkg_malloc(server_len + 1);
815 if(server_new == NULL) {
816 LM_ERR("Error allocating pkg mem\n");
820 strncpy(server_new, spec_new, server_len);
821 server_new[server_len] = '\0';
823 if(redisc_add_server(server_new) == 0) {
824 rsrv_new = redisc_get_server(&name);
828 /* Need to connect to the new server now */
829 if(redisc_reconnect_server(rsrv_new) == 0) {
830 LM_DBG("Connected to the new server with name: "
835 LM_ERR("ERROR connecting to the new server with "
841 /* Adding the new node failed
842 * - cannot perform redirection */
843 LM_ERR("No new connection with name (%.*s) was "
844 "created\n", name.len, name.s);
847 LM_ERR("Could not add a new connection with name %.*s\n",
849 pkg_free(server_new);
852 LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
862 int redisc_exec(str *srv, str *res, str *cmd, ...)
864 redisc_server_t *rsrv=NULL;
867 va_list ap, ap2, ap3, ap4;
875 if(srv==NULL || cmd==NULL || res==NULL)
877 LM_ERR("invalid parameters");
880 if(srv->len==0 || res->len==0 || cmd->len==0)
882 LM_ERR("invalid parameters");
886 c = cmd->s[cmd->len];
887 cmd->s[cmd->len] = '\0';
889 rsrv = redisc_get_server(srv);
892 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
896 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
898 if(rsrv->ctxRedis==NULL)
900 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
903 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
905 if (rsrv->piped.pending_commands != 0)
907 LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer."
908 " Automatically call redis_execute");
909 redisc_exec_pipelined(rsrv);
911 /* if server is disabled do nothing unless the disable time has passed */
912 if (redis_check_server(rsrv))
917 rpl = redisc_get_reply(res);
920 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
923 if(rpl->rplRedis!=NULL)
925 /* clean up previous redis reply */
926 freeReplyObject(rpl->rplRedis);
927 rpl->rplRedis = NULL;
930 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
931 if(rpl->rplRedis == NULL)
933 /* null reply, reconnect and try again */
934 if(rsrv->ctxRedis->err)
936 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
938 if(redisc_reconnect_server(rsrv)==0)
940 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
941 if (rpl->rplRedis ==NULL)
943 redis_count_err_and_disable(rsrv);
949 redis_count_err_and_disable(rsrv);
950 LM_ERR("unable to reconnect to redis server: %.*s\n",
952 cmd->s[cmd->len] = c;
956 if (check_cluster_reply(rpl->rplRedis, &rsrv)) {
957 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
958 if(rsrv->ctxRedis==NULL)
960 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
964 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
966 if(rpl->rplRedis!=NULL)
968 /* clean up previous redis reply */
969 freeReplyObject(rpl->rplRedis);
970 rpl->rplRedis = NULL;
972 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 );
973 if(rpl->rplRedis == NULL)
975 /* null reply, reconnect and try again */
976 if(rsrv->ctxRedis->err)
978 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
980 if(redisc_reconnect_server(rsrv)==0)
982 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4);
984 LM_ERR("unable to reconnect to redis server: %.*s\n",
986 cmd->s[cmd->len] = c;
991 cmd->s[cmd->len] = c;
992 rsrv->disable.consecutive_errors = 0;
998 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
1003 cmd->s[cmd->len] = c;
1008 cmd->s[cmd->len] = c;
1021 * Executes a redis command.
1022 * Command is coded using a vector of strings, and a vector of lengths.
1024 * @param rsrv Pointer to a redis_server_t structure.
1025 * @param argc number of elements in the command vector.
1026 * @param argv vector of zero terminated strings forming the command.
1027 * @param argvlen vector of command string lengths or NULL.
1028 * @return redisReply structure or NULL if there was an error.
1030 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
1031 const size_t *argvlen)
1033 redisReply *res=NULL;
1037 LM_ERR("no redis context found for server %.*s\n",
1038 (rsrv)?rsrv->sname->len:0,
1039 (rsrv)?rsrv->sname->s:"");
1043 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
1044 if(rsrv->ctxRedis==NULL)
1046 LM_ERR("no redis context found for server %.*s\n",
1047 (rsrv)?rsrv->sname->len:0,
1048 (rsrv)?rsrv->sname->s:"");
1054 LM_ERR("invalid parameters\n");
1057 if(argv==NULL || *argv==NULL)
1059 LM_ERR("invalid parameters\n");
1063 res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1065 /* null reply, reconnect and try again */
1066 if(rsrv->ctxRedis->err)
1068 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
1073 if (check_cluster_reply(res, &rsrv)) {
1079 if(redisc_reconnect_server(rsrv)==0)
1081 res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1083 if (check_cluster_reply(res, &rsrv)) {
1090 LM_ERR("Unable to reconnect to server: %.*s\n",
1091 rsrv->sname->len, rsrv->sname->s);
1101 redisc_reply_t *redisc_get_reply(str *name)
1103 redisc_reply_t *rpl;
1106 hid = get_hash1_raw(name->s, name->len);
1108 for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) {
1109 if(rpl->hname==hid && rpl->rname.len==name->len
1110 && strncmp(rpl->rname.s, name->s, name->len)==0)
1113 /* not found - add a new one */
1115 rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t));
1118 LM_ERR("no more pkg\n");
1121 memset(rpl, 0, sizeof(redisc_reply_t));
1123 rpl->rname.s = (char*)pkg_malloc(name->len+1);
1124 if(rpl->rname.s==NULL)
1126 LM_ERR("no more pkg.\n");
1130 strncpy(rpl->rname.s, name->s, name->len);
1131 rpl->rname.len = name->len;
1132 rpl->rname.s[name->len] = '\0';
1133 rpl->next = _redisc_rpl_list;
1134 _redisc_rpl_list = rpl;
1142 int redisc_free_reply(str *name)
1144 redisc_reply_t *rpl;
1147 if(name==NULL || name->len==0) {
1148 LM_ERR("invalid parameters");
1152 hid = get_hash1_raw(name->s, name->len);
1154 rpl = _redisc_rpl_list;
1157 if(rpl->hname==hid && rpl->rname.len==name->len
1158 && strncmp(rpl->rname.s, name->s, name->len)==0) {
1160 freeReplyObject(rpl->rplRedis);
1161 rpl->rplRedis = NULL;
1170 /* reply entry not found. */
1174 int redisc_check_auth(redisc_server_t *rsrv, char *pass)
1179 reply = redisCommand(rsrv->ctxRedis, "AUTH %s", pass);
1180 if (reply->type == REDIS_REPLY_ERROR) {
1181 LM_ERR("Redis authentication error\n");
1184 freeReplyObject(reply);
1188 /* backwards compatibility with hiredis < 0.12 */
1189 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
1190 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
1194 newbuf = sdscatlen(c->obuf,cmd,len);
1195 if (newbuf == NULL) {
1196 c->err = REDIS_ERR_OOM;
1197 strcpy(c->errstr,"Out of memory");
1205 int redis_check_server(redisc_server_t *rsrv)
1208 if (rsrv->disable.disabled)
1210 if (get_ticks() > rsrv->disable.restore_tick)
1212 LM_NOTICE("REDIS server %.*s re-enabled",
1213 rsrv->sname->len, rsrv->sname->s);
1214 rsrv->disable.disabled = 0;
1215 rsrv->disable.consecutive_errors = 0;
1225 int redis_count_err_and_disable(redisc_server_t *rsrv)
1227 if (redis_allowed_timeouts_param < 0)
1232 rsrv->disable.consecutive_errors++;
1233 if (rsrv->disable.consecutive_errors > redis_allowed_timeouts_param)
1235 rsrv->disable.disabled=1;
1236 rsrv->disable.restore_tick=get_ticks() + redis_disable_time_param;
1237 LM_WARN("REDIS server %.*s disabled for %d seconds", rsrv->sname->len,
1238 rsrv->sname->s, redis_disable_time_param);
1244 void print_redis_reply(int log_level, redisReply *rpl,int offset)
1247 char padding[MAXIMUM_NESTED_KEYS + 1];
1249 if(!is_printable(log_level))
1254 LM_ERR("Unexpected null reply");
1258 if (offset > MAXIMUM_NESTED_KEYS)
1260 LM_ERR("Offset is too big");
1264 for (i=0;i<offset;i++)
1268 padding[offset]='\0';
1272 case REDIS_REPLY_STRING:
1273 LOG(log_level,"%sstring reply: [%s]", padding, rpl->str);
1275 case REDIS_REPLY_INTEGER:
1276 LOG(log_level,"%sinteger reply: %lld", padding, rpl->integer);
1278 case REDIS_REPLY_ARRAY:
1279 LOG(log_level,"%sarray reply with %d elements", padding,
1280 (int)rpl->elements);
1281 for (i=0; i < rpl->elements; i++)
1283 LOG(log_level,"%selement %d:",padding,i);
1284 print_redis_reply(log_level,rpl->element[i],offset+1);
1287 case REDIS_REPLY_NIL:
1288 LOG(log_level,"%snil reply",padding);
1290 case REDIS_REPLY_STATUS:
1291 LOG(log_level,"%sstatus reply: %s", padding, rpl->str);
1293 case REDIS_REPLY_ERROR:
1294 LOG(log_level,"%serror reply: %s", padding, rpl->str);