ndb_redis: small spelling fix: lenght -> length
[kamailio] / src / modules / ndb_redis / redis_client.c
1 /**
2  * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
3  *
4  * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com)
5  *     - for: redis array reply support
6  *
7  * Copyright (C) 2017 Carsten Bock (ng-voice GmbH)
8  *     - for: Cluster support
9  *
10  * This file is part of Kamailio, a free SIP server.
11  *
12  * Kamailio is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version
16  *
17  * Kamailio is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with this program; if not, write to the Free Software
24  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
25  *
26  */
27
28 #include <stdio.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/time.h>
33 #include <stdarg.h>
34
35 #include "../../core/mem/mem.h"
36 #include "../../core/dprint.h"
37 #include "../../core/hashes.h"
38 #include "../../core/ut.h"
39
40 #include "redis_client.h"
41
42 #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); \
43                 if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;})
44
45 static redisc_server_t * _redisc_srv_list=NULL;
46
47 static redisc_reply_t *_redisc_rpl_list=NULL;
48
49 extern int init_without_redis;
50 extern int redis_connect_timeout_param;
51 extern int redis_cmd_timeout_param;
52 extern int redis_cluster_param;
53 extern int redis_disable_time_param;
54 extern int redis_allowed_timeouts_param;
55 extern int redis_flush_on_reconnect_param;
56 extern int redis_allow_dynamic_nodes_param;
57
58 /* backwards compatibility with hiredis < 0.12 */
59 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
60 typedef char *sds;
61 sds sdscatlen(sds s, const void *t, size_t len);
62 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len);
63 #else
64 #define redis_append_formatted_command redisAppendFormattedCommand
65 #endif
66
67 /**
68  *
69  */
70 int redisc_init(void)
71 {
72         char addr[256], pass[256], unix_sock_path[256], sentinel_group[256];
73
74         unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1;
75         int i, row;
76         redisc_server_t *rsrv=NULL;
77         param_t *pit = NULL;
78         struct timeval tv_conn;
79         struct timeval tv_cmd;
80
81         tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
82         tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
83
84         tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
85         tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
86
87         if(_redisc_srv_list==NULL)
88         {
89                 LM_ERR("no redis servers defined\n");
90                 return -1;
91         }
92
93         for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
94         {
95                 char sentinels[MAXIMUM_SENTINELS][256];
96                 uint8_t sentinels_count = 0;
97
98                 port = 6379;
99                 db = 0;
100                 haspass = 0;
101                 sock = 0;
102
103                 memset(addr, 0, sizeof(addr));
104                 memset(pass, 0, sizeof(pass));
105                 memset(unix_sock_path, 0, sizeof(unix_sock_path));
106
107                 for (pit = rsrv->attrs; pit; pit=pit->next)
108                 {
109                         if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
110                                 snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
111                                                 pit->body.len, pit->body.s);
112                                 sock = 1;
113                         } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
114                                 snprintf(addr, sizeof(addr)-1, "%.*s",
115                                                 pit->body.len, pit->body.s);
116                         } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
117                                 if(str2int(&pit->body, &port) < 0)
118                                         port = 6379;
119                         } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
120                                 if(str2int(&pit->body, &db) < 0)
121                                         db = 0;
122                         } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
123                                 snprintf(pass, sizeof(pass)-1, "%.*s",
124                                                 pit->body.len, pit->body.s);
125                                 haspass = 1;
126                         } else if(pit->name.len==14 && strncmp(pit->name.s,
127                                                 "sentinel_group", 14)==0) {
128                                 snprintf(sentinel_group, sizeof(sentinel_group)-1, "%.*s",
129                                                 pit->body.len, pit->body.s);
130                         } else if(pit->name.len==15 && strncmp(pit->name.s,
131                                                 "sentinel_master", 15)==0) {
132                                 if(str2int(&pit->body, &sentinel_master) < 0)
133                                         sentinel_master = 1;
134                         } else if(pit->name.len==8 && strncmp(pit->name.s,
135                                                 "sentinel", 8)==0) {
136                                 if( sentinels_count < MAXIMUM_SENTINELS ){
137                                         snprintf(sentinels[sentinels_count],
138                                                         sizeof(sentinels[sentinels_count])-1, "%.*s",
139                                                         pit->body.len, pit->body.s);
140                                         sentinels_count++;
141                                 }
142                                 else {
143                                         LM_ERR("too many sentinels, maximum %d supported.\n",
144                                                         MAXIMUM_SENTINELS);
145                                         return -1;
146                                 }
147                         }
148                 }
149
150                 // if sentinels are provided, we need to connect to them and retrieve the redis server
151                 // address / port
152                 if(sentinels_count > 0) {
153                         for(i= 0; i< sentinels_count; i++) {
154                                 char *sentinelAddr = sentinels[i];
155                                 char *pos;
156                                 redisContext *redis;
157                                 redisReply *res, *res2;
158
159                                 port = 6379;
160                                 if( (pos = strchr(sentinelAddr, ':')) != NULL ) {
161                                         port = atoi(pos+1);
162                                         pos[i] = '\0';
163                                 }
164
165                                 redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn);
166                                 if( redis ) {
167                                         if(sentinel_master != 0) {
168                                                 res = redisCommand(redis,
169                                                                 "SENTINEL get-master-addr-by-name %s",
170                                                                 sentinel_group);
171                                                 if( res && (res->type == REDIS_REPLY_ARRAY)
172                                                                 && (res->elements == 2) ) {
173                                                         strncpy(addr, res->element[0]->str,
174                                                                         res->element[0]->len + 1);
175                                                         port = atoi(res->element[1]->str);
176                                                         LM_DBG("sentinel replied: %s:%d\n", addr, port);
177                                                 }
178                                         }
179                                         else {
180                                                 res = redisCommand(redis, "SENTINEL slaves %s",
181                                                                 sentinel_group);
182                                                 if( res && (res->type == REDIS_REPLY_ARRAY) ) {
183                                                         for(row = 0; row< res->elements; row++){
184                                                                 res2 = res->element[row];
185                                                                 for(i= 0; i< res2->elements; i+= 2) {
186                                                                         if( strncmp(res2->element[i]->str,
187                                                                                                 "ip", 2) == 0 ) {
188                                                                                 strncpy(addr, res2->element[i+1]->str,
189                                                                                                 res2->element[i+1]->len);
190                                                                                 addr[res2->element[i+1]->len] = '\0';
191                                                                         }
192                                                                         else if( strncmp(res2->element[i]->str,
193                                                                                                 "port", 4) == 0) {
194                                                                                 port = atoi(res2->element[i+1]->str);
195                                                                                 break;
196                                                                         }
197                                                                 }
198                                                         }
199                                                         LM_DBG("slave for %s: %s:%d\n", sentinel_group,
200                                                                         addr, port);
201                                                 }
202                                         }
203                                 }
204                         }
205                 }
206
207                 if(sock != 0) {
208                         LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
209                         rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path,
210                                         tv_conn);
211                 } else {
212                         LM_DBG("Connecting to %s:%d\n", addr, port);
213                         rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
214                 }
215
216                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
217
218                 if(!rsrv->ctxRedis) {
219                         LM_ERR("Failed to create REDIS-Context.\n");
220                         goto err;
221                 }
222                 if (rsrv->ctxRedis->err) {
223                         LM_ERR("Failed to create REDIS returned an error: %s\n",
224                                         rsrv->ctxRedis->errstr);
225                         goto err2;
226                 }
227                 if ((haspass != 0) && redisc_check_auth(rsrv, pass)) {
228                         LM_ERR("Authentication failed.\n");
229                         goto err2;
230                 }
231                 if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) {
232                         LM_ERR("Failed to set timeout.\n");
233                         goto err2;
234                 }
235                 if (redisCommandNR(rsrv->ctxRedis, "PING")) {
236                         LM_ERR("Failed to send PING (REDIS returned %s).\n",
237                                         rsrv->ctxRedis->errstr);
238                         goto err2;
239                 }
240                 if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
241                                         "SELECT %i", db)) {
242                         LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\","
243                                         " and not in cluster mode).\n", db, rsrv->ctxRedis->errstr);
244                         goto err2;
245                 }
246         }
247
248         return 0;
249
250 err2:
251         if (sock != 0) {
252                 LM_ERR("error communicating with redis server [%.*s]"
253                                 " (unix:%s db:%d): %s\n",
254                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
255                                 rsrv->ctxRedis->errstr);
256         } else {
257                 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
258                                 rsrv->sname->len, rsrv->sname->s, addr, port, db,
259                                 rsrv->ctxRedis->errstr);
260         }
261         if (init_without_redis==1)
262         {
263                 LM_WARN("failed to initialize redis connections, but initializing"
264                                 " module anyway.\n");
265                 return 0;
266         }
267
268         return -1;
269 err:
270         if (sock != 0) {
271                 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
272                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
273         } else {
274                 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
275                                 rsrv->sname->len, rsrv->sname->s, addr, port, db);
276         }
277         if (init_without_redis==1)
278         {
279                 LM_WARN("failed to initialize redis connections, but initializing"
280                                 " module anyway.\n");
281                 return 0;
282         }
283
284         return -1;
285 }
286
287 /**
288  *
289  */
290 int redisc_destroy(void)
291 {
292         redisc_reply_t *rpl, *next_rpl;
293
294         redisc_server_t *rsrv=NULL;
295         redisc_server_t *rsrv1=NULL;
296
297         rpl = _redisc_rpl_list;
298         while(rpl != NULL)
299         {
300                 next_rpl = rpl->next;
301                 if(rpl->rplRedis)
302                         freeReplyObject(rpl->rplRedis);
303
304                 if(rpl->rname.s != NULL)
305                         pkg_free(rpl->rname.s);
306
307                 pkg_free(rpl);
308                 rpl = next_rpl;
309         }
310         _redisc_rpl_list = NULL;
311
312         if(_redisc_srv_list==NULL)
313                 return -1;
314         rsrv=_redisc_srv_list;
315         while(rsrv!=NULL)
316         {
317                 rsrv1 = rsrv;
318                 rsrv=rsrv->next;
319                 if (rsrv1->ctxRedis!=NULL)
320                         redisFree(rsrv1->ctxRedis);
321                 free_params(rsrv1->attrs);
322                 pkg_free(rsrv1);
323         }
324         _redisc_srv_list = NULL;
325
326         return 0;
327 }
328
329 /**
330  *
331  */
332 int redisc_add_server(char *spec)
333 {
334         param_t *pit=NULL;
335         param_hooks_t phooks;
336         redisc_server_t *rsrv=NULL;
337         str s;
338
339         s.s = spec;
340         s.len = strlen(spec);
341         if(s.s[s.len-1]==';')
342                 s.len--;
343         if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0)
344         {
345                 LM_ERR("failed parsing params value\n");
346                 goto error;
347         }
348         rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
349         if(rsrv==NULL)
350         {
351                 LM_ERR("no more pkg\n");
352                 goto error;
353         }
354         memset(rsrv, 0, sizeof(redisc_server_t));
355         rsrv->attrs = pit;
356         rsrv->spec = spec;
357         for (pit = rsrv->attrs; pit; pit=pit->next)
358         {
359                 if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
360                         rsrv->sname = &pit->body;
361                         rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
362                         break;
363                 }
364         }
365         if(rsrv->sname==NULL)
366         {
367                 LM_ERR("no server name\n");
368                 goto error;
369         }
370         rsrv->next = _redisc_srv_list;
371         _redisc_srv_list = rsrv;
372
373         return 0;
374 error:
375         if(pit!=NULL)
376                 free_params(pit);
377         if(rsrv!=NULL)
378                 pkg_free(rsrv);
379         return -1;
380 }
381
382 /**
383  *
384  */
385 redisc_server_t *redisc_get_server(str *name)
386 {
387         redisc_server_t *rsrv=NULL;
388         unsigned int hname;
389
390         hname = get_hash1_raw(name->s, name->len);
391         LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s);
392         rsrv=_redisc_srv_list;
393         while(rsrv!=NULL)
394         {
395                 LM_DBG("Entry %u (%.*s)\n", rsrv->hname,
396                                 rsrv->sname->len, rsrv->sname->s);
397                 if(rsrv->hname==hname && rsrv->sname->len==name->len
398                                 && strncmp(rsrv->sname->s, name->s, name->len)==0)
399                         return rsrv;
400                 rsrv=rsrv->next;
401         }
402         LM_DBG("No entry found.\n");
403         return NULL;
404 }
405
406 /**
407  *
408  */
409 int redisc_reconnect_server(redisc_server_t *rsrv)
410 {
411         char addr[256], pass[256], unix_sock_path[256];
412         unsigned int port, db, sock = 0, haspass = 0;
413         param_t *pit = NULL;
414         struct timeval tv_conn;
415         struct timeval tv_cmd;
416
417         tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
418         tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
419
420         tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
421         tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
422
423         memset(addr, 0, sizeof(addr));
424         port = 6379;
425         db = 0;
426         memset(pass, 0, sizeof(pass));
427         memset(unix_sock_path, 0, sizeof(unix_sock_path));
428         for (pit = rsrv->attrs; pit; pit=pit->next)
429         {
430                 if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
431                         snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
432                                         pit->body.len, pit->body.s);
433                         sock = 1;
434                 } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
435                         snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s);
436                 } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
437                         if(str2int(&pit->body, &port) < 0)
438                                 port = 6379;
439                 } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
440                         if(str2int(&pit->body, &db) < 0)
441                                 db = 0;
442                 } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
443                         snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s);
444                         haspass = 1;
445                 }
446         }
447
448         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
449         if(rsrv->ctxRedis!=NULL) {
450                 redisFree(rsrv->ctxRedis);
451                 rsrv->ctxRedis = NULL;
452         }
453
454         if(sock != 0) {
455                 rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
456         } else {
457                 rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
458         }
459         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
460         if(!rsrv->ctxRedis)
461                 goto err;
462         if (rsrv->ctxRedis->err)
463                 goto err2;
464         if ((haspass) && redisc_check_auth(rsrv, pass))
465                 goto err2;
466         if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
467                 goto err2;
468         if (redisCommandNR(rsrv->ctxRedis, "PING"))
469                 goto err2;
470         if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
471                                 "SELECT %i", db))
472                 goto err2;
473         if (redis_flush_on_reconnect_param)
474                 if (redisCommandNR(rsrv->ctxRedis, "FLUSHALL"))
475                         goto err2;
476         return 0;
477
478 err2:
479         if (sock != 0) {
480                 LM_ERR("error communicating with redis server [%.*s]"
481                                 " (unix:%s db:%d): %s\n",
482                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
483                                 rsrv->ctxRedis->errstr);
484         } else {
485                 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
486                                 rsrv->sname->len, rsrv->sname->s, addr, port, db,
487                                 rsrv->ctxRedis->errstr);
488         }
489 err:
490         if (sock != 0) {
491                 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
492                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
493         } else {
494                 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
495                                 rsrv->sname->len, rsrv->sname->s, addr, port, db);
496         }
497         return -1;
498 }
499
500 /**
501  *
502  */
503 int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
504 {
505         redisc_server_t *rsrv=NULL;
506         redisc_reply_t *rpl;
507         char c;
508         va_list ap;
509
510         va_start(ap, cmd);
511
512         if(srv==NULL || cmd==NULL || res==NULL)
513         {
514                 LM_ERR("invalid parameters");
515                 goto error_cmd;
516         }
517         if(srv->len==0 || res->len==0 || cmd->len==0)
518         {
519                 LM_ERR("invalid parameters");
520                 goto error_cmd;
521         }
522         rsrv = redisc_get_server(srv);
523         if(rsrv==NULL)
524         {
525                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
526                 goto error_cmd;
527         }
528         if(rsrv->ctxRedis==NULL)
529         {
530                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
531                 goto error_cmd;
532         }
533         if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS)
534         {
535                 LM_ERR("Too many pipelined commands, maximum is %d\n",
536                                 MAXIMUM_PIPELINED_COMMANDS);
537                 goto error_cmd;
538         }
539         rpl = redisc_get_reply(res);
540         if(rpl==NULL)
541         {
542                 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
543                 goto error_cmd;
544         }
545
546         c = cmd->s[cmd->len];
547         cmd->s[cmd->len] = '\0';
548         rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand(
549                         &rsrv->piped.commands[rsrv->piped.pending_commands].s,
550                         cmd->s,
551                         ap);
552         if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0)
553         {
554                 LM_ERR("Invalid redis command : %s\n",cmd->s);
555                 goto error_cmd;
556         }
557         rsrv->piped.replies[rsrv->piped.pending_commands]=rpl;
558         rsrv->piped.pending_commands++;
559
560         cmd->s[cmd->len] = c;
561         va_end(ap);
562         return 0;
563
564 error_cmd:
565         va_end(ap);
566         return -1;
567
568 }
569
570
571 /**
572  *
573  */
574 int redisc_exec_pipelined_cmd(str *srv)
575 {
576         redisc_server_t *rsrv=NULL;
577
578         if (srv == NULL)
579         {
580                 LM_ERR("invalid parameters");
581                 return -1;
582         }
583         if (srv->len == 0)
584         {
585                 LM_ERR("invalid parameters");
586                 return -1;
587         }
588         rsrv = redisc_get_server(srv);
589         if (rsrv == NULL)
590         {
591                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
592                 return -1;
593         }
594         if (rsrv->ctxRedis == NULL)
595         {
596                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
597                 return -1;
598         }
599         return redisc_exec_pipelined(rsrv);
600 }
601
602 /**
603  *
604  */
605 int redisc_create_pipelined_message(redisc_server_t *rsrv)
606 {
607         int i;
608
609         if (rsrv->ctxRedis->err)
610         {
611                 LM_DBG("Reconnecting server because of error %d: \"%s\"",
612                                 rsrv->ctxRedis->err,rsrv->ctxRedis->errstr);
613                 if (redisc_reconnect_server(rsrv))
614                 {
615                         LM_ERR("unable to reconnect to REDIS server: %.*s\n",
616                                         rsrv->sname->len,rsrv->sname->s);
617                         return -1;
618                 }
619         }
620
621         for (i=0;i<rsrv->piped.pending_commands;i++)
622         {
623                 if (redis_append_formatted_command(rsrv->ctxRedis,
624                                         rsrv->piped.commands[i].s,rsrv->piped.commands[i].len)
625                                                 != REDIS_OK)
626                 {
627                         LM_ERR("Error while appending command %d",i);
628                         return -1;
629                 }
630         }
631         return 0;
632 }
633
634 /**
635  *
636  */
637 void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
638 {
639         int i;
640         for (i=0;i<rsrv->piped.pending_commands;i++)
641         {
642                 free(rsrv->piped.commands[i].s);
643                 rsrv->piped.commands[i].len=0;
644         }
645         rsrv->piped.pending_commands=0;
646 }
647
648 /**
649  *
650  */
651 int redisc_exec_pipelined(redisc_server_t *rsrv)
652 {
653         redisc_reply_t *rpl;
654         int i;
655
656         LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
657
658         /* if server is disabled do nothing unless the disable time has passed */
659         if (redis_check_server(rsrv))
660         {
661                 goto srv_disabled;
662         }
663
664         if (rsrv->piped.pending_commands == 0)
665         {
666                 LM_WARN("call for redis_cmd without any pipelined commands\n");
667                 return -1;
668         }
669         if(rsrv->ctxRedis==NULL)
670         {
671                 LM_ERR("no redis context for server: %.*s\n",
672                                 rsrv->sname->len,rsrv->sname->s);
673                 goto error_exec;
674         }
675
676         /* send the commands and retrieve the first reply */
677         rpl=rsrv->piped.replies[0];
678
679         if(rpl->rplRedis!=NULL)
680         {
681                 /* clean up previous redis reply */
682                 freeReplyObject(rpl->rplRedis);
683                 rpl->rplRedis = NULL;
684         }
685
686         redisc_create_pipelined_message(rsrv);
687         redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
688
689         if (rpl->rplRedis == NULL)
690         {
691                 /* null reply, reconnect and try again */
692                 if (rsrv->ctxRedis->err)
693                 {
694                         LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
695                 }
696                 if (redisc_create_pipelined_message(rsrv) == 0)
697                 {
698                         redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
699                         if (rpl->rplRedis == NULL)
700                         {
701                                 redis_count_err_and_disable(rsrv);
702                                 LM_ERR("Unable to read reply\n");
703                                 goto error_exec;
704                         }
705                 }
706                 else
707                 {
708                         redis_count_err_and_disable(rsrv);
709                         goto error_exec;
710                 }
711         }
712         LM_DBG_redis_reply(rpl->rplRedis);
713
714         /* replies are received just retrieve them */
715         for (i=1;i<rsrv->piped.pending_commands;i++)
716         {
717                 rpl=rsrv->piped.replies[i];
718                 if(rpl->rplRedis!=NULL)
719                 {
720                         /* clean up previous redis reply */
721                         freeReplyObject(rpl->rplRedis);
722                         rpl->rplRedis = NULL;
723                 }
724                 if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis)
725                                 != REDIS_OK)
726                 {
727                         LM_ERR("Unable to read reply\n");
728                         continue;
729                 }
730                 if (rpl->rplRedis == NULL)
731                 {
732                         LM_ERR("Trying to read reply for command %.*s but nothing in buffer!",
733                                         rsrv->piped.commands[i].len,rsrv->piped.commands[i].s);
734                         continue;
735                 }
736                 LM_DBG_redis_reply(rpl->rplRedis);
737         }
738         redisc_free_pipelined_cmds(rsrv);
739         rsrv->disable.consecutive_errors = 0;
740         return 0;
741
742 error_exec:
743         redisc_free_pipelined_cmds(rsrv);
744         return -1;
745
746 srv_disabled:
747         redisc_free_pipelined_cmds(rsrv);
748         return -2;
749 }
750
751 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv)
752 {
753         redisc_server_t *rsrv_new;
754         char buffername[100];
755         unsigned int port;
756         str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
757         int server_len = 0;
758         char spec_new[100];
759
760         if(redis_cluster_param) {
761                 LM_DBG("Redis replied: \"%.*s\"\n", (int)reply->len, reply->str);
762                 if((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
763                         port = 6379;
764                         if(strchr(reply->str, ':') > 0) {
765                                 tmpstr.s = strchr(reply->str, ':') + 1;
766                                 tmpstr.len = reply->len - (tmpstr.s - reply->str);
767                                 if(str2int(&tmpstr, &port) < 0)
768                                         port = 6379;
769                                 LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s,
770                                                 tmpstr.len, port);
771                         } else {
772                                 LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
773                                                 reply->str);
774                                 return 0;
775                         }
776                         if(strchr(reply->str + 6, ' ') > 0) {
777                                 addr.len = tmpstr.s - strchr(reply->str + 6, ' ') - 2;
778                                 addr.s = strchr(reply->str + 6, ' ') + 1;
779                                 LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len);
780                         } else {
781                                 LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
782                                                 reply->str);
783                                 return 0;
784                         }
785
786                         memset(buffername, 0, sizeof(buffername));
787                         name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i",
788                                         addr.len, addr.s, port);
789                         name.s = buffername;
790                         LM_DBG("Name of new connection: %.*s\n", name.len, name.s);
791                         rsrv_new = redisc_get_server(&name);
792                         if(rsrv_new) {
793                                 LM_DBG("Reusing Connection\n");
794                                 *rsrv = rsrv_new;
795                                 return 1;
796                         } else if(redis_allow_dynamic_nodes_param) {
797                                 /* New param redis_allow_dynamic_nodes_param:
798                                 * if set, we allow ndb_redis to add nodes that were
799                                 * not defined explicitly in the module configuration */
800                                 char *server_new;
801
802                                 memset(spec_new, 0, sizeof(spec_new));
803                                 /* For now the only way this can work is if
804                                  * the new node is accessible with default
805                                  * parameters for sock and db */
806                                 server_len = snprintf(spec_new, sizeof(spec_new) - 1,
807                                                 "name=%.*s;addr=%.*s;port=%i", name.len, name.s,
808                                                 addr.len, addr.s, port);
809
810                                 if(server_len<0 || server_len>sizeof(spec_new) - 1) {
811                                         LM_ERR("failed to print server spec string\n");
812                                         return 0;
813                                 }
814                                 server_new = (char *)pkg_malloc(server_len + 1);
815                                 if(server_new == NULL) {
816                                         LM_ERR("Error allocating pkg mem\n");
817                                         return 0;
818                                 }
819
820                                 strncpy(server_new, spec_new, server_len);
821                                 server_new[server_len] = '\0';
822
823                                 if(redisc_add_server(server_new) == 0) {
824                                         rsrv_new = redisc_get_server(&name);
825
826                                         if(rsrv_new) {
827                                                 *rsrv = rsrv_new;
828                                                 /* Need to connect to the new server now */
829                                                 if(redisc_reconnect_server(rsrv_new) == 0) {
830                                                         LM_DBG("Connected to the new server with name: "
831                                                                    "%.*s\n",
832                                                                         name.len, name.s);
833                                                         return 1;
834                                                 } else {
835                                                         LM_ERR("ERROR connecting to the new server with "
836                                                                    "name: %.*s\n",
837                                                                         name.len, name.s);
838                                                         return 0;
839                                                 }
840                                         } else {
841                                                 /* Adding the new node failed
842                                                  * - cannot perform redirection */
843                                                 LM_ERR("No new connection with name (%.*s) was "
844                                                                 "created\n", name.len, name.s);
845                                         }
846                                 } else {
847                                         LM_ERR("Could not add a new connection with name %.*s\n",
848                                                         name.len, name.s);
849                                         pkg_free(server_new);
850                                 }
851                         } else {
852                                 LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
853                         }
854                 }
855         }
856         return 0;
857 }
858
859 /**
860  *
861  */
862 int redisc_exec(str *srv, str *res, str *cmd, ...)
863 {
864         redisc_server_t *rsrv=NULL;
865         redisc_reply_t *rpl;
866         char c;
867         va_list ap, ap2, ap3, ap4;
868         int ret = -1;
869
870         va_start(ap, cmd);
871         va_copy(ap2, ap);
872         va_copy(ap3, ap);
873         va_copy(ap4, ap);
874
875         if(srv==NULL || cmd==NULL || res==NULL)
876         {
877                 LM_ERR("invalid parameters");
878                 goto error;
879         }
880         if(srv->len==0 || res->len==0 || cmd->len==0)
881         {
882                 LM_ERR("invalid parameters");
883                 goto error;
884         }
885
886         c = cmd->s[cmd->len];
887         cmd->s[cmd->len] = '\0';
888
889         rsrv = redisc_get_server(srv);
890         if(rsrv==NULL)
891         {
892                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
893                 goto error_exec;
894         }
895
896         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
897
898         if(rsrv->ctxRedis==NULL)
899         {
900                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
901                 goto error_exec;
902         }
903         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
904
905         if (rsrv->piped.pending_commands != 0)
906         {
907                 LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer."
908                                 " Automatically call redis_execute");
909                 redisc_exec_pipelined(rsrv);
910         }
911         /* if server is disabled do nothing unless the disable time has passed */
912         if (redis_check_server(rsrv))
913         {
914                 goto srv_disabled;
915         }
916
917         rpl = redisc_get_reply(res);
918         if(rpl==NULL)
919         {
920                 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
921                 goto error_exec;
922         }
923         if(rpl->rplRedis!=NULL)
924         {
925                 /* clean up previous redis reply */
926                 freeReplyObject(rpl->rplRedis);
927                 rpl->rplRedis = NULL;
928         }
929
930         rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
931         if(rpl->rplRedis == NULL)
932         {
933                 /* null reply, reconnect and try again */
934                 if(rsrv->ctxRedis->err)
935                 {
936                         LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
937                 }
938                 if(redisc_reconnect_server(rsrv)==0)
939                 {
940                         rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
941                         if (rpl->rplRedis ==NULL)
942                         {
943                                 redis_count_err_and_disable(rsrv);
944                                 goto error_exec;
945                         }
946                 }
947                 else
948                 {
949                         redis_count_err_and_disable(rsrv);
950                         LM_ERR("unable to reconnect to redis server: %.*s\n",
951                                         srv->len, srv->s);
952                         cmd->s[cmd->len] = c;
953                         goto error_exec;
954                 }
955         }
956         if (check_cluster_reply(rpl->rplRedis, &rsrv)) {
957                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
958                 if(rsrv->ctxRedis==NULL)
959                 {
960                         LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
961                         goto error_exec;
962                 }
963
964                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
965
966                 if(rpl->rplRedis!=NULL)
967                 {
968                         /* clean up previous redis reply */
969                         freeReplyObject(rpl->rplRedis);
970                         rpl->rplRedis = NULL;
971                 }
972                 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 );
973                 if(rpl->rplRedis == NULL)
974                 {
975                         /* null reply, reconnect and try again */
976                         if(rsrv->ctxRedis->err)
977                         {
978                                 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
979                         }
980                         if(redisc_reconnect_server(rsrv)==0)
981                         {
982                                 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4);
983                         } else {
984                                 LM_ERR("unable to reconnect to redis server: %.*s\n",
985                                                 srv->len, srv->s);
986                                 cmd->s[cmd->len] = c;
987                                 goto error_exec;
988                         }
989                 }
990         }
991         cmd->s[cmd->len] = c;
992         rsrv->disable.consecutive_errors = 0;
993         va_end(ap);
994         va_end(ap2);
995         va_end(ap3);
996         va_end(ap4);
997
998         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
999
1000         return 0;
1001
1002 error_exec:
1003         cmd->s[cmd->len] = c;
1004         ret = -1;
1005         goto error;
1006
1007 srv_disabled:
1008         cmd->s[cmd->len] = c;
1009         ret = -2;
1010         goto error;
1011
1012 error:
1013         va_end(ap);
1014         va_end(ap2);
1015         va_end(ap3);
1016         va_end(ap4);
1017         return ret;
1018 }
1019
1020 /**
1021  * Executes a redis command.
1022  * Command is coded using a vector of strings, and a vector of lengths.
1023  *
1024  * @param rsrv Pointer to a redis_server_t structure.
1025  * @param argc number of elements in the command vector.
1026  * @param argv vector of zero terminated strings forming the command.
1027  * @param argvlen vector of command string lengths or NULL.
1028  * @return redisReply structure or NULL if there was an error.
1029  */
1030 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
1031                 const size_t *argvlen)
1032 {
1033         redisReply *res=NULL;
1034
1035         if(rsrv==NULL)
1036         {
1037                 LM_ERR("no redis context found for server %.*s\n",
1038                                 (rsrv)?rsrv->sname->len:0,
1039                                 (rsrv)?rsrv->sname->s:"");
1040                 return NULL;
1041         }
1042
1043         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
1044         if(rsrv->ctxRedis==NULL)
1045         {
1046                 LM_ERR("no redis context found for server %.*s\n",
1047                         (rsrv)?rsrv->sname->len:0,
1048                         (rsrv)?rsrv->sname->s:"");
1049                 return NULL;
1050         }
1051
1052         if(argc<=0)
1053         {
1054                 LM_ERR("invalid parameters\n");
1055                 return NULL;
1056         }
1057         if(argv==NULL || *argv==NULL)
1058         {
1059                 LM_ERR("invalid parameters\n");
1060                 return NULL;
1061         }
1062 again:
1063         res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1064
1065         /* null reply, reconnect and try again */
1066         if(rsrv->ctxRedis->err)
1067         {
1068                 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
1069         }
1070
1071         if(res)
1072         {
1073                 if (check_cluster_reply(res, &rsrv)) {
1074                         goto again;
1075                 }
1076                 return res;
1077         }
1078
1079         if(redisc_reconnect_server(rsrv)==0)
1080         {
1081                 res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1082                 if (res) {
1083                         if (check_cluster_reply(res, &rsrv)) {
1084                                 goto again;
1085                         }
1086                 }
1087         }
1088         else
1089         {
1090                 LM_ERR("Unable to reconnect to server: %.*s\n",
1091                                 rsrv->sname->len, rsrv->sname->s);
1092                 return NULL;
1093         }
1094
1095         return res;
1096 }
1097
1098 /**
1099  *
1100  */
1101 redisc_reply_t *redisc_get_reply(str *name)
1102 {
1103         redisc_reply_t *rpl;
1104         unsigned int hid;
1105
1106         hid = get_hash1_raw(name->s, name->len);
1107
1108         for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) {
1109                 if(rpl->hname==hid && rpl->rname.len==name->len
1110                                 && strncmp(rpl->rname.s, name->s, name->len)==0)
1111                         return rpl;
1112         }
1113         /* not found - add a new one */
1114
1115         rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t));
1116         if(rpl==NULL)
1117         {
1118                 LM_ERR("no more pkg\n");
1119                 return NULL;
1120         }
1121         memset(rpl, 0, sizeof(redisc_reply_t));
1122         rpl->hname = hid;
1123         rpl->rname.s = (char*)pkg_malloc(name->len+1);
1124         if(rpl->rname.s==NULL)
1125         {
1126                 LM_ERR("no more pkg.\n");
1127                 pkg_free(rpl);
1128                 return NULL;
1129         }
1130         strncpy(rpl->rname.s, name->s, name->len);
1131         rpl->rname.len = name->len;
1132         rpl->rname.s[name->len] = '\0';
1133         rpl->next = _redisc_rpl_list;
1134         _redisc_rpl_list = rpl;
1135         return rpl;
1136 }
1137
1138
1139 /**
1140  *
1141  */
1142 int redisc_free_reply(str *name)
1143 {
1144         redisc_reply_t *rpl;
1145         unsigned int hid;
1146
1147         if(name==NULL || name->len==0) {
1148                 LM_ERR("invalid parameters");
1149                 return -1;
1150         }
1151
1152         hid = get_hash1_raw(name->s, name->len);
1153
1154         rpl = _redisc_rpl_list;
1155         while(rpl) {
1156
1157                 if(rpl->hname==hid && rpl->rname.len==name->len
1158                                 && strncmp(rpl->rname.s, name->s, name->len)==0) {
1159                         if(rpl->rplRedis) {
1160                                 freeReplyObject(rpl->rplRedis);
1161                                 rpl->rplRedis = NULL;
1162                         }
1163
1164                         return 0;
1165                 }
1166
1167                 rpl = rpl->next;
1168         }
1169
1170         /* reply entry not found. */
1171         return -1;
1172 }
1173
1174 int redisc_check_auth(redisc_server_t *rsrv, char *pass)
1175 {
1176         redisReply *reply;
1177         int retval = 0;
1178
1179         reply = redisCommand(rsrv->ctxRedis, "AUTH %s", pass);
1180         if (reply->type == REDIS_REPLY_ERROR) {
1181                 LM_ERR("Redis authentication error\n");
1182                 retval = -1;
1183         }
1184         freeReplyObject(reply);
1185         return retval;
1186 }
1187
1188 /* backwards compatibility with hiredis < 0.12 */
1189 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
1190 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
1191 {
1192         sds newbuf;
1193
1194         newbuf = sdscatlen(c->obuf,cmd,len);
1195         if (newbuf == NULL) {
1196                 c->err = REDIS_ERR_OOM;
1197                 strcpy(c->errstr,"Out of memory");
1198                 return REDIS_ERR;
1199         }
1200         c->obuf = newbuf;
1201         return REDIS_OK;
1202 }
1203 #endif
1204
1205 int redis_check_server(redisc_server_t *rsrv)
1206 {
1207
1208         if (rsrv->disable.disabled)
1209         {
1210                 if (get_ticks() > rsrv->disable.restore_tick)
1211                 {
1212                         LM_NOTICE("REDIS server %.*s re-enabled",
1213                                         rsrv->sname->len, rsrv->sname->s);
1214                         rsrv->disable.disabled = 0;
1215                         rsrv->disable.consecutive_errors = 0;
1216                 }
1217                 else
1218                 {
1219                         return 1;
1220                 }
1221         }
1222         return 0;
1223 }
1224
1225 int redis_count_err_and_disable(redisc_server_t *rsrv)
1226 {
1227         if (redis_allowed_timeouts_param < 0)
1228         {
1229                 return 0;
1230         }
1231
1232         rsrv->disable.consecutive_errors++;
1233         if (rsrv->disable.consecutive_errors > redis_allowed_timeouts_param)
1234         {
1235                 rsrv->disable.disabled=1;
1236                 rsrv->disable.restore_tick=get_ticks() + redis_disable_time_param;
1237                 LM_WARN("REDIS server %.*s disabled for %d seconds", rsrv->sname->len,
1238                                 rsrv->sname->s, redis_disable_time_param);
1239                 return 1;
1240         }
1241         return 0;
1242 }
1243
1244 void print_redis_reply(int log_level, redisReply *rpl,int offset)
1245 {
1246         int i;
1247         char padding[MAXIMUM_NESTED_KEYS + 1];
1248
1249         if(!is_printable(log_level))
1250                 return;
1251
1252         if (!rpl)
1253         {
1254                 LM_ERR("Unexpected null reply");
1255                 return;
1256         }
1257
1258         if (offset > MAXIMUM_NESTED_KEYS)
1259         {
1260                 LM_ERR("Offset is too big");
1261                 return;
1262         }
1263
1264         for (i=0;i<offset;i++)
1265         {
1266                 padding[i]='\t';
1267         }
1268         padding[offset]='\0';
1269
1270         switch (rpl->type)
1271         {
1272         case REDIS_REPLY_STRING:
1273                 LOG(log_level,"%sstring reply: [%s]", padding, rpl->str);
1274                 break;
1275         case REDIS_REPLY_INTEGER:
1276                 LOG(log_level,"%sinteger reply: %lld", padding, rpl->integer);
1277                 break;
1278         case REDIS_REPLY_ARRAY:
1279                 LOG(log_level,"%sarray reply with %d elements", padding,
1280                                 (int)rpl->elements);
1281                 for (i=0; i < rpl->elements; i++)
1282                 {
1283                         LOG(log_level,"%selement %d:",padding,i);
1284                         print_redis_reply(log_level,rpl->element[i],offset+1);
1285                 }
1286                 break;
1287         case REDIS_REPLY_NIL:
1288                 LOG(log_level,"%snil reply",padding);
1289                 break;
1290         case REDIS_REPLY_STATUS:
1291                 LOG(log_level,"%sstatus reply: %s", padding, rpl->str);
1292                 break;
1293         case REDIS_REPLY_ERROR:
1294                 LOG(log_level,"%serror reply: %s", padding, rpl->str);
1295                 break;
1296         }
1297 }