018f03e977840998a6649efdaad84c5a9f86e080
[sip-router] / src / modules / janssonrpcc / janssonrpc_io.c
1 /**
2  * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <stdbool.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <jansson.h>
30 #include <event.h>
31 #include <event2/dns.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <signal.h>
35
36 #include "../../core/sr_module.h"
37 #include "../../core/route.h"
38 #include "../../core/mem/mem.h"
39 #include "../../core/action.h"
40 #include "../../core/route_struct.h"
41 #include "../../core/lvalue.h"
42 #include "../../core/cfg/cfg_struct.h"
43 #include "../../core/rand/fastrand.h"
44 #include "../tm/tm_load.h"
45 #include "../jansson/jansson_utils.h"
46
47 #include "janssonrpc.h"
48 #include "janssonrpc_request.h"
49 #include "janssonrpc_server.h"
50 #include "janssonrpc_io.h"
51 #include "janssonrpc_connect.h"
52 #include "netstring.h"
53
54 struct tm_binds tmb;
55
56 void cmd_pipe_cb(int fd, short event, void *arg);
57 void io_shutdown(int sig);
58
59 int jsonrpc_io_child_process(int cmd_pipe)
60 {
61         struct event* pipe_ev = NULL;
62
63         global_ev_base = event_base_new();
64         global_evdns_base = evdns_base_new(global_ev_base, 1);
65
66         set_non_blocking(cmd_pipe);
67         pipe_ev = event_new(global_ev_base, cmd_pipe,
68                         EV_READ | EV_PERSIST, cmd_pipe_cb, NULL);
69
70         if(!pipe_ev) {
71                 ERR("Failed to create pipe event\n");
72                 return -1;
73         }
74
75         if(event_add(pipe_ev, NULL)<0) {
76                 ERR("Failed to start pipe event\n");
77                 return -1;
78         }
79
80         connect_servers(global_server_group);
81
82 #if 0
83         /* attach shutdown signal handler */
84         /* The shutdown handler are intended to clean up the remaining memory
85          * in the IO process. However, catching the signals causes unpreditable
86          * behavior in the Kamailio shutdown process, so this should be disabled
87          * except when doing memory debugging. */
88         struct sigaction sa;
89         sigemptyset(&sa.sa_mask);
90         sa.sa_flags = 0;
91         sa.sa_handler = io_shutdown;
92         if(sigaction(SIGTERM, &sa, NULL) == -1) {
93                 ERR("Failed to attach IO shutdown handler to SIGTERM\n");
94         } else if(sigaction(SIGINT, NULL, &sa) == -1) {
95                 ERR("Failed to attach IO shutdown handler to SIGINT\n");
96         }
97 #endif
98
99         if(event_base_dispatch(global_ev_base)<0) {
100                 ERR("IO couldn't start event loop\n");
101                 return -1;
102         }
103         return 0;
104 }
105
106 void io_shutdown(int sig)
107 {
108         INFO("Shutting down JSONRPC IO process...\n");
109         lock_get(jsonrpc_server_group_lock); /* blocking */
110
111         INIT_SERVER_LOOP
112         FOREACH_SERVER_IN(global_server_group)
113                 close_server(server);
114         ENDFOR
115
116         evdns_base_free(global_evdns_base, 0);
117         event_base_loopexit(global_ev_base, NULL);
118         event_base_free(global_ev_base);
119
120         lock_release(jsonrpc_server_group_lock);
121 }
122
123 int send_to_script(pv_value_t* val, jsonrpc_req_cmd_t* req_cmd)
124 {
125         if(!(req_cmd)) return -1;
126
127         if(req_cmd->route.len <= 0) return -1;
128
129         jsonrpc_result_pv.setf(req_cmd->msg, &jsonrpc_result_pv.pvp, (int)EQ_T, val);
130
131         int n = route_lookup(&main_rt, req_cmd->route.s);
132         if(n<0) {
133                 ERR("no such route: %s\n", req_cmd->route.s);
134                 return -1;
135         }
136
137         struct action* route = main_rt.rlist[n];
138
139         if(tmb.t_continue(req_cmd->t_hash, req_cmd->t_label, route)<0) {
140                 ERR("Failed to resume transaction\n");
141                 return -1;
142         }
143         return 0;
144 }
145
146 json_t* internal_error(int code, json_t* data)
147 {
148         json_t* ret = json_object();
149         json_t* inner = json_object();
150         char* message;
151
152         switch(code){
153         case JRPC_ERR_REQ_BUILD:
154                 message = "Failed to build request";
155                 break;
156         case JRPC_ERR_SEND:
157                 message = "Failed to send";
158                 break;
159         case JRPC_ERR_BAD_RESP:
160                 message = "Bad response result";
161                 json_object_set(ret, "data", data);
162                 break;
163         case JRPC_ERR_RETRY:
164                 message = "Retry failed";
165                 break;
166         case JRPC_ERR_SERVER_DISCONNECT:
167                 message = "Server disconnected";
168                 break;
169         case JRPC_ERR_TIMEOUT:
170                 message = "Message timeout";
171                 break;
172         case JRPC_ERR_PARSING:
173                 message = "JSON parse error";
174                 break;
175         case JRPC_ERR_BUG:
176                 message = "There is a bug";
177                 break;
178         default:
179                 ERR("Unrecognized error code: %d\n", code);
180                 message = "Unknown error";
181                 break;
182         }
183
184         json_t* message_js = json_string(message);
185         json_object_set(inner, "message", message_js);
186         if(message_js) json_decref(message_js);
187
188         json_t* code_js = json_integer(code);
189         json_object_set(inner, "code", code_js);
190         if(code_js) json_decref(code_js);
191
192         if(data) {
193                 json_object_set(inner, "data", data);
194         }
195
196         json_object_set(ret, "internal_error", inner);
197         if(inner) json_decref(inner);
198         return ret;
199 }
200
201 void fail_request(int code, jsonrpc_request_t* req, char* err_str)
202 {
203         char* req_s;
204         char* freeme = NULL;
205         pv_value_t val;
206         json_t* error;
207
208         if(!req) {
209 null_req:
210                 WARN("%s: (null)\n", err_str);
211                 goto end;
212         }
213
214         if(!(req->cmd) || (req->cmd->route.len <= 0)) {
215 no_route:
216                 req_s = json_dumps(req->payload, JSON_COMPACT);
217                 if(req_s) {
218                         WARN("%s: \n%s\n", err_str, req_s);
219                         free(req_s);
220                         goto end;
221                 }
222                 goto null_req;
223         }
224
225         error = internal_error(code, req->payload);
226         jsontoval(&val, &freeme, error);
227         if(error) json_decref(error);
228         if(send_to_script(&val, req->cmd)<0) {
229                 goto no_route;
230         }
231
232 end:
233         if(freeme) free(freeme);
234         free_req_cmd(req->cmd);
235         free_request(req);
236 }
237
238 void timeout_cb(int fd, short event, void *arg)
239 {
240         jsonrpc_request_t* req = (jsonrpc_request_t*)arg;
241         if(!req)
242                 return;
243
244         if(!(req->server)) {
245                 ERR("No server defined for request\n");
246                 return;
247         }
248
249         if(schedule_retry(req)<0) {
250                 fail_request(JRPC_ERR_TIMEOUT, req, "Request timeout");
251         }
252 }
253
254
255 int server_tried(jsonrpc_server_t* server, server_list_t* tried)
256 {
257         if(!server)
258                 return 0;
259
260         int t = 0;
261         for(;tried!=NULL;tried=tried->next)
262         {
263                 if(tried->server &&
264                         server == tried->server)
265                 {
266                         t = 1;
267                 }
268         }
269         return t;
270 }
271
272 /* loadbalance_by_weight() uses an algorithm to randomly pick a server out of
273  * a list based on its relative weight.
274  *
275  * It is loosely inspired by this:
276  * http://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
277  *
278  * The insert_server_group() function provides the ability to get the combined
279  * weight of all the servers off the head of the list, making it possible to
280  * compute in O(n) in the worst case and O(1) in the best.
281  *
282  * A random number out of the total weight is chosen. Each node is inspected and
283  * its weight added to a recurring sum. Once the sum is larger than the random
284  * number the last server that was seen is chosen.
285  *
286  * A weight of 0 will almost never be chosen, unless if maybe all the other
287  * servers are offline.
288  *
289  * The exception is when all the servers in a group have a weight of 0. In
290  * this case, the load should be distributed evenly across each of them. This
291  * requires finding the size of the list beforehand.
292  * */
293 void loadbalance_by_weight(jsonrpc_server_t** s,
294                 jsonrpc_server_group_t* grp, server_list_t* tried)
295 {
296         *s = NULL;
297
298         if(grp == NULL) {
299                 ERR("Trying to pick from an empty group\n");
300                 return;
301         }
302
303         if(grp->type != WEIGHT_GROUP) {
304                 ERR("Trying to pick from a non weight group\n");
305                 return;
306         }
307
308         jsonrpc_server_group_t* head = grp;
309         jsonrpc_server_group_t* cur = grp;
310
311         unsigned int pick = 0;
312         if(head->weight == 0) {
313                 unsigned int size = 0;
314                 size = server_group_size(cur);
315                 if(size == 0) return;
316
317                 pick = fastrand_max(size-1);
318
319                 int i;
320                 for(i=0;
321                         (i <= pick || *s == NULL)
322                                 && cur != NULL;
323                         i++, cur=cur->next)
324                 {
325                         if(cur->server->status == JSONRPC_SERVER_CONNECTED) {
326                                 if(!server_tried(cur->server, tried)
327                                         && (cur->server->hwm <= 0
328                                                 || cur->server->req_count < cur->server->hwm))
329                                 {
330                                         *s = cur->server;
331                                 }
332                         }
333                 }
334         } else {
335                 pick = fastrand_max(head->weight - 1);
336
337                 unsigned int sum = 0;
338                 while(1) {
339                         if(cur == NULL) break;
340                         if(cur->server->status == JSONRPC_SERVER_CONNECTED) {
341                                 if(!server_tried(cur->server, tried)
342                                         && (cur->server->hwm <= 0
343                                                 || cur->server->req_count < cur->server->hwm))
344                                 {
345                                         *s = cur->server;
346                                 }
347                         }
348                         sum += cur->server->weight;
349                         if(sum > pick && *s != NULL) break;
350                         cur = cur->next;
351                 }
352         }
353 }
354
355 int jsonrpc_send(str conn, jsonrpc_request_t* req, bool notify_only)
356 {
357         char* json = (char*)json_dumps(req->payload, JSON_COMPACT);
358
359         char* ns;
360         size_t bytes;
361         bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
362
363         bool sent = false;
364         jsonrpc_server_group_t* c_grp = NULL;
365         if(global_server_group != NULL)
366                 c_grp = *global_server_group;
367         jsonrpc_server_group_t* p_grp = NULL;
368         jsonrpc_server_group_t* w_grp = NULL;
369         jsonrpc_server_t* s = NULL;
370         server_list_t* tried_servers = NULL;
371         DEBUG("SENDING DATA\n");
372         for(; c_grp != NULL; c_grp = c_grp->next) {
373
374                 if(strncmp(conn.s, c_grp->conn.s, c_grp->conn.len) != 0) continue;
375
376                 for(p_grp = c_grp->sub_group; p_grp != NULL; p_grp = p_grp->next)
377                 {
378                         w_grp = p_grp->sub_group;
379                         while(!sent) {
380                                 loadbalance_by_weight(&s, w_grp, tried_servers);
381                                 if (s == NULL || s->status != JSONRPC_SERVER_CONNECTED) {
382                                         break;
383                                 }
384
385                                 if(bufferevent_write(s->bev, ns, bytes) == 0) {
386                                         sent = true;
387                                         if(!notify_only) {
388                                                 s->req_count++;
389                                                 if (s->hwm > 0 && s->req_count >= s->hwm) {
390                                                         WARN("%.*s:%d in connection group %.*s has exceeded its high water mark (%d)\n",
391                                                                         STR(s->addr), s->port,
392                                                                         STR(s->conn), s->hwm);
393                                                 }
394                                         }
395                                         req->server = s;
396                                         break;
397                                 } else {
398                                         addto_server_list(s, &tried_servers);
399                                 }
400                         }
401
402                         if (sent) {
403                                 break;
404                         }
405
406                         WARN("Failed to send to priority group, %d\n", p_grp->priority);
407                         if(p_grp->next != NULL) {
408                                 INFO("Proceeding to next priority group, %d\n",
409                                                 p_grp->next->priority);
410                         }
411                 }
412
413                 if (sent) {
414                         break;
415                 }
416
417         }
418
419         if(!sent) {
420                 WARN("Failed to send to connection group, \"%.*s\"\n",
421                                 STR(conn));
422                 if(schedule_retry(req)<0) {
423                         fail_request(JRPC_ERR_RETRY, req, "Failed to schedule retry");
424                 }
425         }
426
427         free_server_list(tried_servers);
428         if(ns) pkg_free(ns);
429         if(json) free(json);
430
431         if (sent) {
432                 if (notify_only == true) { // free the request if using janssonrpc_notification function
433                         free_req_cmd(req->cmd);
434                         free_request(req);
435                 } else {
436                         const struct timeval tv = ms_to_tv(req->timeout);
437
438                         req->timeout_ev = evtimer_new(global_ev_base, timeout_cb, (void*)req);
439                         if(event_add(req->timeout_ev, &tv)<0) {
440                                 ERR("event_add failed while setting request timer (%s).",
441                                                 strerror(errno));
442                                 return -1;
443                         }
444                 }
445         }
446
447         return sent;
448 }
449
450
451 void cmd_pipe_cb(int fd, short event, void *arg)
452 {
453         struct jsonrpc_pipe_cmd *cmd;
454
455         if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
456                 ERR("FATAL ERROR: failed to read from command pipe: %s\n",
457                                 strerror(errno));
458                 return;
459         }
460
461         cfg_update();
462
463         switch(cmd->type) {
464         case CMD_CLOSE:
465                 if(cmd->server) {
466                         wait_close(cmd->server);
467                 }
468                 goto end;
469                 break;
470         case CMD_RECONNECT:
471                 if(cmd->server) {
472                         wait_reconnect(cmd->server);
473                 }
474                 goto end;
475                 break;
476         case CMD_CONNECT:
477                 if(cmd->server) {
478                         bev_connect(cmd->server);
479                 }
480                 goto end;
481                 break;
482         case CMD_UPDATE_SERVER_GROUP:
483                 if(cmd->new_grp) {
484                         jsonrpc_server_group_t* old_grp = *global_server_group;
485                         *global_server_group = cmd->new_grp;
486                         free_server_group(&old_grp);
487                 }
488                 lock_release(jsonrpc_server_group_lock);
489                 goto end;
490                 break;
491
492         case CMD_SEND:
493                 break;
494
495         default:
496                 ERR("Unrecognized pipe command: %d\n", cmd->type);
497                 goto end;
498                 break;
499         }
500
501         /* command is SEND */
502
503         jsonrpc_req_cmd_t* req_cmd = cmd->req_cmd;
504         if(req_cmd == NULL) {
505                 ERR("req_cmd is NULL. Invalid send command\n");
506                 goto end;
507         }
508
509         jsonrpc_request_t* req = NULL;
510         req = create_request(req_cmd);
511         if (!req || !req->payload) {
512                 json_t* error = internal_error(JRPC_ERR_REQ_BUILD, NULL);
513                 pv_value_t val;
514                 char* freeme = NULL;
515                 jsontoval(&val, &freeme, error);
516                 if(req_cmd->route.len <=0 && send_to_script(&val, req_cmd)<0) {
517                         ERR("Failed to build request (method: %.*s, params: %.*s)\n",
518                                         STR(req_cmd->method), STR(req_cmd->params));
519                 }
520                 if(freeme) free(freeme);
521                 if(error) json_decref(error);
522                 free_req_cmd(req_cmd);
523                 goto end;
524         }
525
526         int sent = jsonrpc_send(req_cmd->conn, req, req_cmd->notify_only);
527
528         char* type;
529         if (sent<0) {
530                 if (req_cmd->notify_only == false) {
531                         type = "Request";
532                 } else {
533                         type = "Notification";
534                 }
535                 WARN("%s could not be sent to connection group: %.*s\n",
536                                 type, STR(req_cmd->conn));
537                 fail_request(JRPC_ERR_SEND, req, "Failed to send request");
538         }
539
540 end:
541         free_pipe_cmd(cmd);
542 }
543
544 int handle_response(json_t* response)
545 {
546         int retval = 0;
547         jsonrpc_request_t* req = NULL;
548         json_t* return_obj = NULL;
549         json_t* internal = NULL;
550         char* freeme = NULL;
551
552
553         /* check if json object */
554         if(!json_is_object(response)){
555                 WARN("jsonrpc response is not an object\n");
556                 return -1;
557         }
558
559         /* check version */
560         json_t* version = json_object_get(response, "jsonrpc");
561         if(!version) {
562                 WARN("jsonrpc response does not have a version.\n");
563                 retval = -1;
564                 goto end;
565         }
566
567         const char* version_s = json_string_value(version);
568         if(!version_s){
569                 WARN("jsonrpc response version is not a string.\n");
570                 retval = -1;
571                 goto end;
572         }
573
574         if (strlen(version_s) != (sizeof(JSONRPC_VERSION)-1)
575                         || strncmp(version_s, JSONRPC_VERSION, sizeof(JSONRPC_VERSION)-1) != 0) {
576                 WARN("jsonrpc response version is not %s. version: %s\n",
577                                 JSONRPC_VERSION, version_s);
578                 retval = -1;
579                 goto end;
580         }
581
582         /* check for an id */
583         json_t* _id = json_object_get(response, "id");
584         if(!_id) {
585                 WARN("jsonrpc response does not have an id.\n");
586                 retval = -1;
587                 goto end;
588         }
589
590         int id = json_integer_value(_id);
591         if (!(req = pop_request(id))) {
592                 /* don't fail the server for an unrecognized id */
593                 retval = 0;
594                 goto end;
595         }
596
597         return_obj = json_object();
598
599         json_t* error = json_object_get(response, "error");
600         // if the error value is null, we don't care
601         bool _error = error && (json_typeof(error) != JSON_NULL);
602
603         json_t* result = json_object_get(response, "result");
604
605         if(_error) {
606                 json_object_set(return_obj, "error", error);
607         }
608
609         if(result) {
610                 json_object_set(return_obj, "result", result);
611         }
612
613         if ((!result && !_error) || (result && _error)) {
614                 WARN("bad response\n");
615                 internal = internal_error(JRPC_ERR_BAD_RESP, req->payload);
616                 json_object_update(return_obj, internal);
617                 if(internal) json_decref(internal);
618         }
619
620         pv_value_t val;
621
622         if(jsontoval(&val, &freeme, return_obj)<0) {
623                 fail_request(
624                                 JRPC_ERR_TO_VAL,
625                                 req,
626                                 "Failed to convert response json to pv\n");
627                 retval = -1;
628                 goto end;
629         }
630
631         char* error_s = NULL;
632
633         if(send_to_script(&val, req->cmd)>=0) {
634                 goto free_and_end;
635         }
636
637         if(_error) {
638                 // get code from error
639                 json_t* _code = json_object_get(error, "code");
640                 if(_code) {
641                         int code = json_integer_value(_code);
642
643                         // check if code is in global_retry_ranges
644                         retry_range_t* tmpr;
645                         for(tmpr = global_retry_ranges;
646                                         tmpr != NULL;
647                                         tmpr = tmpr->next) {
648                                 if((tmpr->start < tmpr->end
649                                                 && tmpr->start <= code && code <= tmpr->end)
650                                 || (tmpr->end < tmpr->start
651                                                 && tmpr->end <= code && code <= tmpr->start)
652                                 || (tmpr->start == tmpr->end && tmpr->start == code)) {
653                                         if(schedule_retry(req)==0) {
654                                                 goto end;
655                                         }
656                                         break;
657                                 }
658                         }
659
660                 }
661                 error_s = json_dumps(error, JSON_COMPACT);
662                 if(error_s) {
663                         WARN("Request received an error: \n%s\n", error_s);
664                         free(error_s);
665                 } else {
666                         fail_request(
667                                         JRPC_ERR_BAD_RESP,
668                                         req,
669                                         "Could not convert 'error' response to string");
670                         retval = -1;
671                         goto end;
672                 }
673         }
674
675
676 free_and_end:
677         free_req_cmd(req->cmd);
678         free_request(req);
679
680 end:
681         if(freeme) free(freeme);
682         if(return_obj) json_decref(return_obj);
683         return retval;
684 }
685
686 void handle_netstring(jsonrpc_server_t* server)
687 {
688         unsigned int old_count = server->req_count;
689         server->req_count--;
690         if (server->hwm > 0
691                         && old_count >= server->hwm
692                         && server->req_count < server->hwm) {
693                 INFO("%.*s:%d in connection group %.*s is back to normal\n",
694                                 STR(server->addr), server->port, STR(server->conn));
695         }
696
697         json_error_t error;
698
699         json_t* res = json_loads(server->buffer->string, 0, &error);
700
701         if (res) {
702                 if(handle_response(res)<0){
703                         ERR("Cannot handle jsonrpc response: %s\n", server->buffer->string);
704                 }
705                 json_decref(res);
706         } else {
707                 ERR("Failed to parse json: %s\n", server->buffer->string);
708                 ERR("PARSE ERROR: %s at %d,%d\n",
709                                 error.text, error.line, error.column);
710         }
711 }
712
713 void bev_read_cb(struct bufferevent* bev, void* arg)
714 {
715         jsonrpc_server_t* server = (jsonrpc_server_t*)arg;
716         int retval = 0;
717         while (retval == 0) {
718                 int retval = netstring_read_evbuffer(bev, &server->buffer);
719
720                 if (retval == NETSTRING_INCOMPLETE) {
721                         return;
722                 } else if (retval < 0) {
723                         char* msg = "";
724                         switch(retval) {
725                         case NETSTRING_ERROR_TOO_LONG:
726                                 msg = "too long";
727                                 break;
728                         case NETSTRING_ERROR_NO_COLON:
729                                 msg = "no colon after length field";
730                                 break;
731                         case NETSTRING_ERROR_TOO_SHORT:
732                                 msg = "too short";
733                                 break;
734                         case NETSTRING_ERROR_NO_COMMA:
735                                 msg = "missing comma";
736                                 break;
737                         case NETSTRING_ERROR_LEADING_ZERO:
738                                 msg = "length field has a leading zero";
739                                 break;
740                         case NETSTRING_ERROR_NO_LENGTH:
741                                 msg = "missing length field";
742                                 break;
743                         case NETSTRING_INCOMPLETE:
744                                 msg = "incomplete";
745                                 break;
746                         default:
747                                 ERR("bad netstring: unknown error (%d)\n", retval);
748                                 goto reconnect;
749                         }
750                         ERR("bad netstring: %s\n", msg);
751 reconnect:
752                         force_reconnect(server);
753                         return;
754                 }
755
756                 handle_netstring(server);
757                 free_netstring(server->buffer);
758                 server->buffer = NULL;
759         }
760 }
761
762 int set_non_blocking(int fd)
763 {
764         int flags;
765
766         flags = fcntl(fd, F_GETFL);
767         if (flags < 0)
768                 return flags;
769         flags |= O_NONBLOCK;
770         if (fcntl(fd, F_SETFL, flags) < 0)
771                 return -1;
772
773         return 0;
774 }
775
776 jsonrpc_pipe_cmd_t* create_pipe_cmd()
777 {
778         jsonrpc_pipe_cmd_t* cmd = NULL;
779         cmd = (jsonrpc_pipe_cmd_t*)shm_malloc(sizeof(jsonrpc_pipe_cmd_t));
780         if(!cmd) {
781                 ERR("Failed to malloc pipe cmd.\n");
782                 return NULL;
783         }
784         memset(cmd, 0, sizeof(jsonrpc_pipe_cmd_t));
785
786         return cmd;
787 }
788
789 void free_pipe_cmd(jsonrpc_pipe_cmd_t* cmd)
790 {
791         if(!cmd) return;
792
793         shm_free(cmd);
794 }
795
796 jsonrpc_req_cmd_t* create_req_cmd()
797 {
798         jsonrpc_req_cmd_t* req_cmd = NULL;
799         req_cmd = (jsonrpc_req_cmd_t*)shm_malloc(sizeof(jsonrpc_req_cmd_t));
800         CHECK_MALLOC_NULL(req_cmd);
801         memset(req_cmd, 0, sizeof(jsonrpc_req_cmd_t));
802
803         req_cmd->conn = null_str;
804         req_cmd->method = null_str;
805         req_cmd->params = null_str;
806         req_cmd->route = null_str;
807         return req_cmd;
808 }
809
810 void free_req_cmd(jsonrpc_req_cmd_t* req_cmd)
811 {
812         if(req_cmd) {
813                 CHECK_AND_FREE(req_cmd->conn.s);
814                 CHECK_AND_FREE(req_cmd->method.s);
815                 CHECK_AND_FREE(req_cmd->params.s);
816                 CHECK_AND_FREE(req_cmd->route.s);
817                 shm_free(req_cmd);
818         }
819 }
820
821 int send_pipe_cmd(cmd_type type, void* data)
822 {
823         char* name = "";
824         jsonrpc_pipe_cmd_t* cmd = NULL;
825         cmd = create_pipe_cmd();
826         CHECK_MALLOC(cmd);
827
828         cmd->type = type;
829
830         switch(type) {
831         case CMD_CONNECT:
832                 cmd->server = (jsonrpc_server_t*)data;
833                 name = "connect";
834                 break;
835         case CMD_RECONNECT:
836                 cmd->server = (jsonrpc_server_t*)data;
837                 name = "reconnect";
838                 break;
839         case CMD_CLOSE:
840                 cmd->server = (jsonrpc_server_t*)data;
841                 name = "close";
842                 break;
843         case CMD_UPDATE_SERVER_GROUP:
844                 cmd->new_grp = (jsonrpc_server_group_t*)data;
845                 name = "update";
846                 break;
847         case CMD_SEND:
848                 cmd->req_cmd = (jsonrpc_req_cmd_t*)data;
849                 name = "send";
850                 break;
851         default:
852                 ERR("Unknown command type %d", type);
853                 goto error;
854         }
855
856         DEBUG("sending %s command\n", name);
857
858         if (write(cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
859                 ERR("Failed to send '%s' cmd to io pipe: %s\n", name, strerror(errno));
860                 goto error;
861         }
862
863         return 0;
864 error:
865         free_pipe_cmd(cmd);
866         return -1;
867 }