2 * Copyright 2016 (C) Federico Cabiddu <federico.cabiddu@gmail.com>
3 * Copyright 2016 (C) Giacomo Vacca <giacomo.vacca@gmail.com>
4 * Copyright 2016 (C) Orange - Camille Oudot <camille.oudot@orange.com>
6 * This file is part of Kamailio, a free SIP server.
8 * This file is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
14 * This file is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 * \brief Kamailio http_async_client :: Include file
27 * \ingroup http_async_client
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
42 #include <event2/event.h>
44 #include "../../core/sr_module.h"
45 #include "../../core/dprint.h"
46 #include "../../core/ut.h"
47 #include "../../core/cfg/cfg_struct.h"
48 #include "../../core/receive.h"
49 #include "../../core/fmsg.h"
50 #include "../../core/kemi.h"
51 #include "../../modules/tm/tm_load.h"
53 #include "async_http.h"
56 extern struct tm_binds tmb;
58 struct sip_msg *ah_reply = NULL;
59 str ah_error = {NULL, 0};
61 async_http_worker_t *workers = NULL;
64 struct query_params ah_params;
66 char q_id[MAX_ID_LEN+1];
68 int async_http_init_worker(int prank, async_http_worker_t* worker)
70 LM_DBG("initializing worker process: %d\n", prank);
71 worker->evbase = event_base_new();
72 LM_DBG("base event %p created\n", worker->evbase);
74 worker->g = shm_malloc(sizeof(struct http_m_global));
76 LM_ERR("out of shared memory\n");
79 memset(worker->g, 0, sizeof(http_m_global_t));
80 LM_DBG("initialized global struct %p\n", worker->g);
84 LM_INFO("started worker process: %d\n", prank);
89 void async_http_run_worker(async_http_worker_t* worker)
91 init_http_multi(worker->evbase, worker->g);
92 event_base_dispatch(worker->evbase);
95 int async_http_init_sockets(async_http_worker_t *worker)
97 if (socketpair(PF_UNIX, SOCK_DGRAM, 0, worker->notication_socket) < 0) {
98 LM_ERR("opening tasks dgram socket pair\n");
101 LM_INFO("inter-process event notification sockets initialized\n");
105 static inline char *strfindcasestrz(str *haystack, char *needlez)
111 needle.len = strlen(needlez);
112 for(i=0;i<haystack->len-needle.len;i++) {
113 for(j=0;j<needle.len;j++) {
114 if ( !((haystack->s[i+j]==needle.s[j]) ||
115 ( isalpha((int)haystack->s[i+j])
116 && ((haystack->s[i+j])^(needle.s[j]))==0x20 )) )
120 return haystack->s+i;
125 void async_http_cb(struct http_m_reply *reply, void *param)
127 async_query_t *aq = NULL;
128 cfg_action_t *act = NULL;
132 struct cell *t = NULL;
135 sip_msg_t *fmsg = NULL;
136 sr_kemi_eng_t *keng = NULL;
138 str evname = str_init("http_async_client:callback");
142 /* clean process-local result variables */
145 memset(ah_reply, 0, sizeof(struct sip_msg));
147 keng = sr_kemi_eng_get();
149 ri = route_lookup(&main_rt, aq->cbname);
151 LM_ERR("unable to find route block [%s]\n", aq->cbname);
154 act = main_rt.rlist[ri];
156 LM_ERR("empty action lists in route block [%s]\n", aq->cbname);
161 if (reply->result != NULL) {
162 LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
165 /* set process-local result variables */
166 if (reply->result == NULL) {
168 ah_error.s = reply->error;
169 ah_error.len = strlen(ah_error.s);
172 /* check for HTTP Via header
173 * - HTTP Via format is different that SIP Via
174 * - workaround: replace with Hia to be ignored by SIP parser
176 if((p=strfindcasestrz(reply->result, "\nVia:"))!=NULL)
180 LM_DBG("replaced HTTP Via with Hia [[\n%.*s]]\n", reply->result->len, reply->result->s);
183 ah_reply->buf = reply->result->s;
184 ah_reply->len = reply->result->len;
186 if (parse_msg(reply->result->s, reply->result->len, ah_reply) != 0) {
187 LM_DBG("failed to parse the http_reply\n");
189 if (ah_reply->first_line.u.reply.statuscode == 100) {
190 newbuf.s = get_body( ah_reply );
191 newbuf.len = reply->result->s + reply->result->len - newbuf.s;
193 if (!(newbuf.len < 0)) {
194 memset(ah_reply, 0, sizeof(struct sip_msg));
195 ah_reply->buf = newbuf.s;
196 ah_reply->len = newbuf.len;
198 if (parse_msg(ah_reply->buf, ah_reply->len, ah_reply) != 0) {
199 LM_DBG("failed to parse the http_reply\n");
201 LM_DBG("successfully parsed http reply %p\n", ah_reply);
204 /* this should not happen! */
205 LM_WARN("something got wrong parsing the 100 Continue: got %d len\n", newbuf.len);
209 LM_DBG("successfully parsed http reply %p\n", ah_reply);
214 strncpy(q_id, aq->id, strlen(aq->id));
216 q_id[strlen(aq->id)] = '\0';
220 if (aq->query_params.suspend_transaction) {
224 if (tmb.t_lookup_ident(&t, tindex, tlabel) < 0) {
225 LM_ERR("transaction not found %d:%d\n", tindex, tlabel);
226 LM_DBG("freeing query %p\n", aq);
227 free_async_query(aq);
230 // we bring the list of AVPs of the transaction to the current context
231 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
232 set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
233 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
234 set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
235 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
236 set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
241 LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
245 tmb.t_continue(tindex, tlabel, act);
248 cbname.s = aq->cbname;
249 cbname.len = aq->cbname_len;
250 tmb.t_continue_cb(tindex, tlabel, &cbname, &evname);
253 fmsg = faked_msg_next();
256 if (run_top_route(act, fmsg, 0)<0) {
257 LM_ERR("failure inside run_top_route\n");
261 cbname.s = aq->cbname;
262 cbname.len = aq->cbname_len;
263 if(keng->froute(fmsg, EVENT_ROUTE, &cbname, &evname)<0) {
264 LM_ERR("error running event route kemi callback\n");
271 free_sip_msg(ah_reply);
272 free_async_query(aq);
277 void notification_socket_cb(int fd, short event, void *arg)
279 (void)fd; /* unused */
280 (void)event; /* unused */
281 const async_http_worker_t *worker = (async_http_worker_t *) arg;
287 http_m_params_t query_params;
291 if ((received = recvfrom(worker->notication_socket[0],
292 &aq, sizeof(async_query_t*),
294 LM_ERR("failed to read from socket (%d: %s)\n", errno, strerror(errno));
298 if(received != sizeof(async_query_t*)) {
299 LM_ERR("invalid query size %d\n", received);
303 query = ((str)aq->query);
305 memset(&query_params, 0, sizeof(http_m_params_t));
306 query_params.timeout = aq->query_params.timeout;
307 query_params.tls_verify_peer = aq->query_params.tls_verify_peer;
308 query_params.tls_verify_host = aq->query_params.tls_verify_host;
309 query_params.authmethod = aq->query_params.authmethod;
311 for (i = 0 ; i < aq->query_params.headers.len ; i++) {
312 query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
314 query_params.method = aq->query_params.method;
316 if (aq->query_params.tls_client_cert) {
317 len = strlen(aq->query_params.tls_client_cert);
318 query_params.tls_client_cert = shm_malloc(len+1);
320 if(query_params.tls_client_cert == NULL) {
321 LM_ERR("Error allocating query_params.tls_client_cert\n");
325 strncpy(query_params.tls_client_cert, aq->query_params.tls_client_cert, len);
326 query_params.tls_client_cert[len] = '\0';
329 if (aq->query_params.tls_client_key) {
330 len = strlen(aq->query_params.tls_client_key);
331 query_params.tls_client_key = shm_malloc(len+1);
333 if(query_params.tls_client_key == NULL) {
334 LM_ERR("Error allocating query_params.tls_client_key\n");
338 strncpy(query_params.tls_client_key, aq->query_params.tls_client_key, len);
339 query_params.tls_client_key[len] = '\0';
342 if (aq->query_params.tls_ca_path) {
343 len = strlen(aq->query_params.tls_ca_path);
344 query_params.tls_ca_path = shm_malloc(len+1);
346 if(query_params.tls_ca_path == NULL) {
347 LM_ERR("Error allocating query_params.tls_ca_path\n");
351 strncpy(query_params.tls_ca_path, aq->query_params.tls_ca_path, len);
352 query_params.tls_ca_path[len] = '\0';
355 if (aq->query_params.body.s && aq->query_params.body.len > 0) {
356 if (shm_str_dup(&query_params.body, &(aq->query_params.body)) < 0) {
357 LM_ERR("Error allocating query_params.body\n");
362 if (aq->query_params.username) {
363 len = strlen(aq->query_params.username);
364 query_params.username = shm_malloc(len+1);
366 if(query_params.username == NULL) {
367 LM_ERR("error in shm_malloc\n");
371 strncpy(query_params.username, aq->query_params.username, len);
372 query_params.username[len] = '\0';
375 if (aq->query_params.password) {
376 len = strlen(aq->query_params.password);
377 query_params.password = shm_malloc(len+1);
379 if(query_params.password == NULL) {
380 LM_ERR("error in shm_malloc\n");
384 strncpy(query_params.password, aq->query_params.password, len);
385 query_params.password[len] = '\0';
388 LM_DBG("query received: [%.*s] (%p)\n", query.len, query.s, aq);
390 if (new_request(&query, &query_params, async_http_cb, aq) < 0) {
391 LM_ERR("Cannot create request for %.*s\n", query.len, query.s);
392 free_async_query(aq);
396 if (query_params.tls_client_cert) {
397 shm_free(query_params.tls_client_cert);
398 query_params.tls_client_cert = NULL;
400 if (query_params.tls_client_key) {
401 shm_free(query_params.tls_client_key);
402 query_params.tls_client_key = NULL;
404 if (query_params.tls_ca_path) {
405 shm_free(query_params.tls_ca_path);
406 query_params.tls_ca_path = NULL;
408 if (query_params.body.s && query_params.body.len > 0) {
409 shm_free(query_params.body.s);
410 query_params.body.s = NULL;
411 query_params.body.len = 0;
414 if (query_params.username) {
415 shm_free(query_params.username);
416 query_params.username = NULL;
419 if (query_params.password) {
420 shm_free(query_params.password);
421 query_params.password = NULL;
427 int init_socket(async_http_worker_t *worker)
429 worker->socket_event = event_new(worker->evbase, worker->notication_socket[0], EV_READ|EV_PERSIST, notification_socket_cb, worker);
430 event_add(worker->socket_event, NULL);
434 int async_send_query(sip_msg_t *msg, str *query, str *cbname)
437 unsigned int tindex = 0;
438 unsigned int tlabel = 0;
445 LM_ERR("invalid parameters\n");
448 if(cbname->len>=MAX_CBNAME_LEN-1) {
449 LM_ERR("callback name is too long: %d / %.*s\n", cbname->len,
450 cbname->len, cbname->s);
455 if (t==NULL || t==T_UNDEFINED) {
456 LM_DBG("no pre-existing transaction, switching to transaction-less behavior\n");
457 } else if (!ah_params.suspend_transaction) {
458 LM_DBG("transaction won't be suspended\n");
460 if(tmb.t_suspend==NULL) {
461 LM_ERR("http async query is disabled - tm module not loaded\n");
465 if(tmb.t_suspend(msg, &tindex, &tlabel)<0) {
466 LM_ERR("failed to suspend request processing\n");
472 LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel);
474 dsize = sizeof(async_query_t);
475 aq = (async_query_t*)shm_malloc(dsize);
479 LM_ERR("no more shm\n");
484 if(shm_str_dup(&aq->query, query)<0) {
488 memcpy(aq->cbname, cbname->s, cbname->len);
489 aq->cbname[cbname->len] = '\0';
490 aq->cbname_len = cbname->len;
494 aq->query_params.tls_verify_peer = ah_params.tls_verify_peer;
495 aq->query_params.tls_verify_host = ah_params.tls_verify_host;
496 aq->query_params.suspend_transaction = suspend;
497 aq->query_params.timeout = ah_params.timeout;
498 aq->query_params.headers = ah_params.headers;
499 aq->query_params.method = ah_params.method;
500 aq->query_params.authmethod = ah_params.authmethod;
503 snprintf(q_id, MAX_ID_LEN+1, "%u-%u", (unsigned int)getpid(), q_idx);
504 strncpy(aq->id, q_id, strlen(q_id));
506 aq->query_params.tls_client_cert = NULL;
507 if (ah_params.tls_client_cert) {
508 len = strlen(ah_params.tls_client_cert);
509 aq->query_params.tls_client_cert = shm_malloc(len+1);
511 if(aq->query_params.tls_client_cert == NULL) {
512 LM_ERR("Error allocating aq->query_params.tls_client_cert\n");
516 strncpy(aq->query_params.tls_client_cert, ah_params.tls_client_cert, len);
517 aq->query_params.tls_client_cert[len] = '\0';
520 aq->query_params.tls_client_key = NULL;
521 if (ah_params.tls_client_key) {
522 len = strlen(ah_params.tls_client_key);
523 aq->query_params.tls_client_key = shm_malloc(len+1);
525 if(aq->query_params.tls_client_key == NULL) {
526 LM_ERR("Error allocating aq->query_params.tls_client_key\n");
530 strncpy(aq->query_params.tls_client_key, ah_params.tls_client_key, len);
531 aq->query_params.tls_client_key[len] = '\0';
534 aq->query_params.tls_ca_path = NULL;
535 if (ah_params.tls_ca_path) {
536 len = strlen(ah_params.tls_ca_path);
537 aq->query_params.tls_ca_path = shm_malloc(len+1);
539 if(aq->query_params.tls_ca_path == NULL) {
540 LM_ERR("Error allocating aq->query_params.tls_ca_path\n");
544 strncpy(aq->query_params.tls_ca_path, ah_params.tls_ca_path, len);
545 aq->query_params.tls_ca_path[len] = '\0';
548 aq->query_params.body.s = NULL;
549 aq->query_params.body.len = 0;
550 if (ah_params.body.s && ah_params.body.len > 0) {
551 if (shm_str_dup(&aq->query_params.body, &(ah_params.body)) < 0) {
552 LM_ERR("Error allocating aq->query_params.body\n");
557 aq->query_params.username = NULL;
558 if (ah_params.username) {
559 len = strlen(ah_params.username);
560 aq->query_params.username = shm_malloc(len+1);
562 if(aq->query_params.username == NULL) {
563 LM_ERR("error in shm_malloc\n");
567 strncpy(aq->query_params.username, ah_params.username, len);
568 aq->query_params.username[len] = '\0';
571 aq->query_params.password = NULL;
572 if (ah_params.password) {
573 len = strlen(ah_params.password);
574 aq->query_params.password = shm_malloc(len+1);
576 if(aq->query_params.password == NULL) {
577 LM_ERR("error in shm_malloc\n");
581 strncpy(aq->query_params.password, ah_params.password, len);
582 aq->query_params.password[len] = '\0';
585 set_query_params(&ah_params);
587 if(async_push_query(aq)<0) {
588 LM_ERR("failed to relay query: %.*s\n", query->len, query->s);
593 /* force exit in config */
597 /* continue route processing */
603 tmb.t_cancel_suspend(tindex, tlabel);
605 free_async_query(aq);
609 int async_push_query(async_query_t *aq)
613 static unsigned long rr = 0; /* round robin */
617 query = ((str)aq->query);
619 worker = rr++ % num_workers;
620 len = write(workers[worker].notication_socket[1], &aq, sizeof(async_query_t*));
622 LM_ERR("failed to pass the query to async workers\n");
625 LM_DBG("query sent [%.*s] (%p) to worker %d\n", query.len, query.s, aq, worker + 1);
629 void init_query_params(struct query_params *p) {
630 memset(&ah_params, 0, sizeof(struct query_params));
634 void set_query_params(struct query_params *p) {
638 p->tls_verify_host = tls_verify_host;
639 p->tls_verify_peer = tls_verify_peer;
640 p->suspend_transaction = 1;
641 p->timeout = http_timeout;
642 p->method = AH_METH_DEFAULT;
643 p->authmethod = default_authmethod;
645 if (p->tls_client_cert) {
646 shm_free(p->tls_client_cert);
647 p->tls_client_cert = NULL;
649 if (tls_client_cert) {
650 len = strlen(tls_client_cert);
651 p->tls_client_cert = shm_malloc(len+1);
653 if (p->tls_client_cert == NULL) {
654 LM_ERR("Error allocating tls_client_cert\n");
658 strncpy(p->tls_client_cert, tls_client_cert, len);
659 p->tls_client_cert[len] = '\0';
662 if (p->tls_client_key) {
663 shm_free(p->tls_client_key);
664 p->tls_client_key = NULL;
666 if (tls_client_key) {
667 len = strlen(tls_client_key);
668 p->tls_client_key = shm_malloc(len+1);
670 if (p->tls_client_key == NULL) {
671 LM_ERR("Error allocating tls_client_key\n");
675 strncpy(p->tls_client_key, tls_client_key, len);
676 p->tls_client_key[len] = '\0';
679 if (p->tls_ca_path) {
680 shm_free(p->tls_ca_path);
681 p->tls_ca_path = NULL;
684 len = strlen(tls_ca_path);
685 p->tls_ca_path = shm_malloc(len+1);
687 if (p->tls_ca_path == NULL) {
688 LM_ERR("Error allocating tls_ca_path\n");
692 strncpy(p->tls_ca_path, tls_ca_path, len);
693 p->tls_ca_path[len] = '\0';
696 if (p->body.s && p->body.len > 0) {
703 shm_free(p->username);
708 shm_free(p->password);
713 int header_list_add(struct header_list *hl, str* hdr) {
717 hl->t = shm_reallocxf(hl->t, hl->len * sizeof(char*));
719 LM_ERR("shm memory allocation failure\n");
722 hl->t[hl->len - 1] = shm_malloc(hdr->len + 1);
723 tmp = hl->t[hl->len - 1];
725 LM_ERR("shm memory allocation failure\n");
728 memcpy(tmp, hdr->s, hdr->len);
729 *(tmp + hdr->len) = '\0';
731 LM_DBG("stored new http header: [%s]\n", tmp);
735 int query_params_set_method(struct query_params *qp, str *meth) {
736 if (strncasecmp(meth->s, "GET", meth->len) == 0) {
737 qp->method = AH_METH_GET;
738 } else if (strncasecmp(meth->s, "POST",meth->len) == 0) {
739 qp->method = AH_METH_POST;
740 } else if (strncasecmp(meth->s, "PUT", meth->len) == 0) {
741 qp->method = AH_METH_PUT;
742 } else if (strncasecmp(meth->s, "DELETE", meth->len) == 0) {
743 qp->method = AH_METH_DELETE;
745 LM_ERR("Unsupported method: %.*s\n", meth->len, meth->s);