ndb_redis: use the core macros for ending string value with '\0' and to restore
[sip-router] / src / modules / ndb_redis / redis_client.c
1 /**
2  * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com)
3  *
4  * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com)
5  *     - for: redis array reply support
6  *
7  * Copyright (C) 2017 Carsten Bock (ng-voice GmbH)
8  *     - for: Cluster support
9  *
10  * This file is part of Kamailio, a free SIP server.
11  *
12  * Kamailio is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version
16  *
17  * Kamailio is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with this program; if not, write to the Free Software
24  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
25  *
26  */
27
28 #include <stdio.h>
29 #include <unistd.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/time.h>
33 #include <stdarg.h>
34
35 #include "../../core/mem/mem.h"
36 #include "../../core/dprint.h"
37 #include "../../core/hashes.h"
38 #include "../../core/ut.h"
39
40 #include "redis_client.h"
41
/*
 * "No reply" variant of redisCommand(): the reply object, if any, is released
 * immediately. The expression evaluates to 0 when redisCommand() returned a
 * reply and to -1 when it returned NULL.
 * Relies on the GNU statement-expression extension.
 */
#define redisCommandNR(a...) (int)({ \
		void *_nr_reply = redisCommand(a); \
		if (_nr_reply != NULL) \
			freeReplyObject(_nr_reply); \
		(_nr_reply != NULL) ? 0 : -1; \
	})
