core: new parameter route_locks_size
authorDaniel-Constantin Mierla <miconda@gmail.com>
Mon, 12 Mar 2018 15:44:25 +0000 (16:44 +0100)
committerDaniel-Constantin Mierla <miconda@gmail.com>
Mon, 12 Mar 2018 16:09:43 +0000 (17:09 +0100)
- if set, kamailio creates a group of recursive locks used to sync on
execution of request_route and reply_route based on hashing ID of
Call-ID header. In other words, if a message has triggered the execution
of request_route or reply_route, any other message with the same Call-ID
waits until the other one finishes the execution.
- it should help when messages belonging to the same dialog come quickly
one after the other and config execution results in sending them out in
reverse order. There are some old UA implementations not able to cope
properly with this case (e.g., pstn gateways).
- be very carefull when eanbling it, it can affect performances, use
only when strictly needed
- note also that hashing over Call-ID means that there could be
collisions and different Call-ID values can result in same hash ID
- default value is 0 - feature not enabled
- set it to a positive integer number, it should be higher than the over
all number of processes created by kamailio, can be much higher

src/core/cfg.lex
src/core/cfg.y
src/core/globals.h
src/core/receive.c
src/core/receive.h
src/main.c

index 033512c..6228308 100644 (file)
@@ -444,6 +444,7 @@ VERSION_TABLE_CFG   "version_table"
 VERBOSE_STARTUP                "verbose_startup"
 
 SERVER_ID     "server_id"
+ROUTE_LOCKS_SIZE     "route_locks_size"
 
 KEMI     "kemi"
 ONSEND_ROUTE_CALLBACK  "onsend_route_callback"
@@ -932,6 +933,7 @@ IMPORTFILE      "import_file"
 <INITIAL>{VERSION_TABLE_CFG}  { count(); yylval.strval=yytext; return VERSION_TABLE_CFG;}
 <INITIAL>{VERBOSE_STARTUP}             {       count(); yylval.strval=yytext;
                                                                        return VERBOSE_STARTUP; }
+<INITIAL>{ROUTE_LOCKS_SIZE}  { count(); yylval.strval=yytext; return ROUTE_LOCKS_SIZE; }
 <INITIAL>{SERVER_ID}  { count(); yylval.strval=yytext; return SERVER_ID;}
 <INITIAL>{KEMI}  { count(); yylval.strval=yytext; return KEMI;}
 <INITIAL>{REPLY_ROUTE_CALLBACK}  { count(); yylval.strval=yytext; return REPLY_ROUTE_CALLBACK;}
index 99bac9c..3301b1a 100644 (file)
@@ -480,6 +480,7 @@ extern char *default_routename;
 %token HTTP_REPLY_PARSE
 %token VERSION_TABLE_CFG
 %token VERBOSE_STARTUP
+%token ROUTE_LOCKS_SIZE
 %token CFG_DESCRIPTION
 %token SERVER_ID
 %token KEMI
@@ -1569,6 +1570,8 @@ assign_stm:
        | HTTP_REPLY_PARSE EQUAL error { yyerror("boolean value expected"); }
        | VERBOSE_STARTUP EQUAL NUMBER { ksr_verbose_startup=$3; }
        | VERBOSE_STARTUP EQUAL error { yyerror("boolean value expected"); }
