2 * Copyright 2016 (C) Federico Cabiddu <federico.cabiddu@gmail.com>
3 * Copyright 2016 (C) Giacomo Vacca <giacomo.vacca@gmail.com>
4 * Copyright 2016 (C) Orange - Camille Oudot <camille.oudot@orange.com>
6 * This file is part of Kamailio, a free SIP server.
8 * This file is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
14 * This file is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 * \brief Kamailio http_async_client :: Include file
27 * \ingroup http_async_client
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
42 #include <event2/event.h>
44 #include "../../core/sr_module.h"
45 #include "../../core/dprint.h"
46 #include "../../core/ut.h"
47 #include "../../core/cfg/cfg_struct.h"
48 #include "../../core/receive.h"
49 #include "../../core/fmsg.h"
50 #include "../../core/kemi.h"
51 #include "../../modules/tm/tm_load.h"
53 #include "async_http.h"
/* TM module bindings, defined elsewhere; used below to suspend and resume transactions */
56 extern struct tm_binds tmb;
/* reply of the query being handled by async_http_cb(), filled in by parse_msg() */
58 struct sip_msg *ah_reply = NULL;
/* error text of a failed query, set by async_http_cb() from reply->error */
59 str ah_error = {NULL, 0};
/* worker table; async_push_query() indexes it to pick a notification socket */
61 async_http_worker_t *workers = NULL;
/* request parameters that async_send_query() copies into every new query */
64 struct query_params ah_params;
/* buffer for a query id, written by async_send_query() and async_http_cb() */
66 char q_id[MAX_ID_LEN+1];
/**
 * \brief Set up the per-process state of one HTTP worker.
 *
 * Creates a libevent base with event_base_new() and allocates a zeroed
 * http_m_global structure with shm_malloc(); both are stored in *worker.
 *
 * \param prank  process rank, only used in the log messages visible here
 * \param worker worker descriptor that receives evbase and g
 *
 * NOTE(review): the value returned by event_base_new() is logged and stored
 * without a NULL check in the visible lines.
 * NOTE(review): the embedded line numbers skip 69, 73, 75, 77-78, 81-83 and
 * 85-88, so statements (presumably the allocation check and the returns) are
 * missing from this view - confirm against the complete file.
 */
