a65323d30a999a0e2bc58986a9cb0f4d9224a1c6
[sip-router] / src / modules / janssonrpcc / janssonrpc_connect.c
1 /**
2  * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <stdbool.h>
26 #include <errno.h>
27 #include <string.h>
28 #include <fcntl.h>
29 #include <event.h>
30
31 #include "../../core/sr_module.h"
32 #include "../../core/route.h"
33 #include "../../core/route_struct.h"
34 #include "../../core/resolve.h"
35 #include "../../core/parser/parse_param.h"
36 #include "../../core/mem/mem.h"
37 #include "../../core/lvalue.h"
38
39 #include "netstring.h"
40 #include "janssonrpc.h"
41 #include "janssonrpc_request.h"
42 #include "janssonrpc_io.h"
43 #include "janssonrpc_srv.h"
44 #include "janssonrpc_server.h"
45 #include "janssonrpc_connect.h"
46
47 void wait_server_backoff(unsigned int timeout /* seconds */,
48                 jsonrpc_server_t* server, bool delay);
49
50 void bev_connect(jsonrpc_server_t* server);
51
52 void bev_disconnect(struct bufferevent* bev)
53 {
54         // close bufferevent
55         if(bev != NULL) {
56                 short enabled = bufferevent_get_enabled(bev);
57                 if(EV_READ & enabled)
58                         bufferevent_disable(bev, EV_READ);
59                 if(EV_WRITE & enabled)
60                         bufferevent_disable(bev, EV_WRITE);
61                 bufferevent_free(bev);
62                 bev = NULL;
63         }
64 }
65
66
67 /* This will immediately close a server socket and clean out any pending
68  * requests that are waiting on that socket.
69  * */
70 void force_disconnect(jsonrpc_server_t* server)
71 {
72         if(!server) {
73                 ERR("Trying to disconnect a NULL server.\n");
74                 return;
75         }
76
77         // clear the netstring buffer when disconnecting
78         free_netstring(server->buffer);
79         server->buffer = NULL;
80
81         server->status = JSONRPC_SERVER_DISCONNECTED;
82
83         // close bufferevent
84         bev_disconnect(server->bev);
85         INFO("Disconnected from server %.*s:%d for conn %.*s.\n",
86                         STR(server->addr), server->port, STR(server->conn));
87
88
89         /* clean out requests */
90         jsonrpc_request_t* req = NULL;
91         jsonrpc_request_t* next = NULL;
92         int key = 0;
93         for (key=0; key < JSONRPC_DEFAULT_HTABLE_SIZE; key++) {
94                 for (req = request_table[key]; req != NULL; req = next) {
95                         /* fail_request frees req so need to store
96                            next_req before call */
97                         next = req->next;
98                         if(req->server != NULL && req->server == server) {
99                                 fail_request(JRPC_ERR_SERVER_DISCONNECT, req,
100                                                 "Failing request for server shutdown");
101                         }
102                 }
103         }
104 }
105
106 typedef struct server_backoff_args {
107         struct event* ev;
108         jsonrpc_server_t* server;
109         unsigned int timeout;
110 } server_backoff_args_t;
111
112 void server_backoff_cb(int fd, short event, void *arg)
113 {
114         if(!arg)
115                 return;
116
117         server_backoff_args_t* a = (server_backoff_args_t*)arg;
118         if(!a)
119                 return;
120
121         unsigned int timeout = a->timeout;
122
123         /* exponential backoff */
124         if(timeout < 1) {
125                 timeout = 1;
126         } else {
127                 timeout = timeout * 2;
128                 if(timeout > 60) {
129                         timeout = 60;
130                 }
131         }
132
133         close(fd);
134         CHECK_AND_FREE_EV(a->ev);
135
136         wait_server_backoff(timeout, a->server, false);
137
138         pkg_free(arg);
139 }
140
141 void wait_server_backoff(unsigned int timeout /* seconds */,
142                 jsonrpc_server_t* server, bool delay)
143 {
144         if(!server) {
145                 ERR("Trying to close/reconnect a NULL server\n");
146                 return;
147         }
148
149         if(delay == false) {
150                 if (requests_using_server(server) <= 0) {
151                         if(server->status == JSONRPC_SERVER_RECONNECTING) {
152                                 bev_connect(server);
153                         } else if(server->status == JSONRPC_SERVER_CLOSING) {
154                                 close_server(server);
155                         }
156                         return;
157                 }
158         }
159
160         const struct timeval tv = {timeout, 0};
161
162         server_backoff_args_t* args = pkg_malloc(sizeof(server_backoff_args_t));
163         CHECK_MALLOC_VOID(args);
164         memset(args, 0, sizeof(server_backoff_args_t));
165
166         args->ev = evtimer_new(global_ev_base, server_backoff_cb, (void*)args);
167         CHECK_MALLOC_GOTO(args->ev, error);
168
169         args->server = server;
170         args->timeout = timeout;
171
172         if(evtimer_add(args->ev, &tv)<0) {
173                 ERR("event_add failed while setting request timer (%s).", strerror(errno));
174                 goto error;
175         }
176
177         return;
178
179 error:
180         ERR("schedule_server failed.\n");
181
182         if(args) {
183                 if(args->ev) {
184                         evtimer_del(args->ev);
185                 }
186                 pkg_free(args);
187         }
188
189         if (server->status == JSONRPC_SERVER_CLOSING) {
190                 ERR("Closing server now...\n");
191                 close_server(server);
192         } else if (server->status == JSONRPC_SERVER_RECONNECTING) {
193                 ERR("Reconnecting server now...\n");
194                 force_reconnect(server);
195         }
196 }
197
198 void wait_close(jsonrpc_server_t* server)
199 {
200         if(!server) {
201                 ERR("Trying to close null server.\n");
202                 return;
203         }
204
205         server->status = JSONRPC_SERVER_CLOSING;
206         wait_server_backoff(1, server, false);
207 }
208
209 void wait_reconnect(jsonrpc_server_t* server)
210 {
211         if(!server) {
212                 ERR("Trying to reconnect null server.\n");
213                 return;
214         }
215
216         server->status = JSONRPC_SERVER_RECONNECTING;
217         wait_server_backoff(1, server, false);
218 }
219
220 void connect_servers(jsonrpc_server_group_t** group)
221 {
222         INIT_SERVER_LOOP
223         FOREACH_SERVER_IN(group)
224                 server = wgroup->server;
225                 if(server->status != JSONRPC_SERVER_FAILURE
226                                 && server->status != JSONRPC_SERVER_RECONNECTING)
227                 {
228                         bev_connect(server);
229                 }
230         ENDFOR
231 }
232
233 void force_reconnect(jsonrpc_server_t* server)
234 {
235         INFO("Reconnecting to server %.*s:%d for conn %.*s.\n",
236                         STR(server->addr), server->port, STR(server->conn));
237         force_disconnect(server);
238         bev_connect(server);
239 }
240
241 /* helper for bev_connect_cb() and bev_connect() */
242 void connect_failed(jsonrpc_server_t* server)
243 {
244         bev_disconnect(server->bev);
245
246         server->status = JSONRPC_SERVER_RECONNECTING;
247         wait_server_backoff(JSONRPC_RECONNECT_INTERVAL, server, true);
248 }
249
250 void bev_connect_cb(struct bufferevent* bev, short events, void* arg)
251 {
252         jsonrpc_server_t* server = (jsonrpc_server_t*)arg;
253         if(!arg) {
254                 ERR("Trying to connect null server\n");
255                 return;
256         }
257
258         if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
259                 WARN("Connection error for %.*s:%d\n", STR(server->addr), server->port);
260                 if (events & BEV_EVENT_ERROR) {
261                         int err = bufferevent_socket_get_dns_error(bev);
262                         if(err) {
263                                 ERR("DNS error for %.*s: %s\n",
264                                         STR(server->addr), evutil_gai_strerror(err));
265                         }
266                 }
267                 goto failed;
268         } else if(events & BEV_EVENT_CONNECTED) {
269
270                 if (server->status == JSONRPC_SERVER_CONNECTED) {
271                         return;
272                 }
273
274                 server->status = JSONRPC_SERVER_CONNECTED;
275                 INFO("Connected to host %.*s:%d\n",
276                                 STR(server->addr), server->port);
277         }
278
279         return;
280
281 failed:
282         connect_failed(server);
283 }
284
285 void bev_connect(jsonrpc_server_t* server)
286 {
287         if(!server) {
288                 ERR("Trying to connect null server\n");
289                 return;
290         }
291
292         INFO("Connecting to server %.*s:%d for conn %.*s.\n",
293                         STR(server->addr), server->port, STR(server->conn));
294
295         server->bev = bufferevent_socket_new(
296                         global_ev_base,
297                         -1,
298                         BEV_OPT_CLOSE_ON_FREE);
299         if(!(server->bev)) {
300                 ERR("Could not create bufferevent for  %.*s:%d\n", STR(server->addr), server->port);
301                 connect_failed(server);
302                 return;
303         }
304
305         bufferevent_setcb(
306                         server->bev,
307                         bev_read_cb,
308                         NULL,
309                         bev_connect_cb,
310                         server);
311         bufferevent_enable(server->bev, EV_READ|EV_WRITE);
312         if(bufferevent_socket_connect_hostname(
313                         server->bev,
314                         global_evdns_base,
315                         AF_UNSPEC,
316                         server->addr.s,
317                         server->port)<0) {
318                 WARN("Failed to connect to %.*s:%d\n", STR(server->addr), server->port);
319                 connect_failed(server);
320         }
321 }
322