2 * Copyright 2016 (C) Federico Cabiddu <federico.cabiddu@gmail.com>
3 * Copyright 2016 (C) Giacomo Vacca <giacomo.vacca@gmail.com>
4 * Copyright 2016 (C) Orange - Camille Oudot <camille.oudot@orange.com>
6 * This file is part of Kamailio, a free SIP server.
8 * This file is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
14 * This file is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 * \brief Kamailio http_async_client :: Include file
27 * \ingroup http_async_client
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
42 #include <event2/event.h>
44 #include "../../core/sr_module.h"
45 #include "../../core/dprint.h"
46 #include "../../core/ut.h"
47 #include "../../core/cfg/cfg_struct.h"
48 #include "../../core/receive.h"
49 #include "../../core/fmsg.h"
50 #include "../../core/kemi.h"
51 #include "../../modules/tm/tm_load.h"
53 #include "async_http.h"
56 extern struct tm_binds tmb;
58 struct sip_msg *ah_reply = NULL;
59 str ah_error = {NULL, 0};
61 async_http_worker_t *workers = NULL;
64 struct query_params ah_params;
66 char q_id[MAX_ID_LEN+1];
68 int async_http_init_worker(int prank, async_http_worker_t* worker)
70 LM_DBG("initializing worker process: %d\n", prank);
71 worker->evbase = event_base_new();
72 LM_DBG("base event %p created\n", worker->evbase);
74 worker->g = shm_malloc(sizeof(struct http_m_global));
76 LM_ERR("out of shared memory\n");
79 memset(worker->g, 0, sizeof(http_m_global_t));
80 LM_DBG("initialized global struct %p\n", worker->g);
84 LM_INFO("started worker process: %d\n", prank);
89 void async_http_run_worker(async_http_worker_t* worker)
91 init_http_multi(worker->evbase, worker->g);
92 event_base_dispatch(worker->evbase);
95 int async_http_init_sockets(async_http_worker_t *worker)
97 if (socketpair(PF_UNIX, SOCK_DGRAM, 0, worker->notication_socket) < 0) {
98 LM_ERR("opening tasks dgram socket pair\n");
101 LM_INFO("inter-process event notification sockets initialized\n");
105 static inline char *strfindcasestrz(str *haystack, char *needlez)
111 needle.len = strlen(needlez);
112 for(i=0;i<haystack->len-needle.len;i++) {
113 for(j=0;j<needle.len;j++) {
114 if ( !((haystack->s[i+j]==needle.s[j]) ||
115 ( isalpha((int)haystack->s[i+j])
116 && ((haystack->s[i+j])^(needle.s[j]))==0x20 )) )
120 return haystack->s+i;
125 void async_http_cb(struct http_m_reply *reply, void *param)
127 async_query_t *aq = NULL;
128 cfg_action_t *act = NULL;
132 struct cell *t = NULL;
135 sip_msg_t *fmsg = NULL;
136 sr_kemi_eng_t *keng = NULL;
138 str evname = str_init("http_async_client:callback");
142 /* clean process-local result variables */
145 memset(ah_reply, 0, sizeof(struct sip_msg));
147 keng = sr_kemi_eng_get();
149 ri = route_lookup(&main_rt, aq->cbname);
151 LM_ERR("unable to find route block [%s]\n", aq->cbname);
154 act = main_rt.rlist[ri];
156 LM_ERR("empty action lists in route block [%s]\n", aq->cbname);
161 if (reply->result != NULL) {
162 LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
165 /* set process-local result variables */
166 if (reply->result == NULL) {
168 ah_error.s = reply->error;
169 ah_error.len = strlen(ah_error.s);
172 /* check for HTTP Via header
173 * - HTTP Via format is different that SIP Via
174 * - workaround: replace with Hia to be ignored by SIP parser
176 if((p=strfindcasestrz(reply->result, "\nVia:"))!=NULL)
180 LM_DBG("replaced HTTP Via with Hia [[\n%.*s]]\n", reply->result->len, reply->result->s);
183 ah_reply->buf = reply->result->s;
184 ah_reply->len = reply->result->len;
186 if (parse_msg(reply->result->s, reply->result->len, ah_reply) != 0) {
187 LM_DBG("failed to parse the http_reply\n");
189 if (ah_reply->first_line.u.reply.statuscode == 100) {
190 newbuf.s = get_body( ah_reply );
191 newbuf.len = reply->result->s + reply->result->len - newbuf.s;
193 if (!(newbuf.len < 0)) {
194 memset(ah_reply, 0, sizeof(struct sip_msg));
195 ah_reply->buf = newbuf.s;
196 ah_reply->len = newbuf.len;
198 if (parse_msg(ah_reply->buf, ah_reply->len, ah_reply) != 0) {
199 LM_DBG("failed to parse the http_reply\n");
201 LM_DBG("successfully parsed http reply %p\n", ah_reply);
204 /* this should not happen! */
205 LM_WARN("something got wrong parsing the 100 Continue: got %d len\n", newbuf.len);
209 LM_DBG("successfully parsed http reply %p\n", ah_reply);
214 strncpy(q_id, aq->id, strlen(aq->id));
216 q_id[strlen(aq->id)] = '\0';
220 if (aq->query_params.suspend_transaction) {
224 if (tmb.t_lookup_ident(&t, tindex, tlabel) < 0) {
225 LM_ERR("transaction not found %d:%d\n", tindex, tlabel);
226 LM_DBG("freeing query %p\n", aq);
227 free_async_query(aq);
230 // we bring the list of AVPs of the transaction to the current context
231 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
232 set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
233 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
234 set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
235 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
236 set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
241 LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
245 tmb.t_continue(tindex, tlabel, act);
248 cbname.s = aq->cbname;
249 cbname.len = aq->cbname_len;
250 tmb.t_continue_cb(tindex, tlabel, &cbname, &evname);
253 fmsg = faked_msg_next();
256 if (run_top_route(act, fmsg, 0)<0) {
257 LM_ERR("failure inside run_top_route\n");
261 cbname.s = aq->cbname;
262 cbname.len = aq->cbname_len;
263 if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &cbname, &evname)<0) {
264 LM_ERR("error running event route kemi callback\n");
271 free_sip_msg(ah_reply);
272 free_async_query(aq);
277 void notification_socket_cb(int fd, short event, void *arg)
279 (void)fd; /* unused */
280 (void)event; /* unused */
281 const async_http_worker_t *worker = (async_http_worker_t *) arg;
287 http_m_params_t query_params;
291 if ((received = recvfrom(worker->notication_socket[0],
292 &aq, sizeof(async_query_t*),
294 LM_ERR("failed to read from socket (%d: %s)\n", errno, strerror(errno));
298 if(received != sizeof(async_query_t*)) {
299 LM_ERR("invalid query size %d\n", received);
303 query = ((str)aq->query);
305 memset(&query_params, 0, sizeof(http_m_params_t));
306 query_params.timeout = aq->query_params.timeout;
307 query_params.tls_verify_peer = aq->query_params.tls_verify_peer;
308 query_params.tls_verify_host = aq->query_params.tls_verify_host;
309 query_params.authmethod = aq->query_params.authmethod;
310 query_params.tcp_keepalive = aq->query_params.tcp_keepalive;
311 query_params.tcp_ka_idle = aq->query_params.tcp_ka_idle;
312 query_params.tcp_ka_interval = aq->query_params.tcp_ka_interval;
314 for (i = 0 ; i < aq->query_params.headers.len ; i++) {
315 query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
317 query_params.method = aq->query_params.method;
319 if (aq->query_params.tls_client_cert) {
320 len = strlen(aq->query_params.tls_client_cert);
321 query_params.tls_client_cert = shm_malloc(len+1);
323 if(query_params.tls_client_cert == NULL) {
324 LM_ERR("Error allocating query_params.tls_client_cert\n");
328 strncpy(query_params.tls_client_cert, aq->query_params.tls_client_cert, len);
329 query_params.tls_client_cert[len] = '\0';
332 if (aq->query_params.tls_client_key) {
333 len = strlen(aq->query_params.tls_client_key);
334 query_params.tls_client_key = shm_malloc(len+1);
336 if(query_params.tls_client_key == NULL) {
337 LM_ERR("Error allocating query_params.tls_client_key\n");
341 strncpy(query_params.tls_client_key, aq->query_params.tls_client_key, len);
342 query_params.tls_client_key[len] = '\0';
345 if (aq->query_params.tls_ca_path) {
346 len = strlen(aq->query_params.tls_ca_path);
347 query_params.tls_ca_path = shm_malloc(len+1);
349 if(query_params.tls_ca_path == NULL) {
350 LM_ERR("Error allocating query_params.tls_ca_path\n");
354 strncpy(query_params.tls_ca_path, aq->query_params.tls_ca_path, len);
355 query_params.tls_ca_path[len] = '\0';
358 if (aq->query_params.body.s && aq->query_params.body.len > 0) {
359 if (shm_str_dup(&query_params.body, &(aq->query_params.body)) < 0) {
360 LM_ERR("Error allocating query_params.body\n");
365 if (aq->query_params.username) {
366 len = strlen(aq->query_params.username);
367 query_params.username = shm_malloc(len+1);
369 if(query_params.username == NULL) {
370 LM_ERR("error in shm_malloc\n");
374 strncpy(query_params.username, aq->query_params.username, len);
375 query_params.username[len] = '\0';
378 if (aq->query_params.password) {
379 len = strlen(aq->query_params.password);
380 query_params.password = shm_malloc(len+1);
382 if(query_params.password == NULL) {
383 LM_ERR("error in shm_malloc\n");
387 strncpy(query_params.password, aq->query_params.password, len);
388 query_params.password[len] = '\0';
391 LM_DBG("query received: [%.*s] (%p)\n", query.len, query.s, aq);
393 if (new_request(&query, &query_params, async_http_cb, aq) < 0) {
394 LM_ERR("Cannot create request for %.*s\n", query.len, query.s);
395 free_async_query(aq);
399 if (query_params.tls_client_cert) {
400 shm_free(query_params.tls_client_cert);
401 query_params.tls_client_cert = NULL;
403 if (query_params.tls_client_key) {
404 shm_free(query_params.tls_client_key);
405 query_params.tls_client_key = NULL;
407 if (query_params.tls_ca_path) {
408 shm_free(query_params.tls_ca_path);
409 query_params.tls_ca_path = NULL;
411 if (query_params.body.s && query_params.body.len > 0) {
412 shm_free(query_params.body.s);
413 query_params.body.s = NULL;
414 query_params.body.len = 0;
417 if (query_params.username) {
418 shm_free(query_params.username);
419 query_params.username = NULL;
422 if (query_params.password) {
423 shm_free(query_params.password);
424 query_params.password = NULL;
430 int init_socket(async_http_worker_t *worker)
432 worker->socket_event = event_new(worker->evbase, worker->notication_socket[0], EV_READ|EV_PERSIST, notification_socket_cb, worker);
433 event_add(worker->socket_event, NULL);
437 int async_send_query(sip_msg_t *msg, str *query, str *cbname)
440 unsigned int tindex = 0;
441 unsigned int tlabel = 0;
448 LM_ERR("invalid parameters\n");
451 if(cbname->len>=MAX_CBNAME_LEN-1) {
452 LM_ERR("callback name is too long: %d / %.*s\n", cbname->len,
453 cbname->len, cbname->s);
458 if (t==NULL || t==T_UNDEFINED) {
459 LM_DBG("no pre-existing transaction, switching to transaction-less behavior\n");
460 } else if (!ah_params.suspend_transaction) {
461 LM_DBG("transaction won't be suspended\n");
463 if(tmb.t_suspend==NULL) {
464 LM_ERR("http async query is disabled - tm module not loaded\n");
468 if(tmb.t_suspend(msg, &tindex, &tlabel)<0) {
469 LM_ERR("failed to suspend request processing\n");
475 LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel);
477 dsize = sizeof(async_query_t);
478 aq = (async_query_t*)shm_malloc(dsize);
482 LM_ERR("no more shm\n");
487 if(shm_str_dup(&aq->query, query)<0) {
491 memcpy(aq->cbname, cbname->s, cbname->len);
492 aq->cbname[cbname->len] = '\0';
493 aq->cbname_len = cbname->len;
497 aq->query_params.tls_verify_peer = ah_params.tls_verify_peer;
498 aq->query_params.tls_verify_host = ah_params.tls_verify_host;
499 aq->query_params.suspend_transaction = suspend;
500 aq->query_params.timeout = ah_params.timeout;
501 aq->query_params.tcp_keepalive = ah_params.tcp_keepalive;
502 aq->query_params.tcp_ka_idle = ah_params.tcp_ka_idle;
503 aq->query_params.tcp_ka_interval = ah_params.tcp_ka_interval;
504 aq->query_params.headers = ah_params.headers;
505 aq->query_params.method = ah_params.method;
506 aq->query_params.authmethod = ah_params.authmethod;
509 snprintf(q_id, MAX_ID_LEN+1, "%u-%u", (unsigned int)getpid(), q_idx);
510 strncpy(aq->id, q_id, strlen(q_id));
512 aq->query_params.tls_client_cert = NULL;
513 if (ah_params.tls_client_cert) {
514 len = strlen(ah_params.tls_client_cert);
515 aq->query_params.tls_client_cert = shm_malloc(len+1);
517 if(aq->query_params.tls_client_cert == NULL) {
518 LM_ERR("Error allocating aq->query_params.tls_client_cert\n");
522 strncpy(aq->query_params.tls_client_cert, ah_params.tls_client_cert, len);
523 aq->query_params.tls_client_cert[len] = '\0';
526 aq->query_params.tls_client_key = NULL;
527 if (ah_params.tls_client_key) {
528 len = strlen(ah_params.tls_client_key);
529 aq->query_params.tls_client_key = shm_malloc(len+1);
531 if(aq->query_params.tls_client_key == NULL) {
532 LM_ERR("Error allocating aq->query_params.tls_client_key\n");
536 strncpy(aq->query_params.tls_client_key, ah_params.tls_client_key, len);
537 aq->query_params.tls_client_key[len] = '\0';
540 aq->query_params.tls_ca_path = NULL;
541 if (ah_params.tls_ca_path) {
542 len = strlen(ah_params.tls_ca_path);
543 aq->query_params.tls_ca_path = shm_malloc(len+1);
545 if(aq->query_params.tls_ca_path == NULL) {
546 LM_ERR("Error allocating aq->query_params.tls_ca_path\n");
550 strncpy(aq->query_params.tls_ca_path, ah_params.tls_ca_path, len);
551 aq->query_params.tls_ca_path[len] = '\0';
554 aq->query_params.body.s = NULL;
555 aq->query_params.body.len = 0;
556 if (ah_params.body.s && ah_params.body.len > 0) {
557 if (shm_str_dup(&aq->query_params.body, &(ah_params.body)) < 0) {
558 LM_ERR("Error allocating aq->query_params.body\n");
563 aq->query_params.username = NULL;
564 if (ah_params.username) {
565 len = strlen(ah_params.username);
566 aq->query_params.username = shm_malloc(len+1);
568 if(aq->query_params.username == NULL) {
569 LM_ERR("error in shm_malloc\n");
573 strncpy(aq->query_params.username, ah_params.username, len);
574 aq->query_params.username[len] = '\0';
577 aq->query_params.password = NULL;
578 if (ah_params.password) {
579 len = strlen(ah_params.password);
580 aq->query_params.password = shm_malloc(len+1);
582 if(aq->query_params.password == NULL) {
583 LM_ERR("error in shm_malloc\n");
587 strncpy(aq->query_params.password, ah_params.password, len);
588 aq->query_params.password[len] = '\0';
591 set_query_params(&ah_params);
593 if(async_push_query(aq)<0) {
594 LM_ERR("failed to relay query: %.*s\n", query->len, query->s);
599 /* force exit in config */
603 /* continue route processing */
609 tmb.t_cancel_suspend(tindex, tlabel);
611 free_async_query(aq);
615 int async_push_query(async_query_t *aq)
619 static unsigned long rr = 0; /* round robin */
623 query = ((str)aq->query);
625 worker = rr++ % num_workers;
626 len = write(workers[worker].notication_socket[1], &aq, sizeof(async_query_t*));
628 LM_ERR("failed to pass the query to async workers\n");
631 LM_DBG("query sent [%.*s] (%p) to worker %d\n", query.len, query.s, aq, worker + 1);
635 void init_query_params(struct query_params *p) {
636 memset(&ah_params, 0, sizeof(struct query_params));
640 void set_query_params(struct query_params *p) {
644 p->tls_verify_host = tls_verify_host;
645 p->tls_verify_peer = tls_verify_peer;
646 p->suspend_transaction = 1;
647 p->timeout = http_timeout;
648 p->method = AH_METH_DEFAULT;
649 p->authmethod = default_authmethod;
650 p->tcp_keepalive = tcp_keepalive;
651 p->tcp_ka_idle = tcp_ka_idle;
652 p->tcp_ka_interval = tcp_ka_interval;
654 if (p->tls_client_cert) {
655 shm_free(p->tls_client_cert);
656 p->tls_client_cert = NULL;
658 if (tls_client_cert) {
659 len = strlen(tls_client_cert);
660 p->tls_client_cert = shm_malloc(len+1);
662 if (p->tls_client_cert == NULL) {
663 LM_ERR("Error allocating tls_client_cert\n");
667 strncpy(p->tls_client_cert, tls_client_cert, len);
668 p->tls_client_cert[len] = '\0';
671 if (p->tls_client_key) {
672 shm_free(p->tls_client_key);
673 p->tls_client_key = NULL;
675 if (tls_client_key) {
676 len = strlen(tls_client_key);
677 p->tls_client_key = shm_malloc(len+1);
679 if (p->tls_client_key == NULL) {
680 LM_ERR("Error allocating tls_client_key\n");
684 strncpy(p->tls_client_key, tls_client_key, len);
685 p->tls_client_key[len] = '\0';
688 if (p->tls_ca_path) {
689 shm_free(p->tls_ca_path);
690 p->tls_ca_path = NULL;
693 len = strlen(tls_ca_path);
694 p->tls_ca_path = shm_malloc(len+1);
696 if (p->tls_ca_path == NULL) {
697 LM_ERR("Error allocating tls_ca_path\n");
701 strncpy(p->tls_ca_path, tls_ca_path, len);
702 p->tls_ca_path[len] = '\0';
705 if (p->body.s && p->body.len > 0) {
712 shm_free(p->username);
717 shm_free(p->password);
722 int header_list_add(struct header_list *hl, str* hdr) {
726 hl->t = shm_reallocxf(hl->t, hl->len * sizeof(char*));
728 LM_ERR("shm memory allocation failure\n");
731 hl->t[hl->len - 1] = shm_malloc(hdr->len + 1);
732 tmp = hl->t[hl->len - 1];
734 LM_ERR("shm memory allocation failure\n");
737 memcpy(tmp, hdr->s, hdr->len);
738 *(tmp + hdr->len) = '\0';
740 LM_DBG("stored new http header: [%s]\n", tmp);
744 int query_params_set_method(struct query_params *qp, str *meth) {
745 if (strncasecmp(meth->s, "GET", meth->len) == 0) {
746 qp->method = AH_METH_GET;
747 } else if (strncasecmp(meth->s, "POST",meth->len) == 0) {
748 qp->method = AH_METH_POST;
749 } else if (strncasecmp(meth->s, "PUT", meth->len) == 0) {
750 qp->method = AH_METH_PUT;
751 } else if (strncasecmp(meth->s, "DELETE", meth->len) == 0) {
752 qp->method = AH_METH_DELETE;
754 LM_ERR("Unsupported method: %.*s\n", meth->len, meth->s);