janssonrpcc: couple of pkg free in case of errors
[sip-router] / src / modules / janssonrpcc / janssonrpc_io.c
1 /**
2  * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <stdbool.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <jansson.h>
30 #include <event.h>
31 #include <event2/dns.h>
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <signal.h>
35
36 #include "../../core/sr_module.h"
37 #include "../../core/route.h"
38 #include "../../core/mem/mem.h"
39 #include "../../core/action.h"
40 #include "../../core/route_struct.h"
41 #include "../../core/lvalue.h"
42 #include "../../core/cfg/cfg_struct.h"
43 #include "../../core/rand/fastrand.h"
44 #include "../tm/tm_load.h"
45 #include "../jansson/jansson_utils.h"
46
47 #include "janssonrpc.h"
48 #include "janssonrpc_request.h"
49 #include "janssonrpc_server.h"
50 #include "janssonrpc_io.h"
51 #include "janssonrpc_connect.h"
52 #include "netstring.h"
53
54 struct tm_binds tmb;
55
56 void cmd_pipe_cb(int fd, short event, void *arg);
57 void io_shutdown(int sig);
58
59 int jsonrpc_io_child_process(int cmd_pipe)
60 {
61         struct event* pipe_ev = NULL;
62
63         global_ev_base = event_base_new();
64         global_evdns_base = evdns_base_new(global_ev_base, 1);
65
66         set_non_blocking(cmd_pipe);
67         pipe_ev = event_new(global_ev_base, cmd_pipe,
68                         EV_READ | EV_PERSIST, cmd_pipe_cb, NULL);
69
70         if(!pipe_ev) {
71                 ERR("Failed to create pipe event\n");
72                 return -1;
73         }
74
75         if(event_add(pipe_ev, NULL)<0) {
76                 ERR("Failed to start pipe event\n");
77                 return -1;
78         }
79
80         connect_servers(global_server_group);
81
82 #if 0
83         /* attach shutdown signal handler */
84         /* The shutdown handler are intended to clean up the remaining memory
85          * in the IO process. However, catching the signals causes unpreditable
86          * behavior in the Kamailio shutdown process, so this should be disabled
87          * except when doing memory debugging. */
88         struct sigaction sa;
89         sigemptyset(&sa.sa_mask);
90         sa.sa_flags = 0;
91         sa.sa_handler = io_shutdown;
92         if(sigaction(SIGTERM, &sa, NULL) == -1) {
93                 ERR("Failed to attach IO shutdown handler to SIGTERM\n");
94         } else if(sigaction(SIGINT, NULL, &sa) == -1) {
95                 ERR("Failed to attach IO shutdown handler to SIGINT\n");
96         }
97 #endif
98
99         if(event_base_dispatch(global_ev_base)<0) {
100                 ERR("IO couldn't start event loop\n");
101                 return -1;
102         }
103         return 0;
104 }
105
106 void io_shutdown(int sig)
107 {
108         INFO("Shutting down JSONRPC IO process...\n");
109         lock_get(jsonrpc_server_group_lock); /* blocking */
110
111         INIT_SERVER_LOOP
112         FOREACH_SERVER_IN(global_server_group)
113                 close_server(server);
114         ENDFOR
115
116         evdns_base_free(global_evdns_base, 0);
117         event_base_loopexit(global_ev_base, NULL);
118         event_base_free(global_ev_base);
119
120         lock_release(jsonrpc_server_group_lock);
121 }
122
123 int send_to_script(pv_value_t* val, jsonrpc_req_cmd_t* req_cmd)
124 {
125         if(!(req_cmd)) return -1;
126
127         if(req_cmd->route.len <= 0) return -1;
128
129         jsonrpc_result_pv.setf(req_cmd->msg, &jsonrpc_result_pv.pvp, (int)EQ_T, val);
130
131         int n = route_lookup(&main_rt, req_cmd->route.s);
132         if(n<0) {
133                 ERR("no such route: %s\n", req_cmd->route.s);
134                 return -1;
135         }
136
137         struct action* route = main_rt.rlist[n];
138
139         if(tmb.t_continue(req_cmd->t_hash, req_cmd->t_label, route)<0) {
140                 ERR("Failed to resume transaction\n");
141                 return -1;
142         }
143         return 0;
144 }
145
146 json_t* internal_error(int code, json_t* data)
147 {
148         json_t* ret = json_object();
149         json_t* inner = json_object();
150         char* message;
151
152         switch(code){
153         case JRPC_ERR_REQ_BUILD:
154                 message = "Failed to build request";
155                 break;
156         case JRPC_ERR_SEND:
157                 message = "Failed to send";
158                 break;
159         case JRPC_ERR_BAD_RESP:
160                 message = "Bad response result";
161                 json_object_set(ret, "data", data);
162                 break;
163         case JRPC_ERR_RETRY:
164                 message = "Retry failed";
165                 break;
166         case JRPC_ERR_SERVER_DISCONNECT:
167                 message = "Server disconnected";
168                 break;
169         case JRPC_ERR_TIMEOUT:
170                 message = "Message timeout";
171                 break;
172         case JRPC_ERR_PARSING:
173                 message = "JSON parse error";
174                 break;
175         case JRPC_ERR_BUG:
176                 message = "There is a bug";
177                 break;
178         default:
179                 ERR("Unrecognized error code: %d\n", code);
180                 message = "Unknown error";
181                 break;
182         }
183
184         json_t* message_js = json_string(message);
185         json_object_set(inner, "message", message_js);
186         if(message_js) json_decref(message_js);
187
188         json_t* code_js = json_integer(code);
189         json_object_set(inner, "code", code_js);
190         if(code_js) json_decref(code_js);
191
192         if(data) {
193                 json_object_set(inner, "data", data);
194         }
195
196         json_object_set(ret, "internal_error", inner);
197         if(inner) json_decref(inner);
198         return ret;
199 }
200
201 void fail_request(int code, jsonrpc_request_t* req, char* err_str)
202 {
203         char* req_s;
204         char* freeme = NULL;
205         pv_value_t val;
206         json_t* error;
207
208         if(!req) {
209 null_req:
210                 WARN("%s: (null)\n", err_str);
211                 goto end;
212         }
213
214         if(!(req->cmd) || (req->cmd->route.len <= 0)) {
215 no_route:
216                 req_s = json_dumps(req->payload, JSON_COMPACT);
217                 if(req_s) {
218                         WARN("%s: \n%s\n", err_str, req_s);
219                         free(req_s);
220                         goto end;
221                 }
222                 goto null_req;
223         }
224
225         error = internal_error(code, req->payload);
226         jsontoval(&val, &freeme, error);
227         if(error) json_decref(error);
228         if(send_to_script(&val, req->cmd)<0) {
229                 goto no_route;
230         }
231
232 end:
233         if(freeme) free(freeme);
234         if(req) {
235                 if(req->cmd) free_req_cmd(req->cmd);
236                 free_request(req);
237         }
238 }
239
240 void timeout_cb(int fd, short event, void *arg)
241 {
242         jsonrpc_request_t* req = (jsonrpc_request_t*)arg;
243         if(!req)
244                 return;
245
246         if(!(req->server)) {
247                 ERR("No server defined for request\n");
248                 return;
249         }
250
251         if(schedule_retry(req)<0) {
252                 fail_request(JRPC_ERR_TIMEOUT, req, "Request timeout");
253         }
254 }
255
256
257 int server_tried(jsonrpc_server_t* server, server_list_t* tried)
258 {
259         if(!server)
260                 return 0;
261
262         int t = 0;
263         for(;tried!=NULL;tried=tried->next)
264         {
265                 if(tried->server &&
266                         server == tried->server)
267                 {
268                         t = 1;
269                 }
270         }
271         return t;
272 }
273
274 /* loadbalance_by_weight() uses an algorithm to randomly pick a server out of
275  * a list based on its relative weight.
276  *
277  * It is loosely inspired by this:
278  * http://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
279  *
280  * The insert_server_group() function provides the ability to get the combined
281  * weight of all the servers off the head of the list, making it possible to
282  * compute in O(n) in the worst case and O(1) in the best.
283  *
284  * A random number out of the total weight is chosen. Each node is inspected and
285  * its weight added to a recurring sum. Once the sum is larger than the random
286  * number the last server that was seen is chosen.
287  *
288  * A weight of 0 will almost never be chosen, unless if maybe all the other
289  * servers are offline.
290  *
291  * The exception is when all the servers in a group have a weight of 0. In
292  * this case, the load should be distributed evenly across each of them. This
293  * requires finding the size of the list beforehand.
294  * */
295 void loadbalance_by_weight(jsonrpc_server_t** s,
296                 jsonrpc_server_group_t* grp, server_list_t* tried)
297 {
298         *s = NULL;
299
300         if(grp == NULL) {
301                 ERR("Trying to pick from an empty group\n");
302                 return;
303         }
304
305         if(grp->type != WEIGHT_GROUP) {
306                 ERR("Trying to pick from a non weight group\n");
307                 return;
308         }
309
310         jsonrpc_server_group_t* head = grp;
311         jsonrpc_server_group_t* cur = grp;
312
313         unsigned int pick = 0;
314         if(head->weight == 0) {
315                 unsigned int size = 0;
316                 size = server_group_size(cur);
317                 if(size == 0) return;
318
319                 pick = fastrand_max(size-1);
320
321                 int i;
322                 for(i=0;
323                         (i <= pick || *s == NULL)
324                                 && cur != NULL;
325                         i++, cur=cur->next)
326                 {
327                         if(cur->server->status == JSONRPC_SERVER_CONNECTED) {
328                                 if(!server_tried(cur->server, tried)
329                                         && (cur->server->hwm <= 0
330                                                 || cur->server->req_count < cur->server->hwm))
331                                 {
332                                         *s = cur->server;
333                                 }
334                         }
335                 }
336         } else {
337                 pick = fastrand_max(head->weight - 1);
338
339                 unsigned int sum = 0;
340                 while(1) {
341                         if(cur == NULL) break;
342                         if(cur->server->status == JSONRPC_SERVER_CONNECTED) {
343                                 if(!server_tried(cur->server, tried)
344                                         && (cur->server->hwm <= 0
345                                                 || cur->server->req_count < cur->server->hwm))
346                                 {
347                                         *s = cur->server;
348                                 }
349                         }
350                         sum += cur->server->weight;
351                         if(sum > pick && *s != NULL) break;
352                         cur = cur->next;
353                 }
354         }
355 }
356
357 int jsonrpc_send(str conn, jsonrpc_request_t* req, bool notify_only)
358 {
359         char* json = NULL;
360         bool sent = false;
361         char* ns = NULL;
362         size_t bytes;
363
364         json = (char*)json_dumps(req->payload, JSON_COMPACT);
365         if(json==NULL) {
366                 LM_ERR("failed to do json dump for request payload\n");
367                 return -1;
368         }
369         bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
370
371         jsonrpc_server_group_t* c_grp = NULL;
372         if(global_server_group != NULL)
373                 c_grp = *global_server_group;
374         jsonrpc_server_group_t* p_grp = NULL;
375         jsonrpc_server_group_t* w_grp = NULL;
376         jsonrpc_server_t* s = NULL;
377         server_list_t* tried_servers = NULL;
378         DEBUG("SENDING DATA\n");
379         for(; c_grp != NULL; c_grp = c_grp->next) {
380
381                 if(strncmp(conn.s, c_grp->conn.s, c_grp->conn.len) != 0) continue;
382
383                 for(p_grp = c_grp->sub_group; p_grp != NULL; p_grp = p_grp->next)
384                 {
385                         w_grp = p_grp->sub_group;
386                         while(!sent) {
387                                 loadbalance_by_weight(&s, w_grp, tried_servers);
388                                 if (s == NULL || s->status != JSONRPC_SERVER_CONNECTED) {
389                                         break;
390                                 }
391
392                                 if(bufferevent_write(s->bev, ns, bytes) == 0) {
393                                         sent = true;
394                                         if(!notify_only) {
395                                                 s->req_count++;
396                                                 if (s->hwm > 0 && s->req_count >= s->hwm) {
397                                                         WARN("%.*s:%d in connection group %.*s has exceeded its high water mark (%d)\n",
398                                                                         STR(s->addr), s->port,
399                                                                         STR(s->conn), s->hwm);
400                                                 }
401                                         }
402                                         req->server = s;
403                                         break;
404                                 } else {
405                                         addto_server_list(s, &tried_servers);
406                                 }
407                         }
408
409                         if (sent) {
410                                 break;
411                         }
412
413                         WARN("Failed to send to priority group, %d\n", p_grp->priority);
414                         if(p_grp->next != NULL) {
415                                 INFO("Proceeding to next priority group, %d\n",
416                                                 p_grp->next->priority);
417                         }
418                 }
419
420                 if (sent) {
421                         break;
422                 }
423
424         }
425
426         if(!sent) {
427                 WARN("Failed to send to connection group, \"%.*s\"\n",
428                                 STR(conn));
429                 if(schedule_retry(req)<0) {
430                         fail_request(JRPC_ERR_RETRY, req, "Failed to schedule retry");
431                 }
432         }
433
434         free_server_list(tried_servers);
435         if(ns) pkg_free(ns);
436         free(json);
437
438         if (sent) {
439                 if (notify_only == true) { // free the request if using janssonrpc_notification function
440                         free_req_cmd(req->cmd);
441                         free_request(req);
442                 } else {
443                         const struct timeval tv = ms_to_tv(req->timeout);
444
445                         req->timeout_ev = evtimer_new(global_ev_base, timeout_cb, (void*)req);
446                         if(event_add(req->timeout_ev, &tv)<0) {
447                                 ERR("event_add failed while setting request timer (%s).",
448                                                 strerror(errno));
449                                 return -1;
450                         }
451                 }
452         }
453
454         return sent;
455 }
456
457
458 void cmd_pipe_cb(int fd, short event, void *arg)
459 {
460         struct jsonrpc_pipe_cmd *cmd;
461
462         if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
463                 ERR("FATAL ERROR: failed to read from command pipe: %s\n",
464                                 strerror(errno));
465                 return;
466         }
467
468         cfg_update();
469
470         switch(cmd->type) {
471         case CMD_CLOSE:
472                 if(cmd->server) {
473                         wait_close(cmd->server);
474                 }
475                 goto end;
476                 break;
477         case CMD_RECONNECT:
478                 if(cmd->server) {
479                         wait_reconnect(cmd->server);
480                 }
481                 goto end;
482                 break;
483         case CMD_CONNECT:
484                 if(cmd->server) {
485                         bev_connect(cmd->server);
486                 }
487                 goto end;
488                 break;
489         case CMD_UPDATE_SERVER_GROUP:
490                 if(cmd->new_grp) {
491                         jsonrpc_server_group_t* old_grp = *global_server_group;
492                         *global_server_group = cmd->new_grp;
493                         free_server_group(&old_grp);
494                 }
495                 lock_release(jsonrpc_server_group_lock);
496                 goto end;
497                 break;
498
499         case CMD_SEND:
500                 break;
501
502         default:
503                 ERR("Unrecognized pipe command: %d\n", cmd->type);
504                 goto end;
505                 break;
506         }
507
508         /* command is SEND */
509
510         jsonrpc_req_cmd_t* req_cmd = cmd->req_cmd;
511         if(req_cmd == NULL) {
512                 ERR("req_cmd is NULL. Invalid send command\n");
513                 goto end;
514         }
515
516         jsonrpc_request_t* req = NULL;
517         req = create_request(req_cmd);
518         if (!req || !req->payload) {
519                 json_t* error = internal_error(JRPC_ERR_REQ_BUILD, NULL);
520                 pv_value_t val;
521                 char* freeme = NULL;
522                 jsontoval(&val, &freeme, error);
523                 if(req_cmd->route.len <=0 && send_to_script(&val, req_cmd)<0) {
524                         ERR("Failed to build request (method: %.*s, params: %.*s)\n",
525                                         STR(req_cmd->method), STR(req_cmd->params));
526                 }
527                 if(freeme) free(freeme);
528                 if(error) json_decref(error);
529                 free_req_cmd(req_cmd);
530                 if(req) pkg_free(req);
531                 goto end;
532         }
533
534         int sent = jsonrpc_send(req_cmd->conn, req, req_cmd->notify_only);
535
536         char* type;
537         if (sent<0) {
538                 if (req_cmd->notify_only == false) {
539                         type = "Request";
540                 } else {
541                         type = "Notification";
542                 }
543                 WARN("%s could not be sent to connection group: %.*s\n",
544                                 type, STR(req_cmd->conn));
545                 fail_request(JRPC_ERR_SEND, req, "Failed to send request");
546         }
547
548 end:
549         free_pipe_cmd(cmd);
550 }
551
552 int handle_response(json_t* response)
553 {
554         int retval = 0;
555         jsonrpc_request_t* req = NULL;
556         json_t* return_obj = NULL;
557         json_t* internal = NULL;
558         char* freeme = NULL;
559
560
561         /* check if json object */
562         if(!json_is_object(response)){
563                 WARN("jsonrpc response is not an object\n");
564                 return -1;
565         }
566
567         /* check version */
568         json_t* version = json_object_get(response, "jsonrpc");
569         if(!version) {
570                 WARN("jsonrpc response does not have a version.\n");
571                 retval = -1;
572                 goto end;
573         }
574
575         const char* version_s = json_string_value(version);
576         if(!version_s){
577                 WARN("jsonrpc response version is not a string.\n");
578                 retval = -1;
579                 goto end;
580         }
581
582         if (strlen(version_s) != (sizeof(JSONRPC_VERSION)-1)
583                         || strncmp(version_s, JSONRPC_VERSION, sizeof(JSONRPC_VERSION)-1) != 0) {
584                 WARN("jsonrpc response version is not %s. version: %s\n",
585                                 JSONRPC_VERSION, version_s);
586                 retval = -1;
587                 goto end;
588         }
589
590         /* check for an id */
591         json_t* _id = json_object_get(response, "id");
592         if(!_id) {
593                 WARN("jsonrpc response does not have an id.\n");
594                 retval = -1;
595                 goto end;
596         }
597
598         int id = json_integer_value(_id);
599         if (!(req = pop_request(id))) {
600                 /* don't fail the server for an unrecognized id */
601                 retval = 0;
602                 goto end;
603         }
604
605         return_obj = json_object();
606
607         json_t* error = json_object_get(response, "error");
608         // if the error value is null, we don't care
609         bool _error = error && (json_typeof(error) != JSON_NULL);
610
611         json_t* result = json_object_get(response, "result");
612
613         if(_error) {
614                 json_object_set(return_obj, "error", error);
615         }
616
617         if(result) {
618                 json_object_set(return_obj, "result", result);
619         }
620
621         if ((!result && !_error) || (result && _error)) {
622                 WARN("bad response\n");
623                 internal = internal_error(JRPC_ERR_BAD_RESP, req->payload);
624                 json_object_update(return_obj, internal);
625                 if(internal) json_decref(internal);
626         }
627
628         pv_value_t val;
629
630         if(jsontoval(&val, &freeme, return_obj)<0) {
631                 fail_request(
632                                 JRPC_ERR_TO_VAL,
633                                 req,
634                                 "Failed to convert response json to pv\n");
635                 retval = -1;
636                 goto end;
637         }
638
639         char* error_s = NULL;
640
641         if(send_to_script(&val, req->cmd)>=0) {
642                 goto free_and_end;
643         }
644
645         if(_error) {
646                 // get code from error
647                 json_t* _code = json_object_get(error, "code");
648                 if(_code) {
649                         int code = json_integer_value(_code);
650
651                         // check if code is in global_retry_ranges
652                         retry_range_t* tmpr;
653                         for(tmpr = global_retry_ranges;
654                                         tmpr != NULL;
655                                         tmpr = tmpr->next) {
656                                 if((tmpr->start < tmpr->end
657                                                 && tmpr->start <= code && code <= tmpr->end)
658                                 || (tmpr->end < tmpr->start
659                                                 && tmpr->end <= code && code <= tmpr->start)
660                                 || (tmpr->start == tmpr->end && tmpr->start == code)) {
661                                         if(schedule_retry(req)==0) {
662                                                 goto end;
663                                         }
664                                         break;
665                                 }
666                         }
667
668                 }
669                 error_s = json_dumps(error, JSON_COMPACT);
670                 if(error_s) {
671                         WARN("Request received an error: \n%s\n", error_s);
672                         free(error_s);
673                 } else {
674                         fail_request(
675                                         JRPC_ERR_BAD_RESP,
676                                         req,
677                                         "Could not convert 'error' response to string");
678                         retval = -1;
679                         goto end;
680                 }
681         }
682
683
684 free_and_end:
685         free_req_cmd(req->cmd);
686         free_request(req);
687
688 end:
689         if(freeme) free(freeme);
690         if(return_obj) json_decref(return_obj);
691         return retval;
692 }
693
694 void handle_netstring(jsonrpc_server_t* server)
695 {
696         unsigned int old_count = server->req_count;
697         server->req_count--;
698         if (server->hwm > 0
699                         && old_count >= server->hwm
700                         && server->req_count < server->hwm) {
701                 INFO("%.*s:%d in connection group %.*s is back to normal\n",
702                                 STR(server->addr), server->port, STR(server->conn));
703         }
704
705         json_error_t error;
706
707         json_t* res = json_loads(server->buffer->string, 0, &error);
708
709         if (res) {
710                 if(handle_response(res)<0){
711                         ERR("Cannot handle jsonrpc response: %s\n", server->buffer->string);
712                 }
713                 json_decref(res);
714         } else {
715                 ERR("Failed to parse json: %s\n", server->buffer->string);
716                 ERR("PARSE ERROR: %s at %d,%d\n",
717                                 error.text, error.line, error.column);
718         }
719 }
720
721 void bev_read_cb(struct bufferevent* bev, void* arg)
722 {
723         jsonrpc_server_t* server = (jsonrpc_server_t*)arg;
724         int retval = 0;
725         while (retval == 0) {
726                 int retval = netstring_read_evbuffer(bev, &server->buffer);
727
728                 if (retval == NETSTRING_INCOMPLETE) {
729                         return;
730                 } else if (retval < 0) {
731                         char* msg = "";
732                         switch(retval) {
733                         case NETSTRING_ERROR_TOO_LONG:
734                                 msg = "too long";
735                                 break;
736                         case NETSTRING_ERROR_NO_COLON:
737                                 msg = "no colon after length field";
738                                 break;
739                         case NETSTRING_ERROR_TOO_SHORT:
740                                 msg = "too short";
741                                 break;
742                         case NETSTRING_ERROR_NO_COMMA:
743                                 msg = "missing comma";
744                                 break;
745                         case NETSTRING_ERROR_LEADING_ZERO:
746                                 msg = "length field has a leading zero";
747                                 break;
748                         case NETSTRING_ERROR_NO_LENGTH:
749                                 msg = "missing length field";
750                                 break;
751                         default:
752                                 ERR("bad netstring: unknown error (%d)\n", retval);
753                                 goto reconnect;
754                         }
755                         ERR("bad netstring: %s\n", msg);
756 reconnect:
757                         force_reconnect(server);
758                         return;
759                 }
760
761                 handle_netstring(server);
762                 free_netstring(server->buffer);
763                 server->buffer = NULL;
764         }
765 }
766
/**
 * Put a file descriptor into non-blocking mode.
 *
 * @param fd descriptor to modify
 * @return 0 on success; the negative fcntl() result if the current flags
 *         cannot be read, or -1 if the new flags cannot be set
 */