68 int async_http_init_worker(int prank, async_http_worker_t* worker)
70 LM_DBG("initializing worker process: %d\n", prank);
71 worker->evbase = event_base_new();
72 LM_DBG("base event %p created\n", worker->evbase);
74 worker->g = shm_malloc(sizeof(struct http_m_global));
76 LM_ERR("out of shared memory\n");
/* start from an all-zero global state */
79 memset(worker->g, 0, sizeof(http_m_global_t));
80 LM_DBG("initialized global struct %p\n", worker->g);
84 LM_INFO("started worker process: %d\n", prank);
89 void async_http_run_worker(async_http_worker_t* worker)
91 init_http_multi(worker->evbase, worker->g);
92 event_base_dispatch(worker->evbase);
95 int async_http_init_sockets(async_http_worker_t *worker)
97 if (socketpair(PF_UNIX, SOCK_DGRAM, 0, worker->notication_socket) < 0) {
98 LM_ERR("opening tasks dgram socket pair\n");
101 LM_INFO("inter-process event notification sockets initialized\n");
105 static inline char *strfindcasestrz(str *haystack, char *needlez)
111 needle.len = strlen(needlez);
112 for(i=0;i<haystack->len-needle.len;i++) {
113 for(j=0;j<needle.len;j++) {
114 if ( !((haystack->s[i+j]==needle.s[j]) ||
115 ( isalpha((int)haystack->s[i+j])
116 && ((haystack->s[i+j])^(needle.s[j]))==0x20 )) )
120 return haystack->s+i;
/**
 * \brief Completion callback for an HTTP query (passed to new_request()).
 *
 * Visible behaviour:
 *  - clears ah_reply, fetches the KEMI engine and, via route_lookup(),
 *    the action list of the callback route named in aq->cbname;
 *  - when reply->result is NULL, publishes reply->error through ah_error;
 *  - otherwise rewrites an HTTP Via header (see the existing comment below),
 *    parses the result into ah_reply with parse_msg() and, for a status of
 *    100, parses the body of that message again as the actual reply;
 *  - copies aq->id into q_id;
 *  - when aq->query_params.suspend_transaction is set, looks the transaction
 *    up with t_lookup_ident() and resumes it with t_continue() or
 *    t_continue_cb(); the other visible calls run the callback on a message
 *    from faked_msg_next() through run_top_route() or sr_kemi_route();
 *  - finally calls free_sip_msg(ah_reply) and free_async_query(aq).
 *
 * \param reply result of the request; result and error are read here
 * \param param presumably the async_query_t handed to new_request() - the
 *              assignment to aq is not part of this view, confirm
 *
 * NOTE(review): many short lines (braces, else branches, returns, gotos) are
 * missing from this view, so the exact branch structure must be checked
 * against the complete file.
 */
125 void async_http_cb(struct http_m_reply *reply, void *param)
127 async_query_t *aq = NULL;
128 cfg_action_t *act = NULL;
132 struct cell *t = NULL;
135 sip_msg_t *fmsg = NULL;
136 sr_kemi_eng_t *keng = NULL;
138 str evname = str_init("http_async_client:callback");
142 /* clean process-local result variables */
145 memset(ah_reply, 0, sizeof(struct sip_msg));
147 keng = sr_kemi_eng_get();
149 ri = route_lookup(&main_rt, aq->cbname);
151 LM_ERR("unable to find route block [%s]\n", aq->cbname);
154 act = main_rt.rlist[ri];
156 LM_ERR("empty action lists in route block [%s]\n", aq->cbname);
161 if (reply->result != NULL) {
162 LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
165 /* set process-local result variables */
166 if (reply->result == NULL) {
168 ah_error.s = reply->error;
169 ah_error.len = strlen(ah_error.s);
172 /* check for HTTP Via header
173 * - HTTP Via format is different that SIP Via
174 * - workaround: replace with Hia to be ignored by SIP parser
176 if((p=strfindcasestrz(reply->result, "\nVia:"))!=NULL)
180 LM_DBG("replaced HTTP Via with Hia [[\n%.*s]]\n", reply->result->len, reply->result->s);
183 ah_reply->buf = reply->result->s;
184 ah_reply->len = reply->result->len;
186 if (parse_msg(reply->result->s, reply->result->len, ah_reply) != 0) {
187 LM_DBG("failed to parse the http_reply\n");
189 if (ah_reply->first_line.u.reply.statuscode == 100) {
190 newbuf.s = get_body( ah_reply );
191 newbuf.len = reply->result->s + reply->result->len - newbuf.s;
193 if (!(newbuf.len < 0)) {
194 memset(ah_reply, 0, sizeof(struct sip_msg));
195 ah_reply->buf = newbuf.s;
196 ah_reply->len = newbuf.len;
198 if (parse_msg(ah_reply->buf, ah_reply->len, ah_reply) != 0) {
199 LM_DBG("failed to parse the http_reply\n");
201 LM_DBG("successfully parsed http reply %p\n", ah_reply);
204 /* this should not happen! */
205 LM_WARN("something got wrong parsing the 100 Continue: got %d len\n", newbuf.len);
209 LM_DBG("successfully parsed http reply %p\n", ah_reply);
/* expose the id of this query through the process-local q_id buffer */
214 strncpy(q_id, aq->id, strlen(aq->id));
216 q_id[strlen(aq->id)] = '\0';
/* suspended transaction: find it again and resume it */
220 if (aq->query_params.suspend_transaction) {
224 if (tmb.t_lookup_ident(&t, tindex, tlabel) < 0) {
225 LM_ERR("transaction not found %d:%d\n", tindex, tlabel);
226 LM_DBG("freeing query %p\n", aq);
227 free_async_query(aq);
235 LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
239 tmb.t_continue(tindex, tlabel, act);
242 cbname.s = aq->cbname;
243 cbname.len = aq->cbname_len;
244 tmb.t_continue_cb(tindex, tlabel, &cbname, &evname);
/* presumably the branch without a suspended transaction: the callback runs on a faked message */
247 fmsg = faked_msg_next();
250 if (run_top_route(act, fmsg, 0)<0) {
251 LM_ERR("failure inside run_top_route\n");
255 cbname.s = aq->cbname;
256 cbname.len = aq->cbname_len;
257 if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &cbname, &evname)<0) {
258 LM_ERR("error running event route kemi callback\n");
/* release what parse_msg() attached to ah_reply, then the query itself */
265 free_sip_msg(ah_reply);
266 free_async_query(aq);
/**
 * \brief Event callback for a readable worker notification socket.
 *
 * Registered by init_socket() on worker->notication_socket[0]. Each datagram
 * carries the pointer value of one async_query_t (async_push_query() writes
 * it). The callback builds a local http_m_params_t from aq->query_params,
 * duplicating the string members with shm_malloc() or shm_str_dup() and
 * turning the header table into a curl_slist, starts the request with
 * new_request() using async_http_cb() as completion callback, and then frees
 * the local string copies.
 *
 * \param fd    unused
 * \param event unused
 * \param arg   the async_http_worker_t given to event_new()
 *
 * NOTE(review): the local copies are freed right after new_request(), so
 * new_request() presumably keeps its own copies - confirm.
 * NOTE(review): no release of query_params.headers is visible here; check
 * who owns the curl_slist.
 * NOTE(review): the error exits after the failed allocations are not part
 * of this view (short lines are missing).
 */
271 void notification_socket_cb(int fd, short event, void *arg)
273 (void)fd; /* unused */
274 (void)event; /* unused */
275 const async_http_worker_t *worker = (async_http_worker_t *) arg;
281 http_m_params_t query_params;
/* read exactly one pointer-sized message from the socket */
285 if ((received = recvfrom(worker->notication_socket[0],
286 &aq, sizeof(async_query_t*),
288 LM_ERR("failed to read from socket (%d: %s)\n", errno, strerror(errno));
292 if(received != sizeof(async_query_t*)) {
293 LM_ERR("invalid query size %d\n", received);
297 query = ((str)aq->query);
/* scalar settings are copied by value */
299 memset(&query_params, 0, sizeof(http_m_params_t));
300 query_params.timeout = aq->query_params.timeout;
301 query_params.tls_verify_peer = aq->query_params.tls_verify_peer;
302 query_params.tls_verify_host = aq->query_params.tls_verify_host;
303 query_params.authmethod = aq->query_params.authmethod;
304 query_params.tcp_keepalive = aq->query_params.tcp_keepalive;
305 query_params.tcp_ka_idle = aq->query_params.tcp_ka_idle;
306 query_params.tcp_ka_interval = aq->query_params.tcp_ka_interval;
/* rebuild the header table as a curl string list */
308 for (i = 0 ; i < aq->query_params.headers.len ; i++) {
309 query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
311 query_params.method = aq->query_params.method;
/* optional strings: duplicate each one that is set */
313 if (aq->query_params.tls_client_cert) {
314 len = strlen(aq->query_params.tls_client_cert);
315 query_params.tls_client_cert = shm_malloc(len+1);
317 if(query_params.tls_client_cert == NULL) {
318 LM_ERR("Error allocating query_params.tls_client_cert\n");
322 strncpy(query_params.tls_client_cert, aq->query_params.tls_client_cert, len);
323 query_params.tls_client_cert[len] = '\0';
326 if (aq->query_params.tls_client_key) {
327 len = strlen(aq->query_params.tls_client_key);
328 query_params.tls_client_key = shm_malloc(len+1);
330 if(query_params.tls_client_key == NULL) {
331 LM_ERR("Error allocating query_params.tls_client_key\n");
335 strncpy(query_params.tls_client_key, aq->query_params.tls_client_key, len);
336 query_params.tls_client_key[len] = '\0';
339 if (aq->query_params.tls_ca_path) {
340 len = strlen(aq->query_params.tls_ca_path);
341 query_params.tls_ca_path = shm_malloc(len+1);
343 if(query_params.tls_ca_path == NULL) {
344 LM_ERR("Error allocating query_params.tls_ca_path\n");
348 strncpy(query_params.tls_ca_path, aq->query_params.tls_ca_path, len);
349 query_params.tls_ca_path[len] = '\0';
352 if (aq->query_params.body.s && aq->query_params.body.len > 0) {
353 if (shm_str_dup(&query_params.body, &(aq->query_params.body)) < 0) {
354 LM_ERR("Error allocating query_params.body\n");
359 if (aq->query_params.username) {
360 len = strlen(aq->query_params.username);
361 query_params.username = shm_malloc(len+1);
363 if(query_params.username == NULL) {
364 LM_ERR("error in shm_malloc\n");
368 strncpy(query_params.username, aq->query_params.username, len);
369 query_params.username[len] = '\0';
372 if (aq->query_params.password) {
373 len = strlen(aq->query_params.password);
374 query_params.password = shm_malloc(len+1);
376 if(query_params.password == NULL) {
377 LM_ERR("error in shm_malloc\n");
381 strncpy(query_params.password, aq->query_params.password, len);
382 query_params.password[len] = '\0';
385 LM_DBG("query received: [%.*s] (%p)\n", query.len, query.s, aq);
/* start the request; aq travels along as the callback parameter */
387 if (new_request(&query, &query_params, async_http_cb, aq) < 0) {
388 LM_ERR("Cannot create request for %.*s\n", query.len, query.s);
389 free_async_query(aq);
/* drop the local copies made above */
393 if (query_params.tls_client_cert) {
394 shm_free(query_params.tls_client_cert);
395 query_params.tls_client_cert = NULL;
397 if (query_params.tls_client_key) {
398 shm_free(query_params.tls_client_key);
399 query_params.tls_client_key = NULL;
401 if (query_params.tls_ca_path) {
402 shm_free(query_params.tls_ca_path);
403 query_params.tls_ca_path = NULL;
405 if (query_params.body.s && query_params.body.len > 0) {
406 shm_free(query_params.body.s);
407 query_params.body.s = NULL;
408 query_params.body.len = 0;
411 if (query_params.username) {
412 shm_free(query_params.username);
413 query_params.username = NULL;
416 if (query_params.password) {
417 shm_free(query_params.password);
418 query_params.password = NULL;
424 int init_socket(async_http_worker_t *worker)
426 worker->socket_event = event_new(worker->evbase, worker->notication_socket[0], EV_READ|EV_PERSIST, notification_socket_cb, worker);
427 event_add(worker->socket_event, NULL);
/**
 * \brief Queue an HTTP query for asynchronous execution by a worker.
 *
 * Visible behaviour:
 *  - rejects invalid parameters and callback names of MAX_CBNAME_LEN-1
 *    characters or more;
 *  - when a transaction exists and ah_params.suspend_transaction is set,
 *    suspends it with tmb.t_suspend();
 *  - allocates an async_query_t with shm_malloc() and fills it with copies
 *    of the query text, the callback name and every member of ah_params
 *    (strings are duplicated with shm_malloc() or shm_str_dup());
 *  - formats the query id from the process id and q_idx;
 *  - resets ah_params with set_query_params() and passes the query to a
 *    worker with async_push_query();
 *  - the last visible lines call tmb.t_cancel_suspend() and
 *    free_async_query(), presumably as the error exit - confirm.
 *
 * \param msg    SIP message being processed, handed to tmb.t_suspend()
 * \param query  query text, duplicated into the new descriptor
 * \param cbname name of the callback route, copied into aq->cbname
 *
 * NOTE(review): strncpy(aq->id, q_id, strlen(q_id)) writes no terminator;
 * this is only safe if aq was zero-filled earlier, which is not visible in
 * this view - confirm.
 * NOTE(review): aq->query_params.headers is assigned by value from
 * ah_params.headers; check who owns the header table afterwards.
 * NOTE(review): return statements are missing from this view, so the return
 * values are not documented here.
 */
431 int async_send_query(sip_msg_t *msg, str *query, str *cbname)
434 unsigned int tindex = 0;
435 unsigned int tlabel = 0;
442 LM_ERR("invalid parameters\n");
/* the name must fit into aq->cbname together with its terminator */
445 if(cbname->len>=MAX_CBNAME_LEN-1) {
446 LM_ERR("callback name is too long: %d / %.*s\n", cbname->len,
447 cbname->len, cbname->s);
452 if (t==NULL || t==T_UNDEFINED) {
453 LM_DBG("no pre-existing transaction, switching to transaction-less behavior\n");
454 } else if (!ah_params.suspend_transaction) {
455 LM_DBG("transaction won't be suspended\n");
457 if(tmb.t_suspend==NULL) {
458 LM_ERR("http async query is disabled - tm module not loaded\n");
462 if(tmb.t_suspend(msg, &tindex, &tlabel)<0) {
463 LM_ERR("failed to suspend request processing\n");
469 LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel);
/* the descriptor lives in shared memory so that a worker process can use it */
471 dsize = sizeof(async_query_t);
472 aq = (async_query_t*)shm_malloc(dsize);
476 LM_ERR("no more shm\n");
481 if(shm_str_dup(&aq->query, query)<0) {
485 memcpy(aq->cbname, cbname->s, cbname->len);
486 aq->cbname[cbname->len] = '\0';
487 aq->cbname_len = cbname->len;
/* snapshot of the current request parameters */
491 aq->query_params.tls_verify_peer = ah_params.tls_verify_peer;
492 aq->query_params.tls_verify_host = ah_params.tls_verify_host;
493 aq->query_params.suspend_transaction = suspend;
494 aq->query_params.timeout = ah_params.timeout;
495 aq->query_params.tcp_keepalive = ah_params.tcp_keepalive;
496 aq->query_params.tcp_ka_idle = ah_params.tcp_ka_idle;
497 aq->query_params.tcp_ka_interval = ah_params.tcp_ka_interval;
498 aq->query_params.headers = ah_params.headers;
499 aq->query_params.method = ah_params.method;
500 aq->query_params.authmethod = ah_params.authmethod;
/* query id: process id and q_idx joined by a dash */
503 snprintf(q_id, MAX_ID_LEN+1, "%u-%u", (unsigned int)getpid(), q_idx);
504 strncpy(aq->id, q_id, strlen(q_id));
506 aq->query_params.tls_client_cert = NULL;
507 if (ah_params.tls_client_cert) {
508 len = strlen(ah_params.tls_client_cert);
509 aq->query_params.tls_client_cert = shm_malloc(len+1);
511 if(aq->query_params.tls_client_cert == NULL) {
512 LM_ERR("Error allocating aq->query_params.tls_client_cert\n");
516 strncpy(aq->query_params.tls_client_cert, ah_params.tls_client_cert, len);
517 aq->query_params.tls_client_cert[len] = '\0';
520 aq->query_params.tls_client_key = NULL;
521 if (ah_params.tls_client_key) {
522 len = strlen(ah_params.tls_client_key);
523 aq->query_params.tls_client_key = shm_malloc(len+1);
525 if(aq->query_params.tls_client_key == NULL) {
526 LM_ERR("Error allocating aq->query_params.tls_client_key\n");
530 strncpy(aq->query_params.tls_client_key, ah_params.tls_client_key, len);
531 aq->query_params.tls_client_key[len] = '\0';
534 aq->query_params.tls_ca_path = NULL;
535 if (ah_params.tls_ca_path) {
536 len = strlen(ah_params.tls_ca_path);
537 aq->query_params.tls_ca_path = shm_malloc(len+1);
539 if(aq->query_params.tls_ca_path == NULL) {
540 LM_ERR("Error allocating aq->query_params.tls_ca_path\n");
544 strncpy(aq->query_params.tls_ca_path, ah_params.tls_ca_path, len);
545 aq->query_params.tls_ca_path[len] = '\0';
548 aq->query_params.body.s = NULL;
549 aq->query_params.body.len = 0;
550 if (ah_params.body.s && ah_params.body.len > 0) {
551 if (shm_str_dup(&aq->query_params.body, &(ah_params.body)) < 0) {
552 LM_ERR("Error allocating aq->query_params.body\n");
557 aq->query_params.username = NULL;
558 if (ah_params.username) {
559 len = strlen(ah_params.username);
560 aq->query_params.username = shm_malloc(len+1);
562 if(aq->query_params.username == NULL) {
563 LM_ERR("error in shm_malloc\n");
567 strncpy(aq->query_params.username, ah_params.username, len);
568 aq->query_params.username[len] = '\0';
571 aq->query_params.password = NULL;
572 if (ah_params.password) {
573 len = strlen(ah_params.password);
574 aq->query_params.password = shm_malloc(len+1);
576 if(aq->query_params.password == NULL) {
577 LM_ERR("error in shm_malloc\n");
581 strncpy(aq->query_params.password, ah_params.password, len);
582 aq->query_params.password[len] = '\0';
/* the per-request settings were consumed: put ah_params back to the defaults */
585 set_query_params(&ah_params);
587 if(async_push_query(aq)<0) {
588 LM_ERR("failed to relay query: %.*s\n", query->len, query->s);
593 /* force exit in config */
597 /* continue route processing */
603 tmb.t_cancel_suspend(tindex, tlabel);
605 free_async_query(aq);
609 int async_push_query(async_query_t *aq)
613 static unsigned long rr = 0; /* round robin */
617 query = ((str)aq->query);
619 worker = rr++ % num_workers;
620 len = write(workers[worker].notication_socket[1], &aq, sizeof(async_query_t*));
622 LM_ERR("failed to pass the query to async workers\n");
625 LM_DBG("query sent [%.*s] (%p) to worker %d\n", query.len, query.s, aq, worker + 1);
/**
 * \brief Zero the module-wide ah_params structure.
 *
 * NOTE(review): the visible statement clears the global ah_params and does
 * not touch p. The embedded line numbers skip 631-633, so a further
 * statement may be missing from this view - confirm how p is used.
 */
629 void init_query_params(struct query_params *p) {
630 memset(&ah_params, 0, sizeof(struct query_params));
/**
 * \brief Reset a parameter set to the module defaults.
 *
 * Copies the module-level settings (tls_verify_host, tls_verify_peer,
 * http_timeout, default_authmethod and the tcp keepalive values) into *p,
 * forces suspend_transaction to 1 and the method to AH_METH_DEFAULT. The
 * TLS strings held by p are freed and replaced with shm copies of the
 * module-level tls_client_cert, tls_client_key and tls_ca_path. The final
 * visible lines test p->body and free p->username and p->password.
 *
 * \param p parameter set to reset; its string members must be NULL or point
 *          to shm memory, since they are passed to shm_free()
 *
 * NOTE(review): short lines are missing from this view (for instance the
 * guards around the last shm_free() calls and the handling of the body and
 * of the header list), so only the statements shown are described.
 */
634 void set_query_params(struct query_params *p) {
/* scalar defaults */
638 p->tls_verify_host = tls_verify_host;
639 p->tls_verify_peer = tls_verify_peer;
640 p->suspend_transaction = 1;
641 p->timeout = http_timeout;
642 p->method = AH_METH_DEFAULT;
643 p->authmethod = default_authmethod;
644 p->tcp_keepalive = tcp_keepalive;
645 p->tcp_ka_idle = tcp_ka_idle;
646 p->tcp_ka_interval = tcp_ka_interval;
/* each TLS string: drop the old value, then duplicate the module default */
648 if (p->tls_client_cert) {
649 shm_free(p->tls_client_cert);
650 p->tls_client_cert = NULL;
652 if (tls_client_cert) {
653 len = strlen(tls_client_cert);
654 p->tls_client_cert = shm_malloc(len+1);
656 if (p->tls_client_cert == NULL) {
657 LM_ERR("Error allocating tls_client_cert\n");
661 strncpy(p->tls_client_cert, tls_client_cert, len);
662 p->tls_client_cert[len] = '\0';
665 if (p->tls_client_key) {
666 shm_free(p->tls_client_key);
667 p->tls_client_key = NULL;
669 if (tls_client_key) {
670 len = strlen(tls_client_key);
671 p->tls_client_key = shm_malloc(len+1);
673 if (p->tls_client_key == NULL) {
674 LM_ERR("Error allocating tls_client_key\n");
678 strncpy(p->tls_client_key, tls_client_key, len);
679 p->tls_client_key[len] = '\0';
682 if (p->tls_ca_path) {
683 shm_free(p->tls_ca_path);
684 p->tls_ca_path = NULL;
687 len = strlen(tls_ca_path);
688 p->tls_ca_path = shm_malloc(len+1);
690 if (p->tls_ca_path == NULL) {
691 LM_ERR("Error allocating tls_ca_path\n");
695 strncpy(p->tls_ca_path, tls_ca_path, len);
696 p->tls_ca_path[len] = '\0';
/* per-request body and credentials are discarded */
699 if (p->body.s && p->body.len > 0) {
706 shm_free(p->username);
711 shm_free(p->password);
716 int header_list_add(struct header_list *hl, str* hdr) {
720 hl->t = shm_reallocxf(hl->t, hl->len * sizeof(char*));
722 LM_ERR("shm memory allocation failure\n");
725 hl->t[hl->len - 1] = shm_malloc(hdr->len + 1);
726 tmp = hl->t[hl->len - 1];
728 LM_ERR("shm memory allocation failure\n");
731 memcpy(tmp, hdr->s, hdr->len);
732 *(tmp + hdr->len) = '\0';
734 LM_DBG("stored new http header: [%s]\n", tmp);
/**
 * \brief Select the HTTP method of a query from its textual name.
 *
 * Compares meth case-insensitively against GET, POST, PUT and DELETE, in
 * that order, and stores the matching AH_METH_* value in qp->method; any
 * other name is logged as unsupported.
 *
 * \param qp   parameter set whose method member is updated
 * \param meth method name, meth->len bytes
 *
 * NOTE(review): strncasecmp() is bounded by meth->len only, so an empty
 * meth or a proper prefix of a method name (for instance P or DE) also
 * compares equal and the first matching branch wins. Consider requiring
 * meth->len to equal the length of the literal as well.
 * NOTE(review): the end of this function, including its return values, is
 * not part of this view.
 */
738 int query_params_set_method(struct query_params *qp, str *meth) {
739 if (strncasecmp(meth->s, "GET", meth->len) == 0) {
740 qp->method = AH_METH_GET;
741 } else if (strncasecmp(meth->s, "POST",meth->len) == 0) {
742 qp->method = AH_METH_POST;
743 } else if (strncasecmp(meth->s, "PUT", meth->len) == 0) {
744 qp->method = AH_METH_PUT;
745 } else if (strncasecmp(meth->s, "DELETE", meth->len) == 0) {
746 qp->method = AH_METH_DELETE;
748 LM_ERR("Unsupported method: %.*s\n", meth->len, meth->s);