http_async_client: do not set global avp lists from t on async callback
[sip-router] / src / modules / http_async_client / async_http.c
index d86f8a6..5edf67e 100644 (file)
@@ -45,7 +45,9 @@
 #include "../../core/dprint.h"
 #include "../../core/ut.h"
 #include "../../core/cfg/cfg_struct.h"
+#include "../../core/receive.h"
 #include "../../core/fmsg.h"
+#include "../../core/kemi.h"
 #include "../../modules/tm/tm_load.h"
 
 #include "async_http.h"
@@ -56,7 +58,7 @@ extern struct tm_binds tmb;
 struct sip_msg *ah_reply = NULL;
 str ah_error = {NULL, 0};
 
-async_http_worker_t *workers;
+async_http_worker_t *workers = NULL;
 int num_workers = 1;
 
 struct query_params ah_params;
@@ -122,24 +124,44 @@ static inline char *strfindcasestrz(str *haystack, char *needlez)
 
 void async_http_cb(struct http_m_reply *reply, void *param)
 {
-       async_query_t *aq;
-       cfg_action_t *act;
+       async_query_t *aq = NULL;
+       cfg_action_t *act = NULL;
+       int ri;
        unsigned int tindex;
        unsigned int tlabel;
        struct cell *t = NULL;
        char *p;
        str newbuf = {0, 0};
-       sip_msg_t *fmsg;
+       sip_msg_t *fmsg = NULL;
+       sr_kemi_eng_t *keng = NULL;
+       str cbname = {0, 0};
+       str evname = str_init("http_async_client:callback");
 
-       if (reply->result != NULL) {
-               LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
-       }
+       aq = param;
 
        /* clean process-local result variables */
        ah_error.s = NULL;
        ah_error.len = 0;
        memset(ah_reply, 0, sizeof(struct sip_msg));
 
+       keng = sr_kemi_eng_get();
+       if(keng==NULL) {
+               ri = route_lookup(&main_rt, aq->cbname);
+               if(ri<0) {
+                       LM_ERR("unable to find route block [%s]\n", aq->cbname);
+                       goto done;
+               }
+               act = main_rt.rlist[ri];
+               if(act==NULL) {
+                       LM_ERR("empty action lists in route block [%s]\n", aq->cbname);
+                       goto done;
+               }
+       }
+
+       if (reply->result != NULL) {
+               LM_DBG("query result = %.*s [%d]\n", reply->result->len, reply->result->s, reply->result->len);
+       }
+
        /* set process-local result variables */
        if (reply->result == NULL) {
                /* error */
@@ -189,10 +211,10 @@ void async_http_cb(struct http_m_reply *reply, void *param)
                }
        }
 
-       aq = param;
        strncpy(q_id, aq->id, strlen(aq->id));
-       
-       act = (cfg_action_t*)aq->param;
+
+       q_id[strlen(aq->id)] = '\0';
+
        cfg_update();
 
        if (aq->query_params.suspend_transaction) {
@@ -205,27 +227,41 @@ void async_http_cb(struct http_m_reply *reply, void *param)
                        free_async_query(aq);
                        return;
                }
-               // we bring the list of AVPs of the transaction to the current context
-               set_avp_list(AVP_TRACK_FROM | AVP_CLASS_URI, &t->uri_avps_from);
-               set_avp_list(AVP_TRACK_TO | AVP_CLASS_URI, &t->uri_avps_to);
-               set_avp_list(AVP_TRACK_FROM | AVP_CLASS_USER, &t->user_avps_from);
-               set_avp_list(AVP_TRACK_TO | AVP_CLASS_USER, &t->user_avps_to);
-               set_avp_list(AVP_TRACK_FROM | AVP_CLASS_DOMAIN, &t->domain_avps_from);
-               set_avp_list(AVP_TRACK_TO | AVP_CLASS_DOMAIN, &t->domain_avps_to);
-
-               if (t)
+
+               if (t) {
                        tmb.unref_cell(t);
+               }
 
                LM_DBG("resuming transaction (%d:%d)\n", tindex, tlabel);
 