+       | ROUTE_LOCKS_SIZE EQUAL NUMBER { ksr_route_locks_size=$3; }
+       | ROUTE_LOCKS_SIZE EQUAL error { yyerror("number expected"); }
     | SERVER_ID EQUAL NUMBER { server_id=$3; }
        | SERVER_ID EQUAL error  { yyerror("number expected"); }
        | KEMI DOT ONSEND_ROUTE_CALLBACK EQUAL STRING {
index fbbca1f..786e8f2 100644 (file)
@@ -204,6 +204,7 @@ extern int rt_timer2_policy; /* "slow" timer, SCHED_OTHER */
 extern int http_reply_parse;
 extern int _sr_ip_free_bind;
 extern int ksr_verbose_startup;
+extern int ksr_route_locks_size;
 
 #ifdef USE_DNS_CACHE
 extern int dns_cache_init; /* if 0, the DNS cache is not initialized at startup */
index a4f28ed..b1feba0 100644 (file)
@@ -69,6 +69,36 @@ str default_global_port = {0, 0};
 str default_via_address = {0, 0};
 str default_via_port = {0, 0};
 
+int ksr_route_locks_size = 0;
+static rec_lock_set_t* ksr_route_locks_set = NULL;
+
+int ksr_route_locks_set_init(void)
+{
+       if(ksr_route_locks_set!=NULL || ksr_route_locks_size<=0)
+               return 0;
+
+       ksr_route_locks_set = rec_lock_set_alloc(ksr_route_locks_size);
+       if(ksr_route_locks_set) {
+               LM_ERR("failed to allocate route locks set\n");
+               return -1;
+       }
+       if(rec_lock_set_init(ksr_route_locks_set)==NULL) {
+               LM_ERR("failed to init route locks set\n");
+               return -1;
+       }
+       return 0;
+}
+
+void ksr_route_locks_set_destroy(void)
+{
+       if(ksr_route_locks_set==NULL)
+               return;
+
+       rec_lock_set_destroy(ksr_route_locks_set);
+       rec_lock_set_dealloc(ksr_route_locks_set);
+       ksr_route_locks_set = NULL;
+}
+
 /**
  * increment msg_no and return the new value
  */
@@ -137,6 +167,8 @@ int receive_msg(char *buf, unsigned int len, struct receive_info *rcv_info)
        sr_net_info_t netinfo;
        sr_kemi_eng_t *keng = NULL;
        sr_event_param_t evp = {0};
+       unsigned int cidlockidx = 0;
+       unsigned int cidlockset = 0;
 
        if(sr_event_enabled(SREV_NET_DATA_RECV)) {
                if(sip_check_fline(buf, len) == 0) {
@@ -210,6 +242,13 @@ int receive_msg(char *buf, unsigned int len, struct receive_info *rcv_info)
        /* ... clear branches from previous message */
        clear_branches();
 
+       if(ksr_route_locks_set!=NULL && msg->callid && msg->callid->body.s
+                       && msg->callid->body.len >0) {
+               cidlockidx = get_hash1_raw(msg->callid->body.s, msg->callid->body.len);
+               cidlockidx = cidlockidx % ksr_route_locks_set->size;
+               cidlockset = 1;
+       }
+
        if(msg->first_line.type == SIP_REQUEST) {
                ruri_mark_new(); /* ruri is usable for forking (not consumed yet) */
                if(!IS_SIP(msg)) {
@@ -270,14 +309,24 @@ int receive_msg(char *buf, unsigned int len, struct receive_info *rcv_info)
                                LM_ERR("no config routing engine registered\n");
                                goto error_req;
                        }
+                       if(cidlockset)
+                               rec_lock_set_get(ksr_route_locks_set, cidlockidx);
                        if(keng->froute(msg, REQUEST_ROUTE, NULL, NULL) < 0) {
                                LM_NOTICE("negative return code from engine function\n");
                        }
+                       if(cidlockset)
+                               rec_lock_set_release(ksr_route_locks_set, cidlockidx);
                } else {
+                       if(cidlockset)
+                               rec_lock_set_get(ksr_route_locks_set, cidlockidx);
                        if(run_top_route(main_rt.rlist[DEFAULT_RT], msg, 0) < 0) {
+                               if(cidlockset)
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
                                LM_WARN("error while trying script\n");
                                goto error_req;
                        }
+                       if(cidlockset)
+                               rec_lock_set_release(ksr_route_locks_set, cidlockidx);
                }
 
                if(is_printable(cfg_get(core, core_cfg, latency_cfg_log))
@@ -336,10 +385,18 @@ int receive_msg(char *buf, unsigned int len, struct receive_info *rcv_info)
                                bctx = sr_kemi_act_ctx_get();
                                init_run_actions_ctx(&ctx);
                                sr_kemi_act_ctx_set(&ctx);
+                               if(cidlockset)
+                                       rec_lock_set_get(ksr_route_locks_set, cidlockidx);
                                ret = keng->froute(msg, CORE_ONREPLY_ROUTE, NULL, NULL);
+                               if(cidlockset)
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
                                sr_kemi_act_ctx_set(bctx);
                        } else {
+                               if(cidlockset)
+                                       rec_lock_set_get(ksr_route_locks_set, cidlockidx);
                                ret = run_top_route(onreply_rt.rlist[DEFAULT_RT], msg, &ctx);
+                               if(cidlockset)
+                                       rec_lock_set_release(ksr_route_locks_set, cidlockidx);
                        }
 #ifndef NO_ONREPLY_ROUTE_ERROR
                        if(unlikely(ret < 0)) {
index 0a928c5..d4ce789 100644 (file)
@@ -35,4 +35,7 @@ int receive_msg(char* buf, unsigned int len, struct receive_info *ri);
 unsigned int inc_msg_no(void);
 void ksr_msg_env_reset(void);
 
+int ksr_route_locks_set_init(void);
+void ksr_route_locks_set_destroy(void);
+
 #endif
index 9366884..a49e30e 100644 (file)
 #include "core/dset.h"
 #include "core/timer_proc.h"
 #include "core/srapi.h"
+#include "core/receive.h"
 
 #ifdef DEBUG_DMALLOC
 #include <dmalloc.h>
@@ -554,6 +555,7 @@ void cleanup(int show_status)
 #endif
        destroy_timer();
        pv_destroy_api();
+       ksr_route_locks_set_destroy();
        destroy_script_cb();
        destroy_nonsip_hooks();
        destroy_routes();
@@ -2324,6 +2326,9 @@ try_again:
        if (pv_reinit_buffer()<0)
                goto error;
 
+       if (ksr_route_locks_set_init()<0)
+               goto error;
+
        /* init lookup for core event routes */
        sr_core_ert_init();