janssonrpcc: couple of pkg free in case of errors
[sip-router] / src / modules / janssonrpcc / janssonrpc_connect.c
1 /**
2  * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <stdbool.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <event.h>
30
31 #include "../../core/sr_module.h"
32 #include "../../core/route.h"
33 #include "../../core/route_struct.h"
34 #include "../../core/resolve.h"
35 #include "../../core/parser/parse_param.h"
36 #include "../../core/mem/mem.h"
37 #include "../../core/lvalue.h"
38
39 #include "netstring.h"
40 #include "janssonrpc.h"
41 #include "janssonrpc_request.h"
42 #include "janssonrpc_io.h"
43 #include "janssonrpc_srv.h"
44 #include "janssonrpc_server.h"
45 #include "janssonrpc_connect.h"
46
47 void wait_server_backoff(unsigned int timeout /* seconds */,
48                 jsonrpc_server_t* server, bool delay);
49
50 void bev_connect(jsonrpc_server_t* server);
51
52 void bev_disconnect(struct bufferevent* bev)
53 {
54         // close bufferevent
55         if(bev != NULL) {
56                 short enabled = bufferevent_get_enabled(bev);
57                 if(EV_READ & enabled)
58                         bufferevent_disable(bev, EV_READ);
59                 if(EV_WRITE & enabled)
60                         bufferevent_disable(bev, EV_WRITE);
61                 bufferevent_free(bev);
62                 bev = NULL;
63         }
64 }
65
66
67 /* This will immediately close a server socket and clean out any pending
68  * requests that are waiting on that socket.
69  * */
70 void force_disconnect(jsonrpc_server_t* server)
71 {
72         if(!server) {
73                 ERR("Trying to disconnect a NULL server.\n");
74                 return;
75         }
76
77         // clear the netstring buffer when disconnecting
78         free_netstring(server->buffer);
79         server->buffer = NULL;
80
81         server->status = JSONRPC_SERVER_DISCONNECTED;
82
83         // close bufferevent
84         bev_disconnect(server->bev);
85         INFO("Disconnected from server %.*s:%d for conn %.*s.\n",
86                         STR(server->addr), server->port, STR(server->conn));
87
88
89         /* clean out requests */
90         jsonrpc_request_t* req = NULL;
91         jsonrpc_request_t* next = NULL;
92         int key = 0;
93         for (key=0; key < JSONRPC_DEFAULT_HTABLE_SIZE; key++) {
94                 for (req = request_table[key]; req != NULL; req = next) {
95                         /* fail_request frees req so need to store
96                            next_req before call */
97                         next = req->next;
98                         if(req->server != NULL && req->server == server) {
99                                 fail_request(JRPC_ERR_SERVER_DISCONNECT, req,
100                                                 "Failing request for server shutdown");
101                         }
102                 }
103         }
104 }
105
106 typedef struct server_backoff_args {
107         struct event* ev;
108         jsonrpc_server_t* server;
109         unsigned int timeout;
110 } server_backoff_args_t;
111
112 void server_backoff_cb(int fd, short event, void *arg)
113 {
114         unsigned int timeout;
115         server_backoff_args_t* a;
116
117         if(!arg)
118                 return;
119
120         a = (server_backoff_args_t*)arg;
121
122         timeout = a->timeout;
123
124         /* exponential backoff */
125         if(timeout < 1) {
126                 timeout = 1;
127         } else {
128                 timeout = timeout * 2;
129                 if(timeout > 60) {
130                         timeout = 60;
131                 }
132         }
133
134         close(fd);
135         CHECK_AND_FREE_EV(a->ev);
136
137         wait_server_backoff(timeout, a->server, false);
138
139         pkg_free(arg);
140 }
141
142 void wait_server_backoff(unsigned int timeout /* seconds */,
143                 jsonrpc_server_t* server, bool delay)
144 {
145         if(!server) {
146                 ERR("Trying to close/reconnect a NULL server\n");
147                 return;
148         }
149
150         if(delay == false) {
151                 if (requests_using_server(server) <= 0) {
152                         if(server->status == JSONRPC_SERVER_RECONNECTING) {
153                                 bev_connect(server);
154                         } else if(server->status == JSONRPC_SERVER_CLOSING) {
155                                 close_server(server);
156                         }
157                         return;
158                 }
159         }
160
161         const struct timeval tv = {timeout, 0};
162
163         server_backoff_args_t* args = pkg_malloc(sizeof(server_backoff_args_t));
164         CHECK_MALLOC_VOID(args);
165         memset(args, 0, sizeof(server_backoff_args_t));
166
167         args->ev = evtimer_new(global_ev_base, server_backoff_cb, (void*)args);
168         CHECK_MALLOC_GOTO(args->ev, error);
169
170         args->server = server;
171         args->timeout = timeout;
172
173         if(evtimer_add(args->ev, &tv)<0) {
174                 ERR("event_add failed while setting request timer (%s).", strerror(errno));
175                 goto error;
176         }
177
178         return;
179
180 error:
181         ERR("schedule_server failed.\n");
182
183         if(args) {
184                 if(args->ev) {
185                         evtimer_del(args->ev);
186                 }
187                 pkg_free(args);
188         }
189
190         if (server->status == JSONRPC_SERVER_CLOSING) {
191                 ERR("Closing server now...\n");
192                 close_server(server);
193         } else if (server->status == JSONRPC_SERVER_RECONNECTING) {
194                 ERR("Reconnecting server now...\n");
195                 force_reconnect(server);
196         }
197 }
198
199 void wait_close(jsonrpc_server_t* server)
200 {
201         if(!server) {
202                 ERR("Trying to close null server.\n");
203                 return;
204         }
205
206         server->status = JSONRPC_SERVER_CLOSING;
207         wait_server_backoff(1, server, false);
208 }
209
210 void wait_reconnect(jsonrpc_server_t* server)
211 {
212         if(!server) {
213                 ERR("Trying to reconnect null server.\n");
214                 return;
215         }
216
217         server->status = JSONRPC_SERVER_RECONNECTING;
218         wait_server_backoff(1, server, false);
219 }
220
221 void connect_servers(jsonrpc_server_group_t** group)
222 {
223         INIT_SERVER_LOOP
224         FOREACH_SERVER_IN(group)
225                 server = wgroup->server;
226                 if(server->status != JSONRPC_SERVER_FAILURE
227                                 && server->status != JSONRPC_SERVER_RECONNECTING)
228                 {
229                         bev_connect(server);
230                 }
231         ENDFOR
232 }
233
234 void force_reconnect(jsonrpc_server_t* server)
235 {
236         INFO("Reconnecting to server %.*s:%d for conn %.*s.\n",
237                         STR(server->addr), server->port, STR(server->conn));
238         force_disconnect(server);
239         bev_connect(server);
240 }
241
242 /* helper for bev_connect_cb() and bev_connect() */
243 void connect_failed(jsonrpc_server_t* server)
244 {
245         bev_disconnect(server->bev);
246
247         server->status = JSONRPC_SERVER_RECONNECTING;
248         wait_server_backoff(JSONRPC_RECONNECT_INTERVAL, server, true);
249 }
250
251 void bev_connect_cb(struct bufferevent* bev, short events, void* arg)
252 {
253         jsonrpc_server_t* server = (jsonrpc_server_t*)arg;
254         if(!arg) {
255                 ERR("Trying to connect null server\n");
256                 return;
257         }
258
259         if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
260                 WARN("Connection error for %.*s:%d\n", STR(server->addr), server->port);
261                 if (events & BEV_EVENT_ERROR) {
262                         int err = bufferevent_socket_get_dns_error(bev);
263                         if(err) {
264                                 ERR("DNS error for %.*s: %s\n",
265                                         STR(server->addr), evutil_gai_strerror(err));
266                         }
267                 }
268                 goto failed;
269         } else if(events & BEV_EVENT_CONNECTED) {
270
271                 if (server->status == JSONRPC_SERVER_CONNECTED) {
272                         return;
273                 }
274
275                 server->status = JSONRPC_SERVER_CONNECTED;
276                 INFO("Connected to host %.*s:%d\n",
277                                 STR(server->addr), server->port);
278         }
279
280         return;
281
282 failed:
283         connect_failed(server);
284 }
285
286 void bev_connect(jsonrpc_server_t* server)
287 {
288         if(!server) {
289                 ERR("Trying to connect null server\n");
290                 return;
291         }
292
293         INFO("Connecting to server %.*s:%d for conn %.*s.\n",
294                         STR(server->addr), server->port, STR(server->conn));
295
296         server->bev = bufferevent_socket_new(
297                         global_ev_base,
298                         -1,
299                         BEV_OPT_CLOSE_ON_FREE);
300         if(!(server->bev)) {
301                 ERR("Could not create bufferevent for  %.*s:%d\n", STR(server->addr), server->port);
302                 connect_failed(server);
303                 return;
304         }
305
306         bufferevent_setcb(
307                         server->bev,
308                         bev_read_cb,
309                         NULL,
310                         bev_connect_cb,
311                         server);
312         bufferevent_enable(server->bev, EV_READ|EV_WRITE);
313         if(bufferevent_socket_connect_hostname(
314                         server->bev,
315                         global_evdns_base,
316                         AF_UNSPEC,
317                         server->addr.s,
318                         server->port)<0) {
319                 WARN("Failed to connect to %.*s:%d\n", STR(server->addr), server->port);
320                 connect_failed(server);
321         }
322 }
323