core: if nosip msg hooks skip handling the packet, stop sip routing processing
[sip-router] / src / core / receive.c
index 18d244c..b14ebf8 100644 (file)
@@ -50,7 +50,7 @@
 #endif
 #include "select_buf.h"
 
-#include "tcp_server.h" /* for tcpconn_add_alias */
+#include "tcp_server.h"  /* for tcpconn_add_alias */
 #include "tcp_options.h" /* for access to tcp_accept_aliases*/
 #include "cfg/cfg.h"
 #include "core_stats.h"
 
 int _sr_ip_free_bind = 0;
 
-unsigned int msg_no=0;
+unsigned int msg_no = 0;
 /* address preset vars */
-str default_global_address={0,0};
-str default_global_port={0,0};
-str default_via_address={0,0};
-str default_via_port={0,0};
+str default_global_address = {0, 0};
+str default_global_port = {0, 0};
+str default_via_address = {0, 0};
+str default_via_port = {0, 0};
+
+int ksr_route_locks_size = 0;
+static rec_lock_set_t* ksr_route_locks_set = NULL;
+
+int ksr_route_locks_set_init(void)
+{
+       if(ksr_route_locks_set!=NULL || ksr_route_locks_size<=0)
+               return 0;
+
+       ksr_route_locks_set = rec_lock_set_alloc(ksr_route_locks_size);
+       if(ksr_route_locks_set) {
+               LM_ERR("failed to allocate route locks set\n");
+               return -1;
+       }
+       if(rec_lock_set_init(ksr_route_locks_set)==NULL) {
+               LM_ERR("failed to init route locks set\n");
+               return -1;
+       }
+       return 0;
+}
+
+void ksr_route_locks_set_destroy(void)
+{
+       if(ksr_route_locks_set==NULL)
+               return;
+
+       rec_lock_set_destroy(ksr_route_locks_set);
+       rec_lock_set_dealloc(ksr_route_locks_set);
+       ksr_route_locks_set = NULL;
+}
 
 /**
  * increment msg_no and return the new value
@@ -80,27 +110,30 @@ unsigned int inc_msg_no(void)
 /**
  *
  */