44
45 static redisc_server_t * _redisc_srv_list=NULL;
46
47 static redisc_reply_t *_redisc_rpl_list=NULL;
48
49 extern int init_without_redis;
50 extern int redis_connect_timeout_param;
51 extern int redis_cmd_timeout_param;
52 extern int redis_cluster_param;
53 extern int redis_disable_time_param;
54 extern int redis_allowed_timeouts_param;
55 extern int redis_flush_on_reconnect_param;
56 extern int redis_allow_dynamic_nodes_param;
57
58 /* backwards compatibility with hiredis < 0.12 */
59 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
60 typedef char *sds;
61 sds sdscatlen(sds s, const void *t, size_t len);
62 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len);
63 #else
64 #define redis_append_formatted_command redisAppendFormattedCommand
65 #endif
66
67 /**
68  *
69  */
70 int redisc_init(void)
71 {
72         char addr[256], pass[256], unix_sock_path[256], sentinel_group[256];
73
74         unsigned int port, db, sock = 0, haspass = 0, sentinel_master = 1;
75         int i, row;
76         redisc_server_t *rsrv=NULL;
77         param_t *pit = NULL;
78         struct timeval tv_conn;
79         struct timeval tv_cmd;
80
81         tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
82         tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
83
84         tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
85         tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
86
87         if(_redisc_srv_list==NULL)
88         {
89                 LM_ERR("no redis servers defined\n");
90                 return -1;
91         }
92
93         for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next)
94         {
95                 char sentinels[MAXIMUM_SENTINELS][256];
96                 uint8_t sentinels_count = 0;
97
98                 port = 6379;
99                 db = 0;
100                 haspass = 0;
101                 sock = 0;
102
103                 memset(addr, 0, sizeof(addr));
104                 memset(pass, 0, sizeof(pass));
105                 memset(unix_sock_path, 0, sizeof(unix_sock_path));
106
107                 for (pit = rsrv->attrs; pit; pit=pit->next)
108                 {
109                         if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
110                                 snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
111                                                 pit->body.len, pit->body.s);
112                                 sock = 1;
113                         } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
114                                 snprintf(addr, sizeof(addr)-1, "%.*s",
115                                                 pit->body.len, pit->body.s);
116                         } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
117                                 if(str2int(&pit->body, &port) < 0)
118                                         port = 6379;
119                         } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
120                                 if(str2int(&pit->body, &db) < 0)
121                                         db = 0;
122                         } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
123                                 snprintf(pass, sizeof(pass)-1, "%.*s",
124                                                 pit->body.len, pit->body.s);
125                                 haspass = 1;
126                         } else if(pit->name.len==14 && strncmp(pit->name.s,
127                                                 "sentinel_group", 14)==0) {
128                                 snprintf(sentinel_group, sizeof(sentinel_group)-1, "%.*s",
129                                                 pit->body.len, pit->body.s);
130                         } else if(pit->name.len==15 && strncmp(pit->name.s,
131                                                 "sentinel_master", 15)==0) {
132                                 if(str2int(&pit->body, &sentinel_master) < 0)
133                                         sentinel_master = 1;
134                         } else if(pit->name.len==8 && strncmp(pit->name.s,
135                                                 "sentinel", 8)==0) {
136                                 if( sentinels_count < MAXIMUM_SENTINELS ){
137                                         snprintf(sentinels[sentinels_count],
138                                                         sizeof(sentinels[sentinels_count])-1, "%.*s",
139                                                         pit->body.len, pit->body.s);
140                                         sentinels_count++;
141                                 }
142                                 else {
143                                         LM_ERR("too many sentinels, maximum %d supported.\n",
144                                                         MAXIMUM_SENTINELS);
145                                         return -1;
146                                 }
147                         }
148                 }
149
150                 // if sentinels are provided, we need to connect to them and retrieve the redis server
151                 // address / port
152                 if(sentinels_count > 0) {
153                         for(i= 0; i< sentinels_count; i++) {
154                                 char *sentinelAddr = sentinels[i];
155                                 char *pos;
156                                 redisContext *redis;
157                                 redisReply *res, *res2;
158
159                                 port = 6379;
160                                 if( (pos = strchr(sentinelAddr, ':')) != NULL ) {
161                                         port = atoi(pos+1);
162                                         pos[i] = '\0';
163                                 }
164
165                                 redis = redisConnectWithTimeout(sentinelAddr, port, tv_conn);
166                                 if( redis ) {
167                                         if(sentinel_master != 0) {
168                                                 res = redisCommand(redis,
169                                                                 "SENTINEL get-master-addr-by-name %s",
170                                                                 sentinel_group);
171                                                 if( res && (res->type == REDIS_REPLY_ARRAY)
172                                                                 && (res->elements == 2) ) {
173                                                         strncpy(addr, res->element[0]->str,
174                                                                         res->element[0]->len + 1);
175                                                         port = atoi(res->element[1]->str);
176                                                         LM_DBG("sentinel replied: %s:%d\n", addr, port);
177                                                 }
178                                         }
179                                         else {
180                                                 res = redisCommand(redis, "SENTINEL slaves %s",
181                                                                 sentinel_group);
182                                                 if( res && (res->type == REDIS_REPLY_ARRAY) ) {
183                                                         for(row = 0; row< res->elements; row++){
184                                                                 res2 = res->element[row];
185                                                                 for(i= 0; i< res2->elements; i+= 2) {
186                                                                         if( strncmp(res2->element[i]->str,
187                                                                                                 "ip", 2) == 0 ) {
188                                                                                 strncpy(addr, res2->element[i+1]->str,
189                                                                                                 res2->element[i+1]->len);
190                                                                                 addr[res2->element[i+1]->len] = '\0';
191                                                                         }
192                                                                         else if( strncmp(res2->element[i]->str,
193                                                                                                 "port", 4) == 0) {
194                                                                                 port = atoi(res2->element[i+1]->str);
195                                                                                 break;
196                                                                         }
197                                                                 }
198                                                         }
199                                                         LM_DBG("slave for %s: %s:%d\n", sentinel_group,
200                                                                         addr, port);
201                                                 }
202                                         }
203                                 }
204                         }
205                 }
206
207                 if(sock != 0) {
208                         LM_DBG("Connecting to unix socket: %s\n", unix_sock_path);
209                         rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path,
210                                         tv_conn);
211                 } else {
212                         LM_DBG("Connecting to %s:%d\n", addr, port);
213                         rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
214                 }
215
216                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
217
218                 if(!rsrv->ctxRedis) {
219                         LM_ERR("Failed to create REDIS-Context.\n");
220                         goto err;
221                 }
222                 if (rsrv->ctxRedis->err) {
223                         LM_ERR("Failed to create REDIS returned an error: %s\n",
224                                         rsrv->ctxRedis->errstr);
225                         goto err2;
226                 }
227                 if ((haspass != 0) && redisc_check_auth(rsrv, pass)) {
228                         LM_ERR("Authentication failed.\n");
229                         goto err2;
230                 }
231                 if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) {
232                         LM_ERR("Failed to set timeout.\n");
233                         goto err2;
234                 }
235                 if (redisCommandNR(rsrv->ctxRedis, "PING")) {
236                         LM_ERR("Failed to send PING (REDIS returned %s).\n",
237                                         rsrv->ctxRedis->errstr);
238                         goto err2;
239                 }
240                 if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
241                                         "SELECT %i", db)) {
242                         LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\","
243                                         " and not in cluster mode).\n", db, rsrv->ctxRedis->errstr);
244                         goto err2;
245                 }
246         }
247
248         return 0;
249
250 err2:
251         if (sock != 0) {
252                 LM_ERR("error communicating with redis server [%.*s]"
253                                 " (unix:%s db:%d): %s\n",
254                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
255                                 rsrv->ctxRedis->errstr);
256         } else {
257                 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
258                                 rsrv->sname->len, rsrv->sname->s, addr, port, db,
259                                 rsrv->ctxRedis->errstr);
260         }
261         if (init_without_redis==1)
262         {
263                 LM_WARN("failed to initialize redis connections, but initializing"
264                                 " module anyway.\n");
265                 return 0;
266         }
267
268         return -1;
269 err:
270         if (sock != 0) {
271                 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
272                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
273         } else {
274                 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
275                                 rsrv->sname->len, rsrv->sname->s, addr, port, db);
276         }
277         if (init_without_redis==1)
278         {
279                 LM_WARN("failed to initialize redis connections, but initializing"
280                                 " module anyway.\n");
281                 return 0;
282         }
283
284         return -1;
285 }
286
287 /**
288  *
289  */
290 int redisc_destroy(void)
291 {
292         redisc_reply_t *rpl, *next_rpl;
293
294         redisc_server_t *rsrv=NULL;
295         redisc_server_t *rsrv1=NULL;
296
297         rpl = _redisc_rpl_list;
298         while(rpl != NULL)
299         {
300                 next_rpl = rpl->next;
301                 if(rpl->rplRedis)
302                         freeReplyObject(rpl->rplRedis);
303
304                 if(rpl->rname.s != NULL)
305                         pkg_free(rpl->rname.s);
306
307                 pkg_free(rpl);
308                 rpl = next_rpl;
309         }
310         _redisc_rpl_list = NULL;
311
312         if(_redisc_srv_list==NULL)
313                 return -1;
314         rsrv=_redisc_srv_list;
315         while(rsrv!=NULL)
316         {
317                 rsrv1 = rsrv;
318                 rsrv=rsrv->next;
319                 if (rsrv1->ctxRedis!=NULL)
320                         redisFree(rsrv1->ctxRedis);
321                 free_params(rsrv1->attrs);
322                 pkg_free(rsrv1);
323         }
324         _redisc_srv_list = NULL;
325
326         return 0;
327 }
328
329 /**
330  *
331  */
332 int redisc_add_server(char *spec)
333 {
334         param_t *pit=NULL;
335         param_hooks_t phooks;
336         redisc_server_t *rsrv=NULL;
337         str s;
338
339         s.s = spec;
340         s.len = strlen(spec);
341         if(s.s[s.len-1]==';')
342                 s.len--;
343         if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0)
344         {
345                 LM_ERR("failed parsing params value\n");
346                 goto error;
347         }
348         rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t));
349         if(rsrv==NULL)
350         {
351                 LM_ERR("no more pkg\n");
352                 goto error;
353         }
354         memset(rsrv, 0, sizeof(redisc_server_t));
355         rsrv->attrs = pit;
356         rsrv->spec = spec;
357         for (pit = rsrv->attrs; pit; pit=pit->next)
358         {
359                 if(pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
360                         rsrv->sname = &pit->body;
361                         rsrv->hname = get_hash1_raw(rsrv->sname->s, rsrv->sname->len);
362                         break;
363                 }
364         }
365         if(rsrv->sname==NULL)
366         {
367                 LM_ERR("no server name\n");
368                 goto error;
369         }
370         rsrv->next = _redisc_srv_list;
371         _redisc_srv_list = rsrv;
372
373         return 0;
374 error:
375         if(pit!=NULL)
376                 free_params(pit);
377         if(rsrv!=NULL)
378                 pkg_free(rsrv);
379         return -1;
380 }
381
382 /**
383  *
384  */
385 redisc_server_t *redisc_get_server(str *name)
386 {
387         redisc_server_t *rsrv=NULL;
388         unsigned int hname;
389
390         hname = get_hash1_raw(name->s, name->len);
391         LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s);
392         rsrv=_redisc_srv_list;
393         while(rsrv!=NULL)
394         {
395                 LM_DBG("Entry %u (%.*s)\n", rsrv->hname,
396                                 rsrv->sname->len, rsrv->sname->s);
397                 if(rsrv->hname==hname && rsrv->sname->len==name->len
398                                 && strncmp(rsrv->sname->s, name->s, name->len)==0)
399                         return rsrv;
400                 rsrv=rsrv->next;
401         }
402         LM_DBG("No entry found.\n");
403         return NULL;
404 }
405
406 /**
407  *
408  */
409 int redisc_reconnect_server(redisc_server_t *rsrv)
410 {
411         char addr[256], pass[256], unix_sock_path[256];
412         unsigned int port, db, sock = 0, haspass = 0;
413         param_t *pit = NULL;
414         struct timeval tv_conn;
415         struct timeval tv_cmd;
416
417         tv_conn.tv_sec = (int) redis_connect_timeout_param / 1000;
418         tv_conn.tv_usec = (int) (redis_connect_timeout_param % 1000) * 1000;
419
420         tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000;
421         tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000;
422
423         memset(addr, 0, sizeof(addr));
424         port = 6379;
425         db = 0;
426         memset(pass, 0, sizeof(pass));
427         memset(unix_sock_path, 0, sizeof(unix_sock_path));
428         for (pit = rsrv->attrs; pit; pit=pit->next)
429         {
430                 if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) {
431                         snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s",
432                                         pit->body.len, pit->body.s);
433                         sock = 1;
434                 } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) {
435                         snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s);
436                 } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) {
437                         if(str2int(&pit->body, &port) < 0)
438                                 port = 6379;
439                 } else if(pit->name.len==2 && strncmp(pit->name.s, "db", 2)==0) {
440                         if(str2int(&pit->body, &db) < 0)
441                                 db = 0;
442                 } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) {
443                         snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s);
444                         haspass = 1;
445                 }
446         }
447
448         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
449         if(rsrv->ctxRedis!=NULL) {
450                 redisFree(rsrv->ctxRedis);
451                 rsrv->ctxRedis = NULL;
452         }
453
454         if(sock != 0) {
455                 rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn);
456         } else {
457                 rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn);
458         }
459         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
460         if(!rsrv->ctxRedis)
461                 goto err;
462         if (rsrv->ctxRedis->err)
463                 goto err2;
464         if ((haspass) && redisc_check_auth(rsrv, pass))
465                 goto err2;
466         if (redisSetTimeout(rsrv->ctxRedis, tv_cmd))
467                 goto err2;
468         if (redisCommandNR(rsrv->ctxRedis, "PING"))
469                 goto err2;
470         if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis,
471                                 "SELECT %i", db))
472                 goto err2;
473         if (redis_flush_on_reconnect_param)
474                 if (redisCommandNR(rsrv->ctxRedis, "FLUSHALL"))
475                         goto err2;
476         return 0;
477
478 err2:
479         if (sock != 0) {
480                 LM_ERR("error communicating with redis server [%.*s]"
481                                 " (unix:%s db:%d): %s\n",
482                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db,
483                                 rsrv->ctxRedis->errstr);
484         } else {
485                 LM_ERR("error communicating with redis server [%.*s] (%s:%d/%d): %s\n",
486                                 rsrv->sname->len, rsrv->sname->s, addr, port, db,
487                                 rsrv->ctxRedis->errstr);
488         }
489 err:
490         if (sock != 0) {
491                 LM_ERR("failed to connect to redis server [%.*s] (unix:%s db:%d)\n",
492                                 rsrv->sname->len, rsrv->sname->s, unix_sock_path, db);
493         } else {
494                 LM_ERR("failed to connect to redis server [%.*s] (%s:%d/%d)\n",
495                                 rsrv->sname->len, rsrv->sname->s, addr, port, db);
496         }
497         return -1;
498 }
499
500 /**
501  *
502  */
503 int redisc_append_cmd(str *srv, str *res, str *cmd, ...)
504 {
505         redisc_server_t *rsrv=NULL;
506         redisc_reply_t *rpl;
507         char c;
508         va_list ap;
509
510         va_start(ap, cmd);
511
512         if(srv==NULL || cmd==NULL || res==NULL)
513         {
514                 LM_ERR("invalid parameters");
515                 goto error_cmd;
516         }
517         if(srv->len==0 || res->len==0 || cmd->len==0)
518         {
519                 LM_ERR("invalid parameters");
520                 goto error_cmd;
521         }
522         rsrv = redisc_get_server(srv);
523         if(rsrv==NULL)
524         {
525                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
526                 goto error_cmd;
527         }
528         if(rsrv->ctxRedis==NULL)
529         {
530                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
531                 goto error_cmd;
532         }
533         if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS)
534         {
535                 LM_ERR("Too many pipelined commands, maximum is %d\n",
536                                 MAXIMUM_PIPELINED_COMMANDS);
537                 goto error_cmd;
538         }
539         rpl = redisc_get_reply(res);
540         if(rpl==NULL)
541         {
542                 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
543                 goto error_cmd;
544         }
545         STR_VTOZ(cmd->s[cmd->len], c);
546         rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand(
547                         &rsrv->piped.commands[rsrv->piped.pending_commands].s,
548                         cmd->s,
549                         ap);
550         if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0)
551         {
552                 LM_ERR("Invalid redis command : %s\n",cmd->s);
553                 goto error_cmd;
554         }
555         rsrv->piped.replies[rsrv->piped.pending_commands]=rpl;
556         rsrv->piped.pending_commands++;
557
558         STR_ZTOV(cmd->s[cmd->len], c);
559         va_end(ap);
560         return 0;
561
562 error_cmd:
563         va_end(ap);
564         return -1;
565
566 }
567
568
569 /**
570  *
571  */
572 int redisc_exec_pipelined_cmd(str *srv)
573 {
574         redisc_server_t *rsrv=NULL;
575
576         if (srv == NULL)
577         {
578                 LM_ERR("invalid parameters");
579                 return -1;
580         }
581         if (srv->len == 0)
582         {
583                 LM_ERR("invalid parameters");
584                 return -1;
585         }
586         rsrv = redisc_get_server(srv);
587         if (rsrv == NULL)
588         {
589                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
590                 return -1;
591         }
592         if (rsrv->ctxRedis == NULL)
593         {
594                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
595                 return -1;
596         }
597         return redisc_exec_pipelined(rsrv);
598 }
599
600 /**
601  *
602  */
603 int redisc_create_pipelined_message(redisc_server_t *rsrv)
604 {
605         int i;
606
607         if (rsrv->ctxRedis->err)
608         {
609                 LM_DBG("Reconnecting server because of error %d: \"%s\"",
610                                 rsrv->ctxRedis->err,rsrv->ctxRedis->errstr);
611                 if (redisc_reconnect_server(rsrv))
612                 {
613                         LM_ERR("unable to reconnect to REDIS server: %.*s\n",
614                                         rsrv->sname->len,rsrv->sname->s);
615                         return -1;
616                 }
617         }
618
619         for (i=0;i<rsrv->piped.pending_commands;i++)
620         {
621                 if (redis_append_formatted_command(rsrv->ctxRedis,
622                                         rsrv->piped.commands[i].s,rsrv->piped.commands[i].len)
623                                                 != REDIS_OK)
624                 {
625                         LM_ERR("Error while appending command %d",i);
626                         return -1;
627                 }
628         }
629         return 0;
630 }
631
632 /**
633  *
634  */
635 void redisc_free_pipelined_cmds(redisc_server_t *rsrv)
636 {
637         int i;
638         for (i=0;i<rsrv->piped.pending_commands;i++)
639         {
640                 free(rsrv->piped.commands[i].s);
641                 rsrv->piped.commands[i].len=0;
642         }
643         rsrv->piped.pending_commands=0;
644 }
645
646 /**
647  *
648  */
649 int redisc_exec_pipelined(redisc_server_t *rsrv)
650 {
651         redisc_reply_t *rpl;
652         int i;
653
654         LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s);
655
656         /* if server is disabled do nothing unless the disable time has passed */
657         if (redis_check_server(rsrv))
658         {
659                 goto srv_disabled;
660         }
661
662         if (rsrv->piped.pending_commands == 0)
663         {
664                 LM_WARN("call for redis_cmd without any pipelined commands\n");
665                 return -1;
666         }
667         if(rsrv->ctxRedis==NULL)
668         {
669                 LM_ERR("no redis context for server: %.*s\n",
670                                 rsrv->sname->len,rsrv->sname->s);
671                 goto error_exec;
672         }
673
674         /* send the commands and retrieve the first reply */
675         rpl=rsrv->piped.replies[0];
676
677         if(rpl->rplRedis!=NULL)
678         {
679                 /* clean up previous redis reply */
680                 freeReplyObject(rpl->rplRedis);
681                 rpl->rplRedis = NULL;
682         }
683
684         redisc_create_pipelined_message(rsrv);
685         redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
686
687         if (rpl->rplRedis == NULL)
688         {
689                 /* null reply, reconnect and try again */
690                 if (rsrv->ctxRedis->err)
691                 {
692                         LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
693                 }
694                 if (redisc_create_pipelined_message(rsrv) == 0)
695                 {
696                         redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis);
697                         if (rpl->rplRedis == NULL)
698                         {
699                                 redis_count_err_and_disable(rsrv);
700                                 LM_ERR("Unable to read reply\n");
701                                 goto error_exec;
702                         }
703                 }
704                 else
705                 {
706                         redis_count_err_and_disable(rsrv);
707                         goto error_exec;
708                 }
709         }
710         LM_DBG_redis_reply(rpl->rplRedis);
711
712         /* replies are received just retrieve them */
713         for (i=1;i<rsrv->piped.pending_commands;i++)
714         {
715                 rpl=rsrv->piped.replies[i];
716                 if(rpl->rplRedis!=NULL)
717                 {
718                         /* clean up previous redis reply */
719                         freeReplyObject(rpl->rplRedis);
720                         rpl->rplRedis = NULL;
721                 }
722                 if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis)
723                                 != REDIS_OK)
724                 {
725                         LM_ERR("Unable to read reply\n");
726                         continue;
727                 }
728                 if (rpl->rplRedis == NULL)
729                 {
730                         LM_ERR("Trying to read reply for command %.*s but nothing in buffer!",
731                                         rsrv->piped.commands[i].len,rsrv->piped.commands[i].s);
732                         continue;
733                 }
734                 LM_DBG_redis_reply(rpl->rplRedis);
735         }
736         redisc_free_pipelined_cmds(rsrv);
737         rsrv->disable.consecutive_errors = 0;
738         return 0;
739
740 error_exec:
741         redisc_free_pipelined_cmds(rsrv);
742         return -1;
743
744 srv_disabled:
745         redisc_free_pipelined_cmds(rsrv);
746         return -2;
747 }
748
749 int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv)
750 {
751         redisc_server_t *rsrv_new;
752         char buffername[100];
753         unsigned int port;
754         str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0};
755         int server_len = 0;
756         char spec_new[100];
757
758         if(redis_cluster_param) {
759                 LM_DBG("Redis replied: \"%.*s\"\n", (int)reply->len, reply->str);
760                 if((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) {
761                         port = 6379;
762                         if(strchr(reply->str, ':') > 0) {
763                                 tmpstr.s = strchr(reply->str, ':') + 1;
764                                 tmpstr.len = reply->len - (tmpstr.s - reply->str);
765                                 if(str2int(&tmpstr, &port) < 0)
766                                         port = 6379;
767                                 LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s,
768                                                 tmpstr.len, port);
769                         } else {
770                                 LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
771                                                 reply->str);
772                                 return 0;
773                         }
774                         if(strchr(reply->str + 6, ' ') > 0) {
775                                 addr.len = tmpstr.s - strchr(reply->str + 6, ' ') - 2;
776                                 addr.s = strchr(reply->str + 6, ' ') + 1;
777                                 LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len);
778                         } else {
779                                 LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", (int)reply->len,
780                                                 reply->str);
781                                 return 0;
782                         }
783
784                         memset(buffername, 0, sizeof(buffername));
785                         name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i",
786                                         addr.len, addr.s, port);
787                         name.s = buffername;
788                         LM_DBG("Name of new connection: %.*s\n", name.len, name.s);
789                         rsrv_new = redisc_get_server(&name);
790                         if(rsrv_new) {
791                                 LM_DBG("Reusing Connection\n");
792                                 *rsrv = rsrv_new;
793                                 return 1;
794                         } else if(redis_allow_dynamic_nodes_param) {
795                                 /* New param redis_allow_dynamic_nodes_param:
796                                 * if set, we allow ndb_redis to add nodes that were
797                                 * not defined explicitly in the module configuration */
798                                 char *server_new;
799
800                                 memset(spec_new, 0, sizeof(spec_new));
801                                 /* For now the only way this can work is if
802                                  * the new node is accessible with default
803                                  * parameters for sock and db */
804                                 server_len = snprintf(spec_new, sizeof(spec_new) - 1,
805                                                 "name=%.*s;addr=%.*s;port=%i", name.len, name.s,
806                                                 addr.len, addr.s, port);
807
808                                 if(server_len<0 || server_len>sizeof(spec_new) - 1) {
809                                         LM_ERR("failed to print server spec string\n");
810                                         return 0;
811                                 }
812                                 server_new = (char *)pkg_malloc(server_len + 1);
813                                 if(server_new == NULL) {
814                                         LM_ERR("Error allocating pkg mem\n");
815                                         return 0;
816                                 }
817
818                                 strncpy(server_new, spec_new, server_len);
819                                 server_new[server_len] = '\0';
820
821                                 if(redisc_add_server(server_new) == 0) {
822                                         rsrv_new = redisc_get_server(&name);
823
824                                         if(rsrv_new) {
825                                                 *rsrv = rsrv_new;
826                                                 /* Need to connect to the new server now */
827                                                 if(redisc_reconnect_server(rsrv_new) == 0) {
828                                                         LM_DBG("Connected to the new server with name: "
829                                                                    "%.*s\n",
830                                                                         name.len, name.s);
831                                                         return 1;
832                                                 } else {
833                                                         LM_ERR("ERROR connecting to the new server with "
834                                                                    "name: %.*s\n",
835                                                                         name.len, name.s);
836                                                         return 0;
837                                                 }
838                                         } else {
839                                                 /* Adding the new node failed
840                                                  * - cannot perform redirection */
841                                                 LM_ERR("No new connection with name (%.*s) was "
842                                                                 "created\n", name.len, name.s);
843                                         }
844                                 } else {
845                                         LM_ERR("Could not add a new connection with name %.*s\n",
846                                                         name.len, name.s);
847                                         pkg_free(server_new);
848                                 }
849                         } else {
850                                 LM_ERR("No Connection with name (%.*s)\n", name.len, name.s);
851                         }
852                 }
853         }
854         return 0;
855 }
856
857 /**
858  *
859  */
860 int redisc_exec(str *srv, str *res, str *cmd, ...)
861 {
862         redisc_server_t *rsrv=NULL;
863         redisc_reply_t *rpl;
864         char c;
865         va_list ap, ap2, ap3, ap4;
866         int ret = -1;
867
868         va_start(ap, cmd);
869         va_copy(ap2, ap);
870         va_copy(ap3, ap);
871         va_copy(ap4, ap);
872
873         if(srv==NULL || cmd==NULL || res==NULL)
874         {
875                 LM_ERR("invalid parameters");
876                 goto error;
877         }
878         if(srv->len==0 || res->len==0 || cmd->len==0)
879         {
880                 LM_ERR("invalid parameters");
881                 goto error;
882         }
883
884         STR_VTOZ(cmd->s[cmd->len], c);
885
886         rsrv = redisc_get_server(srv);
887         if(rsrv==NULL)
888         {
889                 LM_ERR("no redis server found: %.*s\n", srv->len, srv->s);
890                 goto error_exec;
891         }
892
893         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
894
895         if(rsrv->ctxRedis==NULL)
896         {
897                 LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
898                 goto error_exec;
899         }
900         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
901
902         if (rsrv->piped.pending_commands != 0)
903         {
904                 LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer."
905                                 " Automatically call redis_execute");
906                 redisc_exec_pipelined(rsrv);
907         }
908         /* if server is disabled do nothing unless the disable time has passed */
909         if (redis_check_server(rsrv))
910         {
911                 goto srv_disabled;
912         }
913
914         rpl = redisc_get_reply(res);
915         if(rpl==NULL)
916         {
917                 LM_ERR("no redis reply id found: %.*s\n", res->len, res->s);
918                 goto error_exec;
919         }
920         if(rpl->rplRedis!=NULL)
921         {
922                 /* clean up previous redis reply */
923                 freeReplyObject(rpl->rplRedis);
924                 rpl->rplRedis = NULL;
925         }
926
927         rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap );
928         if(rpl->rplRedis == NULL)
929         {
930                 /* null reply, reconnect and try again */
931                 if(rsrv->ctxRedis->err)
932                 {
933                         LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
934                 }
935                 if(redisc_reconnect_server(rsrv)==0)
936                 {
937                         rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2);
938                         if (rpl->rplRedis ==NULL)
939                         {
940                                 redis_count_err_and_disable(rsrv);
941                                 goto error_exec;
942                         }
943                 }
944                 else
945                 {
946                         redis_count_err_and_disable(rsrv);
947                         LM_ERR("unable to reconnect to redis server: %.*s\n",
948                                         srv->len, srv->s);
949                         STR_ZTOV(cmd->s[cmd->len], c);
950                         goto error_exec;
951                 }
952         }
953         if (check_cluster_reply(rpl->rplRedis, &rsrv)) {
954                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
955                 if(rsrv->ctxRedis==NULL)
956                 {
957                         LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s);
958                         goto error_exec;
959                 }
960
961                 LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
962
963                 if(rpl->rplRedis!=NULL)
964                 {
965                         /* clean up previous redis reply */
966                         freeReplyObject(rpl->rplRedis);
967                         rpl->rplRedis = NULL;
968                 }
969                 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 );
970                 if(rpl->rplRedis == NULL)
971                 {
972                         /* null reply, reconnect and try again */
973                         if(rsrv->ctxRedis->err)
974                         {
975                                 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
976                         }
977                         if(redisc_reconnect_server(rsrv)==0)
978                         {
979                                 rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4);
980                         } else {
981                                 LM_ERR("unable to reconnect to redis server: %.*s\n",
982                                                 srv->len, srv->s);
983                                 STR_ZTOV(cmd->s[cmd->len], c);
984                                 goto error_exec;
985                         }
986                 }
987         }
988         STR_ZTOV(cmd->s[cmd->len], c);
989         rsrv->disable.consecutive_errors = 0;
990         va_end(ap);
991         va_end(ap2);
992         va_end(ap3);
993         va_end(ap4);
994
995         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
996
997         return 0;
998
999 error_exec:
1000         STR_ZTOV(cmd->s[cmd->len], c);
1001         ret = -1;
1002         goto error;
1003
1004 srv_disabled:
1005         STR_ZTOV(cmd->s[cmd->len], c);
1006         ret = -2;
1007         goto error;
1008
1009 error:
1010         va_end(ap);
1011         va_end(ap2);
1012         va_end(ap3);
1013         va_end(ap4);
1014         return ret;
1015 }
1016
1017 /**
1018  * Executes a redis command.
1019  * Command is coded using a vector of strings, and a vector of lengths.
1020  *
1021  * @param rsrv Pointer to a redis_server_t structure.
1022  * @param argc number of elements in the command vector.
1023  * @param argv vector of zero terminated strings forming the command.
1024  * @param argvlen vector of command string lengths or NULL.
1025  * @return redisReply structure or NULL if there was an error.
1026  */
1027 redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv,
1028                 const size_t *argvlen)
1029 {
1030         redisReply *res=NULL;
1031
1032         if(rsrv==NULL)
1033         {
1034                 LM_ERR("no redis context found for server %.*s\n",
1035                                 (rsrv)?rsrv->sname->len:0,
1036                                 (rsrv)?rsrv->sname->s:"");
1037                 return NULL;
1038         }
1039
1040         LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis);
1041         if(rsrv->ctxRedis==NULL)
1042         {
1043                 LM_ERR("no redis context found for server %.*s\n",
1044                         (rsrv)?rsrv->sname->len:0,
1045                         (rsrv)?rsrv->sname->s:"");
1046                 return NULL;
1047         }
1048
1049         if(argc<=0)
1050         {
1051                 LM_ERR("invalid parameters\n");
1052                 return NULL;
1053         }
1054         if(argv==NULL || *argv==NULL)
1055         {
1056                 LM_ERR("invalid parameters\n");
1057                 return NULL;
1058         }
1059 again:
1060         res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1061
1062         /* null reply, reconnect and try again */
1063         if(rsrv->ctxRedis->err)
1064         {
1065                 LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr);
1066         }
1067
1068         if(res)
1069         {
1070                 if (check_cluster_reply(res, &rsrv)) {
1071                         goto again;
1072                 }
1073                 return res;
1074         }
1075
1076         if(redisc_reconnect_server(rsrv)==0)
1077         {
1078                 res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen);
1079                 if (res) {
1080                         if (check_cluster_reply(res, &rsrv)) {
1081                                 goto again;
1082                         }
1083                 }
1084         }
1085         else
1086         {
1087                 LM_ERR("Unable to reconnect to server: %.*s\n",
1088                                 rsrv->sname->len, rsrv->sname->s);
1089                 return NULL;
1090         }
1091
1092         return res;
1093 }
1094
1095 /**
1096  *
1097  */
1098 redisc_reply_t *redisc_get_reply(str *name)
1099 {
1100         redisc_reply_t *rpl;
1101         unsigned int hid;
1102
1103         hid = get_hash1_raw(name->s, name->len);
1104
1105         for(rpl=_redisc_rpl_list; rpl; rpl=rpl->next) {
1106                 if(rpl->hname==hid && rpl->rname.len==name->len
1107                                 && strncmp(rpl->rname.s, name->s, name->len)==0)
1108                         return rpl;
1109         }
1110         /* not found - add a new one */
1111
1112         rpl = (redisc_reply_t*)pkg_malloc(sizeof(redisc_reply_t));
1113         if(rpl==NULL)
1114         {
1115                 LM_ERR("no more pkg\n");
1116                 return NULL;
1117         }
1118         memset(rpl, 0, sizeof(redisc_reply_t));
1119         rpl->hname = hid;
1120         rpl->rname.s = (char*)pkg_malloc(name->len+1);
1121         if(rpl->rname.s==NULL)
1122         {
1123                 LM_ERR("no more pkg.\n");
1124                 pkg_free(rpl);
1125                 return NULL;
1126         }
1127         strncpy(rpl->rname.s, name->s, name->len);
1128         rpl->rname.len = name->len;
1129         rpl->rname.s[name->len] = '\0';
1130         rpl->next = _redisc_rpl_list;
1131         _redisc_rpl_list = rpl;
1132         return rpl;
1133 }
1134
1135
1136 /**
1137  *
1138  */
1139 int redisc_free_reply(str *name)
1140 {
1141         redisc_reply_t *rpl;
1142         unsigned int hid;
1143
1144         if(name==NULL || name->len==0) {
1145                 LM_ERR("invalid parameters");
1146                 return -1;
1147         }
1148
1149         hid = get_hash1_raw(name->s, name->len);
1150
1151         rpl = _redisc_rpl_list;
1152         while(rpl) {
1153
1154                 if(rpl->hname==hid && rpl->rname.len==name->len
1155                                 && strncmp(rpl->rname.s, name->s, name->len)==0) {
1156                         if(rpl->rplRedis) {
1157                                 freeReplyObject(rpl->rplRedis);
1158                                 rpl->rplRedis = NULL;
1159                         }
1160
1161                         return 0;
1162                 }
1163
1164                 rpl = rpl->next;
1165         }
1166
1167         /* reply entry not found. */
1168         return -1;
1169 }
1170
1171 int redisc_check_auth(redisc_server_t *rsrv, char *pass)
1172 {
1173         redisReply *reply;
1174         int retval = 0;
1175
1176         reply = redisCommand(rsrv->ctxRedis, "AUTH %s", pass);
1177         if (reply->type == REDIS_REPLY_ERROR) {
1178                 LM_ERR("Redis authentication error\n");
1179                 retval = -1;
1180         }
1181         freeReplyObject(reply);
1182         return retval;
1183 }
1184
1185 /* backwards compatibility with hiredis < 0.12 */
1186 #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12)
1187 int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len)
1188 {
1189         sds newbuf;
1190
1191         newbuf = sdscatlen(c->obuf,cmd,len);
1192         if (newbuf == NULL) {
1193                 c->err = REDIS_ERR_OOM;
1194                 strcpy(c->errstr,"Out of memory");
1195                 return REDIS_ERR;
1196         }
1197         c->obuf = newbuf;
1198         return REDIS_OK;
1199 }
1200 #endif
1201
1202 int redis_check_server(redisc_server_t *rsrv)
1203 {
1204
1205         if (rsrv->disable.disabled)
1206         {
1207                 if (get_ticks() > rsrv->disable.restore_tick)
1208                 {
1209                         LM_NOTICE("REDIS server %.*s re-enabled",
1210                                         rsrv->sname->len, rsrv->sname->s);
1211                         rsrv->disable.disabled = 0;
1212                         rsrv->disable.consecutive_errors = 0;
1213                 }
1214                 else
1215                 {
1216                         return 1;
1217                 }
1218         }
1219         return 0;
1220 }
1221
1222 int redis_count_err_and_disable(redisc_server_t *rsrv)
1223 {
1224         if (redis_allowed_timeouts_param < 0)
1225         {
1226                 return 0;
1227         }
1228
1229         rsrv->disable.consecutive_errors++;
1230         if (rsrv->disable.consecutive_errors > redis_allowed_timeouts_param)
1231         {
1232                 rsrv->disable.disabled=1;
1233                 rsrv->disable.restore_tick=get_ticks() + redis_disable_time_param;
1234                 LM_WARN("REDIS server %.*s disabled for %d seconds", rsrv->sname->len,
1235                                 rsrv->sname->s, redis_disable_time_param);
1236                 return 1;
1237         }
1238         return 0;
1239 }
1240
1241 void print_redis_reply(int log_level, redisReply *rpl,int offset)
1242 {
1243         int i;
1244         char padding[MAXIMUM_NESTED_KEYS + 1];
1245
1246         if(!is_printable(log_level))
1247                 return;
1248
1249         if (!rpl)
1250         {
1251                 LM_ERR("Unexpected null reply");
1252                 return;
1253         }
1254
1255         if (offset > MAXIMUM_NESTED_KEYS)
1256         {
1257                 LM_ERR("Offset is too big");
1258                 return;
1259         }
1260
1261         for (i=0;i<offset;i++)
1262         {
1263                 padding[i]='\t';
1264         }
1265         padding[offset]='\0';
1266
1267         switch (rpl->type)
1268         {
1269         case REDIS_REPLY_STRING:
1270                 LOG(log_level,"%sstring reply: [%s]", padding, rpl->str);
1271                 break;
1272         case REDIS_REPLY_INTEGER:
1273                 LOG(log_level,"%sinteger reply: %lld", padding, rpl->integer);
1274                 break;
1275         case REDIS_REPLY_ARRAY:
1276                 LOG(log_level,"%sarray reply with %d elements", padding,
1277                                 (int)rpl->elements);
1278                 for (i=0; i < rpl->elements; i++)
1279                 {
1280                         LOG(log_level,"%selement %d:",padding,i);
1281                         print_redis_reply(log_level,rpl->element[i],offset+1);
1282                 }
1283                 break;
1284         case REDIS_REPLY_NIL:
1285                 LOG(log_level,"%snil reply",padding);
1286                 break;
1287         case REDIS_REPLY_STATUS:
1288                 LOG(log_level,"%sstatus reply: %s", padding, rpl->str);
1289                 break;
1290         case REDIS_REPLY_ERROR:
1291                 LOG(log_level,"%serror reply: %s", padding, rpl->str);
1292                 break;
1293         }
1294 }