-               if(act!=NULL)
-                       tmb.t_continue(tindex, tlabel, act);
+               if(keng==NULL) {
+                       if(act!=NULL) {
+                               tmb.t_continue(tindex, tlabel, act);
+                       }
+               } else {
+                       cbname.s = aq->cbname;
+                       cbname.len = aq->cbname_len;
+                       tmb.t_continue_cb(tindex, tlabel, &cbname, &evname);
+               }
        } else {
                fmsg = faked_msg_next();
-               if (run_top_route(act, fmsg, 0)<0)
-                       LM_ERR("failure inside run_top_route\n");
+               if(keng==NULL) {
+                       if(act!=NULL) {
+                               if (run_top_route(act, fmsg, 0)<0) {
+                                       LM_ERR("failure inside run_top_route\n");
+                               }
+                       }
+               } else {
+                       cbname.s = aq->cbname;
+                       cbname.len = aq->cbname_len;
+                       if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &cbname, &evname)<0) {
+                               LM_ERR("error running event route kemi callback\n");
+                       }
+               }
+               ksr_msg_env_reset();
        }
 
+done:
        free_sip_msg(ah_reply);
        free_async_query(aq);
 
@@ -265,31 +301,52 @@ void notification_socket_cb(int fd, short event, void *arg)
        query_params.tls_verify_peer = aq->query_params.tls_verify_peer;
        query_params.tls_verify_host = aq->query_params.tls_verify_host;
        query_params.authmethod = aq->query_params.authmethod;
+       query_params.tcp_keepalive = aq->query_params.tcp_keepalive;
+       query_params.tcp_ka_idle = aq->query_params.tcp_ka_idle;
+       query_params.tcp_ka_interval = aq->query_params.tcp_ka_interval;
 
        for (i = 0 ; i < aq->query_params.headers.len ; i++) {
                query_params.headers = curl_slist_append(query_params.headers, aq->query_params.headers.t[i]);
        }
        query_params.method  = aq->query_params.method;
 
