core, lib, modules: restructured source code tree
[sip-router] / src / modules / http_async_client / async_http.c
1 /**
2  * Copyright 2016 (C) Federico Cabiddu <federico.cabiddu@gmail.com>
3  * Copyright 2016 (C) Giacomo Vacca <giacomo.vacca@gmail.com>
4  * Copyright 2016 (C) Orange - Camille Oudot <camille.oudot@orange.com>
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * This file is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  *
14  * This file is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  *
23  */
24
25 /*! \file
26  * \brief  Kamailio http_async_client :: Include file
27  * \ingroup http_async_client
28  */
29
30
31 #include <stdio.h>
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <string.h>
35
36 #include <sys/socket.h>
37 #include <sys/types.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
40 #include <fcntl.h>
41
42 #include <event2/event.h>
43
44 #include "../../sr_module.h"
45 #include "../../dprint.h"
46 #include "../../ut.h"
47 #include "../../cfg/cfg_struct.h"
48 #include "../../fmsg.h"
49 #include "../../modules/tm/tm_load.h"
50
51 #include "async_http.h"
52
53 /* tm */
54 extern struct tm_binds tmb;
55
56 struct sip_msg *ah_reply = NULL;
57 str ah_error = {NULL, 0};
58
59 async_http_worker_t *workers;
60 int num_workers = 1;
61
62 struct query_params ah_params;
63 unsigned int q_idx;
64 char q_id[MAX_ID_LEN+1];
65
66 int async_http_init_worker(int prank, async_http_worker_t* worker)
67 {
68         LM_DBG("initializing worker process: %d\n", prank);
69         worker->evbase = event_base_new();
70         LM_DBG("base event %p created\n", worker->evbase);
71
72         worker->g = shm_malloc(sizeof(struct http_m_global));
73         memset(worker->g, 0, sizeof(http_m_global_t));
74         LM_DBG("initialized global struct %p\n", worker->g);
75
76         init_socket(worker);
77
78         LM_INFO("started worker process: %d\n", prank);
79
80         return 0;
81 }
82
83 void async_http_run_worker(async_http_worker_t* worker)
84 {
85         init_http_multi(worker->evbase, worker->g);
86         event_base_dispatch(worker->evbase);
87 }
88
89 int async_http_init_sockets(async_http_worker_t *worker)
90 {
91         if (socketpair(PF_UNIX, SOCK_DGRAM, 0, worker->notication_socket) < 0) {
92                 LM_ERR("opening tasks dgram socket pair\n");
93                 return -1;
94         }
95         LM_INFO("inter-process event notification sockets initialized\n");
96         return 0;
97 }
98
99 static inline char *strfindcasestrz(str *haystack, char *needlez)                                                                                                                                                                                                              
100 {
101     int i,j;
102     str needle;
103
104     needle.s = needlez;
105     needle.len = strlen(needlez);
106     for(i=0;i<haystack->len-needle.len;i++) {
107         for(j=0;j<needle.len;j++) {
108             if ( !((haystack->s[i+j]==needle.s[j]) ||
109                     ( isalpha((int)haystack->s[i+j])
110                         && ((haystack->s[i+j])^(needle.s[j]))==0x20 )) )
111                 break;
112         }
113         if (j==needle.len)
114             return haystack->s+i;
115     }
116     return 0;
117 }
118
119 void async_http_cb(struct http_m_reply *reply, void *param)
120 {
121         async_query_t *aq;
122         cfg_action_t *act;
123         unsigned int tindex;
124         unsigned int tlabel;
125         struct cell *t = NULL;
126         char *p;
127         sip_msg_t *fmsg;
128
129         if (reply->result != NULL) {
130                 LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
131         }
132
133         /* clean process-local result variables */
134         ah_error.s = NULL;
135         ah_error.len = 0;
136         memset(ah_reply, 0, sizeof(struct sip_msg));
137
138         /* set process-local result variables */
139         if (reply->result == NULL) {
140                 /* error */
141                 ah_error.s = reply->error;
142                 ah_error.len = strlen(ah_error.s);
143         } else {
144                 /* success */
145                 
146                 /* check for HTTP Via header
147          * - HTTP Via format is different that SIP Via
148          * - workaround: replace with Hia to be ignored by SIP parser
149          */
150         if((p=strfindcasestrz(reply->result, "\nVia:"))!=NULL)
151         {
152                 p++;
153                 *p = 'H';
154                 LM_DBG("replaced HTTP Via with Hia [[\n%.*s]]\n", reply->result->len, reply->result->s);
155         }
156
157                 ah_reply->buf = reply->result->s;
158                 ah_reply->len = reply->result->len;
159
160                 if (parse_msg(reply->result->s, reply->result->len, ah_reply) != 0) {
161                         LM_DBG("failed to parse the http_reply\n");
162                 } else {
163                         LM_DBG("successfully parsed http reply %p\n", ah_reply);
164                 }
165         }
166
167         aq = param;
168         strncpy(q_id, aq->id, strlen(aq->id));
169         
170         act = (cfg_action_t*)aq->param;
171
172         if (aq->query_params.suspend_transaction) {
173                 tindex = aq->tindex;
174                 tlabel = aq->tlabel;
175
176                 if (tmb.t_lookup_ident(&t, tindex, tlabel) < 0) {
177                         LM_ERR("transaction not found %d:%d\n", tindex, tlabel);
178                         LM_DBG("freeing query %p\n", aq);
179                         free_async_query(aq);
180                         return;
181                 }
182                 // we bring the list of AVPs of the transaction to the current context
183                 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
184                 set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
185                 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
186                 set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
187                 set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
188                 set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
189
190                 if (t)
191                         tmb.unref_cell(t);
192
193                 LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
194
195                 if(act!=NULL)
196                         tmb.t_continue(tindex, tlabel, act);
197         } else {
198                 fmsg = faked_msg_next();
199                 if (run_top_route(act, fmsg, 0)<0)
200                         LM_ERR("failure inside run_top_route\n");
201         }
202
203         free_sip_msg(ah_reply);
204         free_async_query(aq);
205
206         return;
207 }
208
209 void notification_socket_cb(int fd, short event, void *arg)
210 {
211         (void)fd; /* unused */
212         (void)event; /* unused */
213         const async_http_worker_t *worker = (async_http_worker_t *) arg;
214
215         int received;
216         int i, len;
217         async_query_t *aq;
218
219         http_m_params_t query_params;
220
221         str query;
222
223         if ((received = recvfrom(worker->notication_socket[0],
224                         &aq, sizeof(async_query_t*),
225                         0, NULL, 0)) < 0) {
226                 LM_ERR("failed to read from socket (%d: %s)\n", errno, strerror(errno));
227                 return;
228         }
229
230         if(received != sizeof(async_query_t*)) {
231                 LM_ERR("invalid query size %d\n", received);
232                 return;
233         }
234
235         query = ((str)aq->query);
236
237         query_params.timeout = aq->query_params.timeout;
238         query_params.tls_verify_peer = aq->query_params.tls_verify_peer;
239         query_params.tls_verify_host = aq->query_params.tls_verify_host;
240         query_params.authmethod = aq->query_params.authmethod;
241
242         query_params.headers = NULL;
243         for (i = 0 ; i < aq->query_params.headers.len ; i++) {
244                 query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
245         }
246         query_params.method  = aq->query_params.method;
247
248         query_params.tls_client_cert.s = NULL;
249         query_params.tls_client_cert.len = 0;
250         if (aq->query_params.tls_client_cert.s && aq->query_params.tls_client_cert.len > 0) {
251                 if (shm_str_dup(&query_params.tls_client_cert, &(aq->query_params.tls_client_cert)) < 0) {
252                         LM_ERR("Error allocating query_params.tls_client_cert\n");
253                         goto done;
254                 }
255         }
256
257         query_params.tls_client_key.s = NULL;
258         query_params.tls_client_key.len = 0;
259         if (aq->query_params.tls_client_key.s && aq->query_params.tls_client_key.len > 0) {
260                 if (shm_str_dup(&query_params.tls_client_key, &(aq->query_params.tls_client_key)) < 0) {
261                         LM_ERR("Error allocating query_params.tls_client_key\n");
262                         goto done;
263                 }
264         }
265
266         query_params.tls_ca_path.s = NULL;
267         query_params.tls_ca_path.len = 0;
268         if (aq->query_params.tls_ca_path.s && aq->query_params.tls_ca_path.len > 0) {
269                 if (shm_str_dup(&query_params.tls_ca_path, &(aq->query_params.tls_ca_path)) < 0) {
270                         LM_ERR("Error allocating query_params.tls_ca_path\n");
271                         goto done;
272                 }
273         }
274
275         query_params.body.s = NULL;
276         query_params.body.len = 0;
277         if (aq->query_params.body.s && aq->query_params.body.len > 0) {
278                 if (shm_str_dup(&query_params.body, &(aq->query_params.body)) < 0) {
279                         LM_ERR("Error allocating query_params.body\n");
280                         goto done;
281                 }
282         }
283   
284         if (aq->query_params.username) {
285                 len = strlen(aq->query_params.username);
286                 query_params.username = shm_malloc(len+1);
287         
288                 if(query_params.username == NULL) {
289                         LM_ERR("error in shm_malloc\n");
290                         goto done;
291                 }
292
293                 strncpy(query_params.username, aq->query_params.username, len);
294                 query_params.username[len] = '\0';
295         }
296         
297         if (aq->query_params.password) {
298                 len = strlen(aq->query_params.password);
299                 query_params.password = shm_malloc(len+1);
300         
301                 if(query_params.password == NULL) {
302                         LM_ERR("error in shm_malloc\n");
303                         goto done;
304                 }
305
306                 strncpy(query_params.password, aq->query_params.password, len);
307                 query_params.password[len] = '\0';
308         }
309
310         LM_DBG("query received: [%.*s] (%p)\n", query.len, query.s, aq);
311
312         if (new_request(&query, &query_params, async_http_cb, aq) < 0) {
313                 LM_ERR("Cannot create request for %.*s\n", query.len, query.s);
314                 free_async_query(aq);
315         }
316
317 done:
318         if (query_params.tls_client_cert.s && query_params.tls_client_cert.len > 0) {
319                 shm_free(query_params.tls_client_cert.s);
320                 query_params.tls_client_cert.s = NULL;
321                 query_params.tls_client_cert.len = 0;
322         }
323         if (query_params.tls_client_key.s && query_params.tls_client_key.len > 0) {
324                 shm_free(query_params.tls_client_key.s);
325                 query_params.tls_client_key.s = NULL;
326                 query_params.tls_client_key.len = 0;
327         }
328         if (query_params.tls_ca_path.s && query_params.tls_ca_path.len > 0) {
329                 shm_free(query_params.tls_ca_path.s);
330                 query_params.tls_ca_path.s = NULL;
331                 query_params.tls_ca_path.len = 0;
332         }
333         if (query_params.body.s && query_params.body.len > 0) {
334                 shm_free(query_params.body.s);
335                 query_params.body.s = NULL;
336                 query_params.body.len = 0;
337         }
338
339         if (query_params.username) {
340                 shm_free(query_params.username);
341                 query_params.username = NULL;
342         }
343         
344         if (query_params.password) {
345                 shm_free(query_params.password);
346                 query_params.password = NULL;
347         }
348
349         return;
350 }
351
352 int init_socket(async_http_worker_t *worker)
353 {
354         worker->socket_event = event_new(worker->evbase, worker->notication_socket[0], EV_READ|EV_PERSIST, notification_socket_cb, worker);
355         event_add(worker->socket_event, NULL);
356         return (0);
357 }
358
359 int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
360 {
361         async_query_t *aq;
362         unsigned int tindex = 0;
363         unsigned int tlabel = 0;
364         short suspend = 0;
365         int dsize;
366         int len;
367         tm_cell_t *t = 0;
368
369         if(query==0) {
370                 LM_ERR("invalid parameters\n");
371                 return -1;
372         }
373
374         t = tmb.t_gett();
375         if (t==NULL || t==T_UNDEFINED) {
376                 LM_DBG("no pre-existing transaction, switching to transaction-less behavior\n");
377         } else if (!ah_params.suspend_transaction) {
378                 LM_DBG("transaction won't be suspended\n");
379         } else {
380                 if(tmb.t_suspend==NULL) {
381                         LM_ERR("http async query is disabled - tm module not loaded\n");
382                         return -1;
383                 }
384
385                 if(tmb.t_suspend(msg, &tindex, &tlabel)<0) {
386                         LM_ERR("failed to suspend request processing\n");
387                         return -1;
388                 }
389
390                 suspend = 1;
391
392                 LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel);
393         }
394         dsize = sizeof(async_query_t);
395         aq = (async_query_t*)shm_malloc(dsize);
396
397         if(aq==NULL)
398         {
399                 LM_ERR("no more shm\n");
400                 goto error;
401         }
402         memset(aq,0,dsize);
403
404         if(shm_str_dup(&aq->query, query)<0) {
405                 goto error;
406         }
407
408         aq->param = act;
409         aq->tindex = tindex;
410         aq->tlabel = tlabel;
411         
412         aq->query_params.tls_verify_peer = ah_params.tls_verify_peer;
413         aq->query_params.tls_verify_host = ah_params.tls_verify_host;
414         aq->query_params.suspend_transaction = suspend;
415         aq->query_params.timeout = ah_params.timeout;
416         aq->query_params.headers = ah_params.headers;
417         aq->query_params.method = ah_params.method;
418         aq->query_params.authmethod = ah_params.authmethod;
419         
420         q_idx++;
421         snprintf(q_id, MAX_ID_LEN+1, "%u-%u", (unsigned int)getpid(), q_idx);
422         strncpy(aq->id, q_id, strlen(q_id));
423
424         aq->query_params.tls_client_cert.s = NULL;
425         aq->query_params.tls_client_cert.len = 0;
426         if (ah_params.tls_client_cert.s && ah_params.tls_client_cert.len > 0) {
427                 if (shm_str_dup(&aq->query_params.tls_client_cert, &(ah_params.tls_client_cert)) < 0) {
428                         LM_ERR("Error allocating aq->query_params.tls_client_cert\n");
429                         goto error;
430                 }
431         }
432
433         aq->query_params.tls_client_key.s = NULL;
434         aq->query_params.tls_client_key.len = 0;
435         if (ah_params.tls_client_key.s && ah_params.tls_client_key.len > 0) {
436                 if (shm_str_dup(&aq->query_params.tls_client_key, &(ah_params.tls_client_key)) < 0) {
437                         LM_ERR("Error allocating aq->query_params.tls_client_key\n");
438                         goto error;
439                 }
440         }
441
442         aq->query_params.tls_ca_path.s = NULL;
443         aq->query_params.tls_ca_path.len = 0;
444         if (ah_params.tls_ca_path.s && ah_params.tls_ca_path.len > 0) {
445                 if (shm_str_dup(&aq->query_params.tls_ca_path, &(ah_params.tls_ca_path)) < 0) {
446                         LM_ERR("Error allocating aq->query_params.tls_ca_path\n");
447                         goto error;
448                 }
449         }
450
451         aq->query_params.body.s = NULL;
452         aq->query_params.body.len = 0;
453         if (ah_params.body.s && ah_params.body.len > 0) {
454                 if (shm_str_dup(&aq->query_params.body, &(ah_params.body)) < 0) {
455                         LM_ERR("Error allocating aq->query_params.body\n");
456                         goto error;
457                 }
458         }
459
460         aq->query_params.username = NULL;
461         if (ah_params.username) {
462                 len = strlen(ah_params.username);
463                 aq->query_params.username = shm_malloc(len+1);
464         
465                 if(aq->query_params.username == NULL) {
466                         LM_ERR("error in shm_malloc\n");
467                         goto error;
468                 }
469
470                 strncpy(aq->query_params.username, ah_params.username, len);
471                 aq->query_params.username[len] = '\0';
472         }
473
474         aq->query_params.password = NULL;
475         if (ah_params.password) {
476                 len = strlen(ah_params.password);
477                 aq->query_params.password = shm_malloc(len+1);
478         
479                 if(aq->query_params.password == NULL) {
480                         LM_ERR("error in shm_malloc\n");
481                         goto error;
482                 }
483
484                 strncpy(aq->query_params.password, ah_params.password, len);
485                 aq->query_params.password[len] = '\0';
486         }
487
488         set_query_params(&ah_params);
489
490         if(async_push_query(aq)<0) {
491                 LM_ERR("failed to relay query: %.*s\n", query->len, query->s);
492                 goto error;
493         }
494
495         if (suspend)  {
496                 /* force exit in config */
497                 return 0;
498         }
499         
500         /* continue route processing */
501         return 1;
502
503 error:
504
505         if (suspend) {
506                 tmb.t_cancel_suspend(tindex, tlabel);
507         }
508         free_async_query(aq);
509         return -1;
510 }
511
512 int async_push_query(async_query_t *aq)
513 {
514         int len;
515         int worker;
516         static unsigned long rr = 0; /* round robin */
517
518         str query;
519
520         query = ((str)aq->query);
521
522         worker = rr++ % num_workers;
523         len = write(workers[worker].notication_socket[1], &aq, sizeof(async_query_t*));
524         if(len<=0) {
525                 LM_ERR("failed to pass the query to async workers\n");
526                 return -1;
527         }
528         LM_DBG("query sent [%.*s] (%p) to worker %d\n", query.len, query.s, aq, worker + 1);
529         return 0;
530 }
531
532 void init_query_params(struct query_params *p) {
533         memset(&ah_params, 0, sizeof(struct query_params));
534         set_query_params(p);
535 }
536
537 void set_query_params(struct query_params *p) {
538         p->headers.len = 0;
539         p->headers.t = NULL;
540         p->tls_verify_host = tls_verify_host;
541         p->tls_verify_peer = tls_verify_peer;
542         p->suspend_transaction = 1;
543         p->timeout = http_timeout;
544         p->method = AH_METH_DEFAULT;
545         p->authmethod = default_authmethod;
546
547         if (p->tls_client_cert.s && p->tls_client_cert.len > 0) {
548                 shm_free(p->tls_client_cert.s);
549                 p->tls_client_cert.s = NULL;
550                 p->tls_client_cert.len = 0;
551         }
552         if (tls_client_cert.s && tls_client_cert.len > 0) {
553                 if (shm_str_dup(&p->tls_client_cert, &tls_client_cert) < 0) {
554                         LM_ERR("Error allocating tls_client_cert\n");
555                         return;
556                 }
557         }
558
559         if (p->tls_client_key.s && p->tls_client_key.len > 0) {
560                 shm_free(p->tls_client_key.s);
561                 p->tls_client_key.s = NULL;
562                 p->tls_client_key.len = 0;
563         }
564         if (tls_client_key.s && tls_client_key.len > 0) {
565                 if (shm_str_dup(&p->tls_client_key, &tls_client_key) < 0) {
566                         LM_ERR("Error allocating tls_client_key\n");
567                         return;
568                 }
569         }
570
571         if (p->tls_ca_path.s && p->tls_ca_path.len > 0) {
572                 shm_free(p->tls_ca_path.s);
573                 p->tls_ca_path.s = NULL;
574                 p->tls_ca_path.len = 0;
575         }
576         if (tls_ca_path.s && tls_ca_path.len > 0) {
577                 if (shm_str_dup(&p->tls_ca_path, &tls_ca_path) < 0) {
578                         LM_ERR("Error allocating tls_ca_path\n");
579                         return;
580                 }
581         }
582
583         if (p->body.s && p->body.len > 0) {
584                 shm_free(p->body.s);
585                 p->body.s = NULL;
586                 p->body.len = 0;
587         }
588         
589         if (p->username) {
590                 shm_free(p->username);
591                 p->username = NULL;
592         }
593         
594         if (p->password) {
595                 shm_free(p->password);
596                 p->password = NULL;
597         }
598 }
599
600 int header_list_add(struct header_list *hl, str* hdr) {
601         char *tmp;
602
603         hl->len++;
604         hl->t = shm_realloc(hl->t, hl->len * sizeof(char*));
605         if (!hl->t) {
606                 LM_ERR("shm memory allocation failure\n");
607                 return -1;
608         }
609         hl->t[hl->len - 1] = shm_malloc(hdr->len + 1);
610         tmp = hl->t[hl->len - 1];
611         if (!tmp) {
612                 LM_ERR("shm memory allocation failure\n");
613                 return -1;
614         }
615         memcpy(tmp, hdr->s, hdr->len);
616         *(tmp + hdr->len) = '\0';
617
618         LM_DBG("stored new http header: [%s]\n", tmp);
619         return 1;
620 }
621
622 int query_params_set_method(struct query_params *qp, str *meth) {
623         if (strncasecmp(meth->s, "GET", meth->len) == 0) {
624                 qp->method = AH_METH_GET;
625         } else if (strncasecmp(meth->s, "POST",meth->len) == 0) {
626                 qp->method = AH_METH_POST;
627         } else if (strncasecmp(meth->s, "PUT", meth->len) == 0) {
628                 qp->method = AH_METH_PUT;
629         } else if (strncasecmp(meth->s, "DELETE", meth->len) == 0) {
630                 qp->method = AH_METH_DELETE;
631         } else {
632                 LM_ERR("Unsupported method: %.*s\n", meth->len, meth->s);
633                 return -1;
634         }
635         return 1;
636 }