-int sip_check_fline(charbuf, unsigned int len)
+int sip_check_fline(char *buf, unsigned int len)
 {
        char *p;
        int m;
 
        m = 0;
-       for(p=buf; p<buf+len; p++) {
+       for(p = buf; p < buf + len; p++) {
                /* first check if is a reply - starts with SIP/2.0 */
-               if(m==0) {
-                       if(*p==' ' || *p=='\t' || *p=='\r' || *p=='\n') continue;
-                       if(buf+len-p<10) return -1;
-                       if(strncmp(p, "SIP/2.0 ", 8)==0) {
+               if(m == 0) {
+                       if(*p == ' ' || *p == '\t' || *p == '\r' || *p == '\n')
+                               continue;
+                       if(buf + len - p < 10)
+                               return -1;
+                       if(strncmp(p, "SIP/2.0 ", 8) == 0) {
                                LM_DBG("first line indicates a SIP reply\n");
                                return 0;
                        }
-                       m=1;
+                       m = 1;
                } else {
                        /* check if a request - before end of first line is SIP/2.0 */
-                       if(*p!='\r' && *p!='\n') continue;
-                       if(p-10>=buf) {
-                               if(strncmp(p-8, " SIP/2.0", 8)==0) {
+                       if(*p != '\r' && *p != '\n')
+                               continue;
+                       if(p - 10 >= buf) {
+                               if(strncmp(p - 8, " SIP/2.0", 8) == 0) {
                                        LM_DBG("first line indicates a SIP request\n");
                                        return 0;
                                }
@@ -115,9 +148,9 @@ int sip_check_fline(char* buf, unsigned int len)
  *  WARNING: buf must be 0 terminated (buf[len]=0) or some things might
  * break (e.g.: modules/textops)
  */
-int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
+int receive_msg(char *buf, unsigned int len, struct receive_info *rcv_info)
 {
-       struct sip_msgmsg;
+       struct sip_msg *msg;
        struct run_act_ctx ctx;
        struct run_act_ctx *bctx;
        int ret;
@@ -133,64 +166,83 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
        str inb;
        sr_net_info_t netinfo;
        sr_kemi_eng_t *keng = NULL;
+       sr_event_param_t evp = {0};
+       unsigned int cidlockidx = 0;
+       unsigned int cidlockset = 0;
+       int errsipmsg = 0;
 
        if(sr_event_enabled(SREV_NET_DATA_RECV)) {
-               if(sip_check_fline(buf, len)==0) {
+               if(sip_check_fline(buf, len) == 0) {
                        memset(&netinfo, 0, sizeof(sr_net_info_t));
                        netinfo.data.s = buf;
                        netinfo.data.len = len;
                        netinfo.rcv = rcv_info;
-                       sr_event_exec(SREV_NET_DATA_RECV, (void*)&netinfo);
+                       evp.data = (void *)&netinfo;
+                       sr_event_exec(SREV_NET_DATA_RECV, &evp);
                }
        }
 
        inb.s = buf;
        inb.len = len;
-       sr_event_exec(SREV_NET_DATA_IN, (void*)&inb);
+       evp.data = (void *)&inb;
+       evp.rcv = rcv_info;
+       sr_event_exec(SREV_NET_DATA_IN, &evp);
        len = inb.len;
 
-       msg=pkg_malloc(sizeof(struct sip_msg));
-       if (msg==0) {
+       msg = pkg_malloc(sizeof(struct sip_msg));
+       if(unlikely(msg == 0)) {
                LM_ERR("no mem for sip_msg\n");
                goto error00;
        }
        msg_no++;
        /* number of vias parsed -- good for diagnostic info in replies */
-       via_cnt=0;
+       via_cnt = 0;
 
-       memset(msg,0, sizeof(struct sip_msg)); /* init everything to 0 */
+       memset(msg, 0, sizeof(struct sip_msg)); /* init everything to 0 */
        /* fill in msg */
-       msg->buf=buf;
-       msg->len=len;
+       msg->buf = buf;
+       msg->len = len;
        /* zero termination (termination of orig message bellow not that
         * useful as most of the work is done with scratch-pad; -jiri  */
        /* buf[len]=0; */ /* WARNING: zero term removed! */
-       msg->rcv=*rcv_info;
-       msg->id=msg_no;
-       msg->pid=my_pid();
-       msg->set_global_address=default_global_address;
-       msg->set_global_port=default_global_port;
-
-       if(likely(sr_msg_time==1)) msg_set_time(msg);
-
-       if (parse_msg(buf,len, msg)!=0){
-               if((ret=sr_event_exec(SREV_RCV_NOSIP, (void*)msg))<NONSIP_MSG_DROP) {
-                       LOG(cfg_get(core, core_cfg, corelog),
+       msg->rcv = *rcv_info;
+       msg->id = msg_no;
+       msg->pid = my_pid();
+       msg->set_global_address = default_global_address;
+       msg->set_global_port = default_global_port;
+
+       if(likely(sr_msg_time == 1))
+               msg_set_time(msg);
+
+       if(parse_msg(buf, len, msg) != 0) {
+               errsipmsg = 1;
+               evp.data = (void *)msg;
+               if((ret = sr_event_exec(SREV_RCV_NOSIP, &evp)) < NONSIP_MSG_DROP) {
+                       LM_DBG("attempt of nonsip message processing failed\n");
+               } else if(ret == NONSIP_MSG_DROP) {
+                       LM_DBG("nonsip message processing completed\n");
+                       goto error02;
+               }
+       }
+       if(errsipmsg==1) {
+               LOG(cfg_get(core, core_cfg, corelog),
                                "core parsing of SIP message failed (%s:%d/%d)\n",
                                ip_addr2a(&msg->rcv.src_ip), (int)msg->rcv.src_port,
                                (int)msg->rcv.proto);
-                       sr_core_ert_run(msg, SR_CORE_ERT_RECEIVE_PARSE_ERROR);
-               }
-               else if(ret == NONSIP_MSG_DROP) goto error02;
+               sr_core_ert_run(msg, SR_CORE_ERT_RECEIVE_PARSE_ERROR);
+               goto error02;
        }
 
-       parse_headers(msg, HDR_FROM_F|HDR_TO_F|HDR_CALLID_F|HDR_CSEQ_F, 0);
+       if(unlikely(parse_headers(msg, HDR_FROM_F | HDR_TO_F | HDR_CALLID_F | HDR_CSEQ_F, 0)
+                       < 0)) {
+               LM_WARN("parsing relevant headers failed\n");
+       }
        LM_DBG("--- received sip message - %s - call-id: [%.*s] - cseq: [%.*s]\n",
-                       (msg->first_line.type==SIP_REQUEST)?"request":"reply",
-                       (msg->callid && msg->callid->body.s)?msg->callid->body.len:0,
-                       (msg->callid && msg->callid->body.s)?msg->callid->body.s:"",
-                       (msg->cseq && msg->cseq->body.s)?msg->cseq->body.len:0,
-                       (msg->cseq && msg->cseq->body.s)?msg->cseq->body.s:"");
+                       (msg->first_line.type == SIP_REQUEST) ? "request" : "reply",
+                       (msg->callid && msg->callid->body.s) ? msg->callid->body.len : 0,
+                       (msg->callid && msg->callid->body.s) ? msg->callid->body.s : "",
+                       (msg->cseq && msg->cseq->body.s) ? msg->cseq->body.len : 0,
+                       (msg->cseq && msg->cseq->body.s) ? msg->cseq->body.s : "");
 
        /* set log prefix */
        log_prefix_set(msg);
@@ -198,45 +250,52 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
        /* ... clear branches from previous message */
        clear_branches();
 
-       if (msg->first_line.type==SIP_REQUEST){
+       if(unlikely(ksr_route_locks_set!=NULL && msg->callid && msg->callid->body.s
+                       && msg->callid->body.len >0)) {
+               cidlockidx = get_hash1_raw(msg->callid->body.s, msg->callid->body.len);
+               cidlockidx = cidlockidx % ksr_route_locks_set->size;
+               cidlockset = 1;
+       }
+
+       if(msg->first_line.type == SIP_REQUEST) {
                ruri_mark_new(); /* ruri is usable for forking (not consumed yet) */
-               if (!IS_SIP(msg)){
-                       if ((ret=nonsip_msg_run_hooks(msg))!=NONSIP_MSG_ACCEPT){
-                               if (unlikely(ret==NONSIP_MSG_ERROR))
+               if(!IS_SIP(msg)) {
+                       if((ret = nonsip_msg_run_hooks(msg)) != NONSIP_MSG_ACCEPT) {
+                               if(unlikely(ret == NONSIP_MSG_ERROR))
                                        goto error03;
                                goto end; /* drop the message */
                        }
                }
                /* sanity checks */
-               if ((msg->via1==0) || (msg->via1->error!=PARSE_OK)){
+               if(unlikely((msg->via1 == 0) || (msg->via1->error != PARSE_OK))) {
                        /* no via, send back error ? */
                        LM_ERR("no via found in request\n");
                        STATS_BAD_MSG();
                        goto error02;
                }
-               /* check if necessary to add receive?->moved to forward_req */
-               /* check for the alias stuff */
+/* check if necessary to add receive?->moved to forward_req */
+/* check for the alias stuff */
 #ifdef USE_TCP
-               if (msg->via1->alias && cfg_get(tcp, tcp_cfg, accept_aliases) &&
-                               (((rcv_info->proto==PROTO_TCP) && !tcp_disable)
+               if(msg->via1->alias && cfg_get(tcp, tcp_cfg, accept_aliases)
+                               && (((rcv_info->proto == PROTO_TCP) && !tcp_disable)
 #ifdef USE_TLS
-                                       || ((rcv_info->proto==PROTO_TLS) && !tls_disable)
+                                                  || ((rcv_info->proto == PROTO_TLS) && !tls_disable)
 #endif
-                               )
-                       ){
-                       if (tcpconn_add_alias(rcv_info->proto_reserved1, msg->via1->port,
-                                                                       rcv_info->proto)!=0){
+                                                                  )) {
+                       if(tcpconn_add_alias(rcv_info->proto_reserved1, msg->via1->port,
+                                          rcv_info->proto)
+                                       != 0) {
                                LM_ERR("tcp alias failed\n");
                                /* continue */
                        }
                }
 #endif
 
-       /*      skip: */
+               /*      skip: */
                LM_DBG("preparing to run routing scripts...\n");
                if(is_printable(cfg_get(core, core_cfg, latency_cfg_log))
-                               || stats_on==1) {
-                       gettimeofday( & tvb, &tz );
+                               || stats_on == 1) {
+                       gettimeofday(&tvb, &tz);
                }
                /* execute pre-script callbacks, if any; -jiri */
                /* if some of the callbacks said not to continue with
@@ -245,48 +304,65 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
                 * (like presence of at least one via), so you can count
                 * on via1 being parsed in a pre-script callback --andrei
                */
-               if (exec_pre_script_cb(msg, REQUEST_CB_TYPE)==0 )
-               {
+               if(exec_pre_script_cb(msg, REQUEST_CB_TYPE) == 0) {
                        STATS_REQ_FWD_DROP();
                        goto end; /* drop the request */
                }
 
                set_route_type(REQUEST_ROUTE);
                /* exec the routing script */
-               if(unlikely(main_rt.rlist[DEFAULT_RT]==NULL)) {
+               if(unlikely(main_rt.rlist[DEFAULT_RT] == NULL)) {
                        keng = sr_kemi_eng_get();
-                       if(keng==NULL) {
-                               LM_ERR("no config routing engine registered\n");
+                       if(keng == NULL) {
+                               LM_ERR("no request_route {...} and no other config routing"
+                                               " engine registered\n");
                                goto error_req;
                        }
-                       if(keng->froute(msg, REQUEST_ROUTE, NULL, NULL)<0) {
-                               LM_NOTICE("negative return code from engine function\n");
+                       if(unlikely(cidlockset)) {
+                               rec_lock_set_get(ksr_route_locks_set, cidlockidx);
+                               if(keng->froute(msg, REQUEST_ROUTE, NULL, NULL) < 0)
+                                       LM_NOTICE("negative return code from engine function\n");
+                               rec_lock_set_release(ksr_route_locks_set, cidlockidx);
+                       } else {
+                               if(keng->froute(msg, REQUEST_ROUTE, NULL, NULL) < 0)
+                                       LM_NOTICE("negative return code from engine function\n");
                        }
                } else {
-                       if (run_top_route(main_rt.rlist[DEFAULT_RT], msg, 0)<0){
-                               LM_WARN("error while trying script\n");
-                               goto error_req;
+                       if(unlikely(cidlockset)) {
+                               rec_lock_set_get(ksr_route_locks_set, cidlockidx);
+                               if(run_top_route(main_rt.rlist[DEFAULT_RT], msg, 0) < 0) {
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
+                                       LM_WARN("error while trying script\n");
+                                       goto error_req;
+                               }
+                               rec_lock_set_release(ksr_route_locks_set, cidlockidx);
+                       } else {
+                               if(run_top_route(main_rt.rlist[DEFAULT_RT], msg, 0) < 0) {
+                                       LM_WARN("error while trying script\n");
+                                       goto error_req;
+                               }
                        }
                }
 
                if(is_printable(cfg_get(core, core_cfg, latency_cfg_log))
-                               || stats_on==1) {
-                       gettimeofday( & tve, &tz );
-                       diff = (tve.tv_sec-tvb.tv_sec)*1000000+(tve.tv_usec-tvb.tv_usec);
+                               || stats_on == 1) {
+                       gettimeofday(&tve, &tz);
+                       diff = (tve.tv_sec - tvb.tv_sec) * 1000000
+                                  + (tve.tv_usec - tvb.tv_usec);
                        LOG(cfg_get(core, core_cfg, latency_cfg_log),
                                        "request-route executed in: %d usec\n", diff);
 #ifdef STATS
                        stats->processed_requests++;
                        stats->acc_req_time += diff;
-                       STATS_RX_REQUEST( msg->first_line.u.request.method_value );
+                       STATS_RX_REQUEST(msg->first_line.u.request.method_value);
 #endif
                }
 
                /* execute post request-script callbacks */
                exec_post_script_cb(msg, REQUEST_CB_TYPE);
-       }else if (msg->first_line.type==SIP_REPLY){
+       } else if(msg->first_line.type == SIP_REPLY) {
                /* sanity checks */
-               if ((msg->via1==0) || (msg->via1->error!=PARSE_OK)){
+               if((msg->via1 == 0) || (msg->via1->error != PARSE_OK)) {
                        /* no via, send back error ? */
                        LM_ERR("no via found in reply\n");
                        STATS_BAD_RPL();
@@ -294,11 +370,11 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
                }
 
                if(is_printable(cfg_get(core, core_cfg, latency_cfg_log))
-                               || stats_on==1) {
-                       gettimeofday( & tvb, &tz );
+                               || stats_on == 1) {
+                       gettimeofday(&tvb, &tz);
                }
 #ifdef STATS
-               STATS_RX_RESPONSE ( msg->first_line.u.reply.statuscode / 100 );
+               STATS_RX_RESPONSE(msg->first_line.u.reply.statuscode / 100);
 #endif
 
                /* execute pre-script callbacks, if any; -jiri */
@@ -308,49 +384,64 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
                 * (like presence of at least one via), so you can count
                 * on via1 being parsed in a pre-script callback --andrei
                */
-               if (exec_pre_script_cb(msg, ONREPLY_CB_TYPE)==0 )
-               {
+               if(exec_pre_script_cb(msg, ONREPLY_CB_TYPE) == 0) {
                        STATS_RPL_FWD_DROP();
                        goto end; /* drop the reply */
                }
 
                /* exec the onreply routing script */
-               keng = sr_kemi_eng_get();
-               if (onreply_rt.rlist[DEFAULT_RT]!=NULL || keng!=NULL){
+               if(kemi_reply_route_callback.len>0) {
+                       keng = sr_kemi_eng_get();
+               }
+               if(onreply_rt.rlist[DEFAULT_RT] != NULL || keng != NULL) {
                        set_route_type(CORE_ONREPLY_ROUTE);
                        ret = 1;
-                       if(unlikely(keng!=NULL)) {
+                       if(unlikely(keng != NULL)) {
                                bctx = sr_kemi_act_ctx_get();
                                init_run_actions_ctx(&ctx);
                                sr_kemi_act_ctx_set(&ctx);
-                               ret = keng->froute(msg, CORE_ONREPLY_ROUTE, NULL, NULL);
+                               if(unlikely(cidlockset)) {
+                                       rec_lock_set_get(ksr_route_locks_set, cidlockidx);
+                                       ret = keng->froute(msg, CORE_ONREPLY_ROUTE, NULL, NULL);
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
+                               } else {
+                                       ret = keng->froute(msg, CORE_ONREPLY_ROUTE, NULL, NULL);
+                               }
                                sr_kemi_act_ctx_set(bctx);
                        } else {
-                               ret=run_top_route(onreply_rt.rlist[DEFAULT_RT], msg, &ctx);
+                               if(unlikely(cidlockset)) {
+                                       rec_lock_set_get(ksr_route_locks_set, cidlockidx);
+                                       ret = run_top_route(onreply_rt.rlist[DEFAULT_RT], msg, &ctx);
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
+                               } else  {
+                                       ret = run_top_route(onreply_rt.rlist[DEFAULT_RT], msg, &ctx);
+                               }
                        }
 #ifndef NO_ONREPLY_ROUTE_ERROR
-                       if (unlikely(ret<0)){
+                       if(unlikely(ret < 0)) {
                                LM_WARN("error while trying onreply script\n");
                                goto error_rpl;
-                       }else
+                       } else
 #endif /* NO_ONREPLY_ROUTE_ERROR */
-                       if (unlikely(ret==0 || (ctx.run_flags&DROP_R_F))){
-                               STATS_RPL_FWD_DROP();
-                               goto skip_send_reply; /* drop the message, no error */
-                       }
+                               if(unlikely(ret == 0 || (ctx.run_flags & DROP_R_F))) {
+                                       STATS_RPL_FWD_DROP();
+                                       LM_DBG("drop flag set - skip forwarding the reply\n");
+                                       goto skip_send_reply; /* drop the message, no error */
+                               }
                }
                /* send the msg */
                forward_reply(msg);
        skip_send_reply:
                if(is_printable(cfg_get(core, core_cfg, latency_cfg_log))
-                               || stats_on==1) {
-                       gettimeofday( & tve, &tz );
-                       diff = (tve.tv_sec-tvb.tv_sec)*1000000+(tve.tv_usec-tvb.tv_usec);
+                               || stats_on == 1) {
+                       gettimeofday(&tve, &tz);
+                       diff = (tve.tv_sec - tvb.tv_sec) * 1000000
+                                  + (tve.tv_usec - tvb.tv_usec);
                        LOG(cfg_get(core, core_cfg, latency_cfg_log),
                                        "reply-route executed in: %d usec\n", diff);
 #ifdef STATS
                        stats->processed_responses++;
-                       stats->acc_res_time+=diff;
+                       stats->acc_res_time += diff;
 #endif
                }
 
@@ -362,16 +453,13 @@ end:
 #ifdef STATS
        skipped = 0;
 #endif
-       /* free possible loaded avps -bogdan */
-       reset_avps();
-#ifdef WITH_XAVP
-       xavp_reset_list();
-#endif
+       ksr_msg_env_reset();
        LM_DBG("cleaning up\n");
        free_sip_msg(msg);
        pkg_free(msg);
 #ifdef STATS
-       if (skipped) STATS_RX_DROPS;
+       if(skipped)
+               STATS_RX_DROPS;
 #endif
        /* reset log prefix */
        log_prefix_set(NULL);
@@ -381,10 +469,6 @@ end:
 error_rpl:
        /* execute post reply-script callbacks */
        exec_post_script_cb(msg, ONREPLY_CB_TYPE);
-       reset_avps();
-#ifdef WITH_XAVP
-       xavp_reset_list();
-#endif
        goto error02;
 #endif /* NO_ONREPLY_ROUTE_ERROR */
 error_req:
@@ -392,18 +476,24 @@ error_req:
        /* execute post request-script callbacks */
        exec_post_script_cb(msg, REQUEST_CB_TYPE);
 error03:
-       /* free possible loaded avps -bogdan */
-       reset_avps();
-#ifdef WITH_XAVP
-       xavp_reset_list();
-#endif
 error02:
        free_sip_msg(msg);
        pkg_free(msg);
 error00:
+       ksr_msg_env_reset();
        STATS_RX_DROPS;
        /* reset log prefix */
        log_prefix_set(NULL);
        return -1;
 }
 
+/**
+ * clean up msg environment, such as avp and xavp lists
+ */
+void ksr_msg_env_reset(void)
+{
+       reset_avps();
+#ifdef WITH_XAVP
+       xavp_reset_list();
+#endif
+}