-       if (aq->query_params.tls_client_cert.s && aq->query_params.tls_client_cert.len > 0) {
-               if (shm_str_dup(&query_params.tls_client_cert, &(aq->query_params.tls_client_cert)) < 0) {
+       if (aq->query_params.tls_client_cert) {
+               len = strlen(aq->query_params.tls_client_cert);
+               query_params.tls_client_cert = shm_malloc(len+1);
+
+               if(query_params.tls_client_cert == NULL) {
                        LM_ERR("Error allocating query_params.tls_client_cert\n");
                        goto done;
                }
+
+               strncpy(query_params.tls_client_cert, aq->query_params.tls_client_cert, len);
+               query_params.tls_client_cert[len] = '\0';
        }
 
-       if (aq->query_params.tls_client_key.s && aq->query_params.tls_client_key.len > 0) {
-               if (shm_str_dup(&query_params.tls_client_key, &(aq->query_params.tls_client_key)) < 0) {
+       if (aq->query_params.tls_client_key) {
+               len = strlen(aq->query_params.tls_client_key);
+               query_params.tls_client_key = shm_malloc(len+1);
+
+               if(query_params.tls_client_key == NULL) {
                        LM_ERR("Error allocating query_params.tls_client_key\n");
                        goto done;
                }
+
+               strncpy(query_params.tls_client_key, aq->query_params.tls_client_key, len);
+               query_params.tls_client_key[len] = '\0';
        }
 
-       if (aq->query_params.tls_ca_path.s && aq->query_params.tls_ca_path.len > 0) {
-               if (shm_str_dup(&query_params.tls_ca_path, &(aq->query_params.tls_ca_path)) < 0) {
+       if (aq->query_params.tls_ca_path) {
+               len = strlen(aq->query_params.tls_ca_path);
+               query_params.tls_ca_path = shm_malloc(len+1);
+
+               if(query_params.tls_ca_path == NULL) {
                        LM_ERR("Error allocating query_params.tls_ca_path\n");
                        goto done;
                }
+
+               strncpy(query_params.tls_ca_path, aq->query_params.tls_ca_path, len);
+               query_params.tls_ca_path[len] = '\0';
        }
 
        if (aq->query_params.body.s && aq->query_params.body.len > 0) {
@@ -333,20 +390,17 @@ void notification_socket_cb(int fd, short event, void *arg)
        }
 
 done:
-       if (query_params.tls_client_cert.s && query_params.tls_client_cert.len > 0) {
-               shm_free(query_params.tls_client_cert.s);
-               query_params.tls_client_cert.s = NULL;
-               query_params.tls_client_cert.len = 0;
+       if (query_params.tls_client_cert) {
+               shm_free(query_params.tls_client_cert);
+               query_params.tls_client_cert = NULL;
        }
-       if (query_params.tls_client_key.s && query_params.tls_client_key.len > 0) {
-               shm_free(query_params.tls_client_key.s);
-               query_params.tls_client_key.s = NULL;
-               query_params.tls_client_key.len = 0;
+       if (query_params.tls_client_key) {
+               shm_free(query_params.tls_client_key);
+               query_params.tls_client_key = NULL;
        }
-       if (query_params.tls_ca_path.s && query_params.tls_ca_path.len > 0) {
-               shm_free(query_params.tls_ca_path.s);
-               query_params.tls_ca_path.s = NULL;
-               query_params.tls_ca_path.len = 0;
+       if (query_params.tls_ca_path) {
+               shm_free(query_params.tls_ca_path);
+               query_params.tls_ca_path = NULL;
        }
        if (query_params.body.s && query_params.body.len > 0) {
                shm_free(query_params.body.s);
@@ -374,7 +428,7 @@ int init_socket(async_http_worker_t *worker)
        return (0);
 }
 
-int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
+int async_send_query(sip_msg_t *msg, str *query, str *cbname)
 {
        async_query_t *aq;
        unsigned int tindex = 0;
@@ -388,6 +442,11 @@ int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
                LM_ERR("invalid parameters\n");
                return -1;
        }
+       if(cbname->len>=MAX_CBNAME_LEN-1) {
+               LM_ERR("callback name is too long: %d / %.*s\n", cbname->len,
+                               cbname->len, cbname->s);
+               return -1;
+       }
 
        t = tmb.t_gett();
        if (t==NULL || t==T_UNDEFINED) {
@@ -423,7 +482,9 @@ int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
                goto error;
        }
 
-       aq->param = act;
+       memcpy(aq->cbname, cbname->s, cbname->len);
+       aq->cbname[cbname->len] = '\0';
+       aq->cbname_len = cbname->len;
        aq->tindex = tindex;
        aq->tlabel = tlabel;
        
@@ -431,6 +492,9 @@ int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
        aq->query_params.tls_verify_host = ah_params.tls_verify_host;
        aq->query_params.suspend_transaction = suspend;
        aq->query_params.timeout = ah_params.timeout;
+       aq->query_params.tcp_keepalive = ah_params.tcp_keepalive;
+       aq->query_params.tcp_ka_idle = ah_params.tcp_ka_idle;
+       aq->query_params.tcp_ka_interval = ah_params.tcp_ka_interval;
        aq->query_params.headers = ah_params.headers;
        aq->query_params.method = ah_params.method;
        aq->query_params.authmethod = ah_params.authmethod;
@@ -439,31 +503,46 @@ int async_send_query(sip_msg_t *msg, str *query, cfg_action_t *act)
        snprintf(q_id, MAX_ID_LEN+1, "%u-%u", (unsigned int)getpid(), q_idx);
        strncpy(aq->id, q_id, strlen(q_id));
 
-       aq->query_params.tls_client_cert.s = NULL;
-       aq->query_params.tls_client_cert.len = 0;
-       if (ah_params.tls_client_cert.s && ah_params.tls_client_cert.len > 0) {
-               if (shm_str_dup(&aq->query_params.tls_client_cert, &(ah_params.tls_client_cert)) < 0) {
+       aq->query_params.tls_client_cert = NULL;
+       if (ah_params.tls_client_cert) {
+               len = strlen(ah_params.tls_client_cert);
+               aq->query_params.tls_client_cert = shm_malloc(len+1);
+
+               if(aq->query_params.tls_client_cert == NULL) {
                        LM_ERR("Error allocating aq->query_params.tls_client_cert\n");
                        goto error;
                }
+
+               strncpy(aq->query_params.tls_client_cert, ah_params.tls_client_cert, len);
+               aq->query_params.tls_client_cert[len] = '\0';
        }
 
-       aq->query_params.tls_client_key.s = NULL;
-       aq->query_params.tls_client_key.len = 0;
-       if (ah_params.tls_client_key.s && ah_params.tls_client_key.len > 0) {
-               if (shm_str_dup(&aq->query_params.tls_client_key, &(ah_params.tls_client_key)) < 0) {
+       aq->query_params.tls_client_key = NULL;
+       if (ah_params.tls_client_key) {
+               len = strlen(ah_params.tls_client_key);
+               aq->query_params.tls_client_key = shm_malloc(len+1);
+
+               if(aq->query_params.tls_client_key == NULL) {
                        LM_ERR("Error allocating aq->query_params.tls_client_key\n");
                        goto error;
                }
+
+               strncpy(aq->query_params.tls_client_key, ah_params.tls_client_key, len);
+               aq->query_params.tls_client_key[len] = '\0';
        }
 
-       aq->query_params.tls_ca_path.s = NULL;
-       aq->query_params.tls_ca_path.len = 0;
-       if (ah_params.tls_ca_path.s && ah_params.tls_ca_path.len > 0) {
-               if (shm_str_dup(&aq->query_params.tls_ca_path, &(ah_params.tls_ca_path)) < 0) {
+       aq->query_params.tls_ca_path = NULL;
+       if (ah_params.tls_ca_path) {
+               len = strlen(ah_params.tls_ca_path);
+               aq->query_params.tls_ca_path = shm_malloc(len+1);
+
+               if(aq->query_params.tls_ca_path == NULL) {
                        LM_ERR("Error allocating aq->query_params.tls_ca_path\n");
                        goto error;
                }
+
+               strncpy(aq->query_params.tls_ca_path, ah_params.tls_ca_path, len);
+               aq->query_params.tls_ca_path[len] = '\0';
        }
 
        aq->query_params.body.s = NULL;
@@ -553,6 +632,7 @@ void init_query_params(struct query_params *p) {
 }
 
 void set_query_params(struct query_params *p) {
+       int len;
        p->headers.len = 0;
        p->headers.t = NULL;
        p->tls_verify_host = tls_verify_host;
@@ -561,41 +641,59 @@ void set_query_params(struct query_params *p) {
        p->timeout = http_timeout;
        p->method = AH_METH_DEFAULT;
        p->authmethod = default_authmethod;
+       p->tcp_keepalive = tcp_keepalive;
+       p->tcp_ka_idle = tcp_ka_idle;
+       p->tcp_ka_interval = tcp_ka_interval;
 
-       if (p->tls_client_cert.s && p->tls_client_cert.len > 0) {
-               shm_free(p->tls_client_cert.s);
-               p->tls_client_cert.s = NULL;
-               p->tls_client_cert.len = 0;
+       if (p->tls_client_cert) {
+               shm_free(p->tls_client_cert);
+               p->tls_client_cert = NULL;
        }
-       if (tls_client_cert.s && tls_client_cert.len > 0) {
-               if (shm_str_dup(&p->tls_client_cert, &tls_client_cert) < 0) {
+       if (tls_client_cert) {
+               len = strlen(tls_client_cert);
+               p->tls_client_cert = shm_malloc(len+1);
+
+               if (p->tls_client_cert == NULL) {
                        LM_ERR("Error allocating tls_client_cert\n");
                        return;
                }
+
+               strncpy(p->tls_client_cert, tls_client_cert, len);
+               p->tls_client_cert[len] = '\0';
        }
 
-       if (p->tls_client_key.s && p->tls_client_key.len > 0) {
-               shm_free(p->tls_client_key.s);
-               p->tls_client_key.s = NULL;
-               p->tls_client_key.len = 0;
+       if (p->tls_client_key) {
+               shm_free(p->tls_client_key);
+               p->tls_client_key = NULL;
        }
-       if (tls_client_key.s && tls_client_key.len > 0) {
-               if (shm_str_dup(&p->tls_client_key, &tls_client_key) < 0) {
+       if (tls_client_key) {
+               len = strlen(tls_client_key);
+               p->tls_client_key = shm_malloc(len+1);
+
+               if (p->tls_client_key == NULL) {
                        LM_ERR("Error allocating tls_client_key\n");
                        return;
                }
+
+               strncpy(p->tls_client_key, tls_client_key, len);
+               p->tls_client_key[len] = '\0';
        }
 
-       if (p->tls_ca_path.s && p->tls_ca_path.len > 0) {
-               shm_free(p->tls_ca_path.s);
-               p->tls_ca_path.s = NULL;
-               p->tls_ca_path.len = 0;
+       if (p->tls_ca_path) {
+               shm_free(p->tls_ca_path);
+               p->tls_ca_path = NULL;
        }
-       if (tls_ca_path.s && tls_ca_path.len > 0) {
-               if (shm_str_dup(&p->tls_ca_path, &tls_ca_path) < 0) {
+       if (tls_ca_path) {
+               len = strlen(tls_ca_path);
+               p->tls_ca_path = shm_malloc(len+1);
+
+               if (p->tls_ca_path == NULL) {
                        LM_ERR("Error allocating tls_ca_path\n");
                        return;
                }
+
+               strncpy(p->tls_ca_path, tls_ca_path, len);
+               p->tls_ca_path[len] = '\0';
        }
 
        if (p->body.s && p->body.len > 0) {