int set_non_blocking(int fd)
{
	int current = fcntl(fd, F_GETFL);

	if (current < 0)
		return current;

	/* keep the existing flags and add O_NONBLOCK */
	return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}
780
781 jsonrpc_pipe_cmd_t* create_pipe_cmd()
782 {
783         jsonrpc_pipe_cmd_t* cmd = NULL;
784         cmd = (jsonrpc_pipe_cmd_t*)shm_malloc(sizeof(jsonrpc_pipe_cmd_t));
785         if(!cmd) {
786                 ERR("Failed to malloc pipe cmd.\n");
787                 return NULL;
788         }
789         memset(cmd, 0, sizeof(jsonrpc_pipe_cmd_t));
790
791         return cmd;
792 }
793
794 void free_pipe_cmd(jsonrpc_pipe_cmd_t* cmd)
795 {
796         if(!cmd) return;
797
798         shm_free(cmd);
799 }
800
801 jsonrpc_req_cmd_t* create_req_cmd()
802 {
803         jsonrpc_req_cmd_t* req_cmd = NULL;
804         req_cmd = (jsonrpc_req_cmd_t*)shm_malloc(sizeof(jsonrpc_req_cmd_t));
805         CHECK_MALLOC_NULL(req_cmd);
806         memset(req_cmd, 0, sizeof(jsonrpc_req_cmd_t));
807
808         req_cmd->conn = null_str;
809         req_cmd->method = null_str;
810         req_cmd->params = null_str;
811         req_cmd->route = null_str;
812         return req_cmd;
813 }
814
815 void free_req_cmd(jsonrpc_req_cmd_t* req_cmd)
816 {
817         if(req_cmd) {
818                 CHECK_AND_FREE(req_cmd->conn.s);
819                 CHECK_AND_FREE(req_cmd->method.s);
820                 CHECK_AND_FREE(req_cmd->params.s);
821                 CHECK_AND_FREE(req_cmd->route.s);
822                 shm_free(req_cmd);
823         }
824 }
825
826 int send_pipe_cmd(cmd_type type, void* data)
827 {
828         char* name = "";
829         jsonrpc_pipe_cmd_t* cmd = NULL;
830         cmd = create_pipe_cmd();
831         CHECK_MALLOC(cmd);
832
833         cmd->type = type;
834
835         switch(type) {
836         case CMD_CONNECT:
837                 cmd->server = (jsonrpc_server_t*)data;
838                 name = "connect";
839                 break;
840         case CMD_RECONNECT:
841                 cmd->server = (jsonrpc_server_t*)data;
842                 name = "reconnect";
843                 break;
844         case CMD_CLOSE:
845                 cmd->server = (jsonrpc_server_t*)data;
846                 name = "close";
847                 break;
848         case CMD_UPDATE_SERVER_GROUP:
849                 cmd->new_grp = (jsonrpc_server_group_t*)data;
850                 name = "update";
851                 break;
852         case CMD_SEND:
853                 cmd->req_cmd = (jsonrpc_req_cmd_t*)data;
854                 name = "send";
855                 break;
856         default:
857                 ERR("Unknown command type %d", type);
858                 goto error;
859         }
860
861         DEBUG("sending %s command\n", name);
862
863         if (write(cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) {
864                 ERR("Failed to send '%s' cmd to io pipe: %s\n", name, strerror(errno));
865                 goto error;
866         }
867
868         return 0;
869 error:
870         free_pipe_cmd(cmd);
871         return -1;
872 }