async: added support for millisecond resolution sleep
authorPaweł Pierścionek <ppierscionek@gmail.com>
Sat, 27 Jul 2019 21:07:10 +0000 (23:07 +0200)
committerHenning Westerholt <henningw@users.noreply.github.com>
Sun, 28 Jul 2019 14:28:35 +0000 (16:28 +0200)
- new ms_timer parameter to enable millisecond precision timer
- new async_ms_route and async_ms_sleep functions with milliseconds as a param
- implementation:
  Each async_ms_sleep adds an entry to a linked list sorted by expiry time.
  List is checked every ms_timer ms for expired entries.
  All expired entries are pushed for execution on a pool of async workers.

src/modules/async/async_mod.c
src/modules/async/async_sleep.c
src/modules/async/async_sleep.h
src/modules/async/doc/async_admin.xml

index 88affb3..3989966 100644 (file)
 MODULE_VERSION
 
 static int async_workers = 1;
+static int async_ms_timer = 0;
 
 static int mod_init(void);
 static int child_init(int);
 static void mod_destroy(void);
 
 static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2);
+static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2);
 static int fixup_async_sleep(void **param, int param_no);
+
 static int w_async_route(sip_msg_t *msg, char *rt, char *sec);
+static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec);
 static int fixup_async_route(void **param, int param_no);
+
 static int w_async_task_route(sip_msg_t *msg, char *rt, char *p2);
 static int fixup_async_task_route(void **param, int param_no);
 
@@ -60,8 +65,12 @@ struct tm_binds tmb;
 static cmd_export_t cmds[]={
        {"async_route", (cmd_function)w_async_route, 2, fixup_async_route,
                0, REQUEST_ROUTE|FAILURE_ROUTE},
+       {"async_ms_route", (cmd_function)w_async_ms_route, 2, fixup_async_route,
+               0, REQUEST_ROUTE|FAILURE_ROUTE},
        {"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep,
                0, REQUEST_ROUTE|FAILURE_ROUTE},
+       {"async_ms_sleep", (cmd_function)w_async_ms_sleep, 1, fixup_async_sleep,
+               0, REQUEST_ROUTE|FAILURE_ROUTE},
        {"async_task_route", (cmd_function)w_async_task_route, 1, fixup_async_task_route,
                0, REQUEST_ROUTE|FAILURE_ROUTE},
        {0, 0, 0, 0, 0, 0}
@@ -69,6 +78,7 @@ static cmd_export_t cmds[]={
 
 static param_export_t params[]={
        {"workers",     INT_PARAM,   &async_workers},
+       {"ms_timer",    INT_PARAM,   &async_ms_timer},
        {0, 0, 0}
 };
 
@@ -105,7 +115,17 @@ static int mod_init(void)
                return -1;
        }
 
-       register_basic_timers(async_workers);
+        if(async_ms_timer == 0) {
+                LM_INFO("ms_timer is set to 0. Disabling async_ms_sleep and async_ms_route functions\n");
+        } else {
+               if(async_init_ms_timer_list() < 0) {
+                       LM_ERR("cannot initialize internal structure\n");
+                       return -1;
+               }
+                LM_INFO("Enabled async_ms_sleep and async_ms_route functions with resolution of %dms\n", async_ms_timer);
+       }
+
+       register_basic_timers(async_workers + (async_ms_timer > 0));
 
        return 0;
 }
@@ -131,6 +151,13 @@ static int child_init(int rank)
                        return -1; /* error */
                }
        }
+       
+       if((async_ms_timer > 0) && fork_basic_utimer(PROC_TIMER, "ASYNC MOD MILLI TIMER SINGLETON", 1 /*socks flag*/,
+                          async_mstimer_exec, NULL, 1000 * async_ms_timer /*milliseconds*/)
+                       < 0) {
+               LM_ERR("failed to register millisecond timer singleton as process (%d)\n", i);
+               return -1; /* error */
+       }
 
        return 0;
 }
@@ -141,6 +168,7 @@ static int child_init(int rank)
 static void mod_destroy(void)
 {
        async_destroy_timer_list();
+       async_destroy_ms_timer_list();
 }
 
 /**
@@ -183,6 +211,46 @@ static int w_async_sleep(sip_msg_t *msg, char *sec, char *str2)
        return -1;
 }
 
+/**
+ *
+ */
+static int w_async_ms_sleep(sip_msg_t *msg, char *sec, char *str2)
+{
+       int s;
+       async_param_t *ap;
+
+       if(msg == NULL)
+               return -1;
+
+       if(faked_msg_match(msg)) {
+               LM_ERR("invalid usage for faked message\n");
+               return -1;
+       }
+
+       if(async_workers <= 0) {
+               LM_ERR("no async mod timer workers (modparam missing?)\n");
+               return -1;
+       }
+
+       ap = (async_param_t *)sec;
+       if(fixup_get_ivalue(msg, ap->pinterval, &s) != 0) {
+               LM_ERR("no async sleep time value\n");
+               return -1;
+       }
+       if(ap->type == 0) {
+               if(ap->u.paction == NULL || ap->u.paction->next == NULL) {
+                       LM_ERR("cannot be executed as last action in a route block\n");
+                       return -1;
+               }
+               if(async_ms_sleep(msg, s, ap->u.paction->next, NULL) < 0)
+                       return -1;
+               /* force exit in config */
+               return 0;
+       }
+
+       return -1;
+}
+
 /**
  *
  */
@@ -243,6 +311,42 @@ int ki_async_route(sip_msg_t *msg, str *rn, int s)
        return 0;
 }
 
+/**
+ *
+ */
+int ki_async_ms_route(sip_msg_t *msg, str *rn, int s)
+{
+       cfg_action_t *act = NULL;
+       int ri;
+       sr_kemi_eng_t *keng = NULL;
+
+       if(faked_msg_match(msg)) {
+               LM_ERR("invalid usage for faked message\n");
+               return -1;
+       }
+
+       keng = sr_kemi_eng_get();
+       if(keng == NULL) {
+               ri = route_lookup(&main_rt, rn->s);
+               if(ri >= 0) {
+                       act = main_rt.rlist[ri];
+                       if(act == NULL) {
+                               LM_ERR("empty action lists in route block [%.*s]\n", rn->len,
+                                               rn->s);
+                               return -1;
+                       }
+               } else {
+                       LM_ERR("route block not found: %.*s\n", rn->len, rn->s);
+                       return -1;
+               }
+       }
+
+       if(async_ms_sleep(msg, s, act, rn) < 0)
+               return -1;
+       /* force exit in config */
+       return 0;
+}
+
 /**
  *
  */
@@ -271,6 +375,34 @@ static int w_async_route(sip_msg_t *msg, char *rt, char *sec)
        return ki_async_route(msg, &rn, s);
 }
 
+/**
+ *
+ */
+static int w_async_ms_route(sip_msg_t *msg, char *rt, char *sec)
+{
+       int s;
+       str rn;
+
+       if(msg == NULL)
+               return -1;
+
+       if(async_workers <= 0) {
+               LM_ERR("no async mod timer workers\n");
+               return -1;
+       }
+
+       if(fixup_get_svalue(msg, (gparam_t *)rt, &rn) != 0) {
+               LM_ERR("no async route block name\n");
+               return -1;
+       }
+
+       if(fixup_get_ivalue(msg, (gparam_t *)sec, &s) != 0) {
+               LM_ERR("no async interval value\n");
+               return -1;
+       }
+       return ki_async_route(msg, &rn, s);
+}
+
 /**
  *
  */
index 097237f..ff1da14 100644 (file)
 extern struct tm_binds tmb;
 
 /* clang-format off */
+typedef struct async_task_param {
+       unsigned int tindex;
+       unsigned int tlabel;
+       cfg_action_t *ract;
+       char cbname[ASYNC_CBNAME_SIZE];
+       int cbname_len;
+} async_task_param_t;
+
 typedef struct async_item {
        unsigned int tindex;
        unsigned int tlabel;
@@ -51,6 +59,12 @@ typedef struct async_item {
        struct async_item *next;
 } async_item_t;
 
+typedef struct async_ms_item {
+       async_task_t *at;
+       struct timeval due;
+       struct async_ms_item *next;
+} async_ms_item_t;
+
 typedef struct async_slot {
        async_item_t *lstart;
        async_item_t *lend;
@@ -58,6 +72,15 @@ typedef struct async_slot {
 } async_slot_t;
 
 #define ASYNC_RING_SIZE        100
+#define MAX_MS_SLEEP 30*1000
+#define MAX_MS_SLEEP_QUEUE 10000
+
+static struct async_ms_list {
+       async_ms_item_t *lstart;
+       async_ms_item_t *lend;
+       int     len;
+       gen_lock_t lock;
+} *_async_ms_list = NULL;
 
 static struct async_list_head {
        async_slot_t ring[ASYNC_RING_SIZE];
@@ -95,6 +118,32 @@ int async_init_timer_list(void)
        return 0;
 }
 
+int async_init_ms_timer_list(void)
+{
+       _async_ms_list = (struct async_ms_list *)shm_malloc(
+                       sizeof(struct async_ms_list));
+       if(_async_ms_list == NULL) {
+               LM_ERR("no more shm\n");
+               return -1;
+       }
+       memset(_async_ms_list, 0, sizeof(struct async_ms_list));
+       if(lock_init(&_async_ms_list->lock) == 0) {
+               LM_ERR("cannot init lock \n");
+               shm_free(_async_ms_list);
+               _async_ms_list = 0;
+               return -1;
+       }
+       return 0;
+}
+
+int async_destroy_ms_timer_list(void)
+{      
+       if (_async_ms_list) {
+               lock_destroy(&_async_ms_list->lock);
+       }
+       return 0;
+}
+
 int async_destroy_timer_list(void)
 {
        int i;
@@ -109,6 +158,45 @@ int async_destroy_timer_list(void)
        return 0;
 }
 
+int async_insert_item(async_ms_item_t *ai) 
+{
+       struct timeval *due = &ai->due;
+       
+       if (unlikely(_async_ms_list == NULL))
+               return -1;
+       lock_get(&_async_ms_list->lock);
+       // Check if we want to insert in front
+       if (_async_ms_list->lstart == NULL || timercmp(due, &_async_ms_list->lstart->due, <=)) {
+               ai->next = _async_ms_list->lstart;
+               _async_ms_list->lstart = ai;
+               if (_async_ms_list->lend == NULL)
+                       _async_ms_list->lend = ai;
+       } else {
+               // Check if we want to add to the tail
+               if (_async_ms_list->lend && timercmp(due, &_async_ms_list->lend->due, >)) {
+                       _async_ms_list->lend->next = ai;
+                       _async_ms_list->lend = ai;
+               } else {
+                       async_ms_item_t *aip;
+                       // Find the place to insert into a sorted timer list
+                       // Most likely head && tail scanarios are covered above
+                       int i = 1;
+                       for (aip = _async_ms_list->lstart; aip->next; aip = aip->next, i++) {
+                               if (timercmp(due, &aip->next->due, <=)) {
+                                       ai->next = aip->next;
+                                       aip->next = ai;
+                                       break;
+                               }
+                       }
+               }
+       }
+       _async_ms_list->len++;
+       lock_release(&_async_ms_list->lock);
+       return 0;       
+}
+
+
+
 int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname)
 {
        int slot;
@@ -209,13 +297,37 @@ void async_timer_exec(unsigned int ticks, void *param)
        }
 }
 
-typedef struct async_task_param {
-       unsigned int tindex;
-       unsigned int tlabel;
-       cfg_action_t *ract;
-       char cbname[ASYNC_CBNAME_SIZE];
-       int cbname_len;
-} async_task_param_t;
+void async_mstimer_exec(unsigned int ticks, void *param)
+{
+       struct timeval now;
+       gettimeofday(&now, NULL);
+
+       if (_async_ms_list == NULL)
+               return;
+       lock_get(&_async_ms_list->lock);
+       
+       async_ms_item_t *aip, *next;
+       int i = 0;
+       for (aip = _async_ms_list->lstart; aip; aip = next, i++) {
+               next = aip->next;
+               if (timercmp(&now, &aip->due, >=)) {
+                       if ((_async_ms_list->lstart = next) == NULL) 
+                               _async_ms_list->lend = NULL;
+                       if (async_task_push(aip->at)<0) {
+                               shm_free(aip->at);
+                       }
+                       _async_ms_list->len--;
+                       continue;
+               }
+               break;
+       }
+
+       lock_release(&_async_ms_list->lock);
+       
+       return;
+
+}
+
 
 /**
  *
@@ -246,6 +358,90 @@ void async_exec_task(void *param)
        /* param is freed along with the async task strucutre in core */
 }
 
+int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname)
+{
+       async_ms_item_t *ai;
+       int dsize;
+       tm_cell_t *t = 0;
+       unsigned int tindex;
+       unsigned int tlabel;
+       async_task_param_t *atp;
+       async_task_t *at;
+
+       if(milliseconds <= 0) {
+               LM_ERR("negative or zero sleep time (%d)\n", milliseconds);
+               return -1;
+       }
+       if(milliseconds >= MAX_MS_SLEEP) {
+               LM_ERR("max sleep time is %d msec\n", MAX_MS_SLEEP);
+               return -1;
+       }
+       if(_async_ms_list->len >= MAX_MS_SLEEP_QUEUE) {
+               LM_ERR("max sleep queue length exceeded (%d) \n", MAX_MS_SLEEP_QUEUE);
+               return -1;
+       }
+       if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+               LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+               return -1;
+       }
+       dsize = sizeof(async_task_t) + sizeof(async_task_param_t) + sizeof(async_ms_item_t);
+
+       at = (async_task_t *)shm_malloc(dsize);
+       if(at == NULL) {
+               LM_ERR("no more shm memory\n");
+               return -1;
+       }
+       memset(at, 0, dsize);
+       at->param = (char *)at + sizeof(async_task_t);
+       atp = (async_task_param_t *)at->param;
+       ai = (async_ms_item_t *) ((char *)at +  sizeof(async_task_t) + sizeof(async_task_param_t));
+       ai->at = at;
+
+       if(cbname && cbname->len>=ASYNC_CBNAME_SIZE-1) {
+               LM_ERR("callback name is too long: %.*s\n", cbname->len, cbname->s);
+               return -1;
+       }
+
+       t = tmb.t_gett();
+       if(t == NULL || t == T_UNDEFINED) {
+               if(tmb.t_newtran(msg) < 0) {
+                       LM_ERR("cannot create the transaction\n");
+                       return -1;
+               }
+               t = tmb.t_gett();
+               if(t == NULL || t == T_UNDEFINED) {
+                       LM_ERR("cannot lookup the transaction\n");
+                       return -1;
+               }
+       }
+       
+       if(tmb.t_suspend(msg, &tindex, &tlabel) < 0) {
+               LM_ERR("failed to suspend the processing\n");
+               shm_free(ai);
+               return -1;
+       }
+       at->exec = async_exec_task;
+       at->param = atp;
+       atp->ract = act;
+       atp->tindex = tindex;
+       atp->tlabel = tlabel;
+       if(cbname && cbname->len>0) {
+               memcpy(atp->cbname, cbname->s, cbname->len);
+               atp->cbname[cbname->len] = '\0';
+               atp->cbname_len = cbname->len;
+       }
+       
+       struct timeval now, upause;
+       gettimeofday(&now, NULL);
+       upause.tv_sec = milliseconds / 1000; 
+       upause.tv_usec = (milliseconds * 1000) % 1000000;
+       
+       timeradd(&now, &upause, &ai->due);      
+       async_insert_item(ai);
+
+       return 0;
+}
+
 /**
  *
  */
index 9fac192..7b5d27f 100644 (file)
@@ -39,13 +39,15 @@ typedef struct async_param {
 /* clang-format on */
 
 int async_init_timer_list(void);
-
 int async_destroy_timer_list(void);
-
 int async_sleep(sip_msg_t *msg, int seconds, cfg_action_t *act, str *cbname);
-
 void async_timer_exec(unsigned int ticks, void *param);
 
+int async_init_ms_timer_list(void);
+int async_destroy_ms_timer_list(void);
+int async_ms_sleep(sip_msg_t *msg, int milliseconds, cfg_action_t *act, str *cbname);
+void async_mstimer_exec(unsigned int ticks, void *param);
+
 int async_send_task(sip_msg_t *msg, cfg_action_t *act, str *cbname);
 
 #endif
index d829679..c07beab 100644 (file)
 ...
 modparam("async", "workers", 2)
 ...
+</programlisting>
+               </example>
+       </section>
+       <section>
+               <title><varname>ms_timer</varname> (int)</title>
+               <para>
+                       Enables millisecond timer for async_ms_sleep() and async_ms_route() functions.
+                       The integer value is the timer resolution in milliseconds.
+                       ms_timer = 1 enables 1 millisecond timer but generates higher load on the system.
+                       ms_timer = 20 enables 20 ms timer. 
+               </para>
+               <para>
+               <emphasis>
+                       Default value is 0.
+               </emphasis>
+               </para>
+               <example>
+               <title>Set <varname>ms_timer</varname> parameter</title>
+               <programlisting format="linespecific">
+...
+modparam("async", "ms_timer", 1)
+...
 </programlisting>
                </example>
        </section>
@@ -134,6 +156,52 @@ route[RESUME] {
    exit;
 }
 ...
+</programlisting>
+           </example>
+       </section>
+       <section id="async.f.async_ms_route">
+           <title>
+               <function moreinfo="none">async_ms_route(routename, milliseconds)</function>
+           </title>
+           <para>
+               Simulate a sleep of 'milliseconds' and then continue the processing of the SIP
+               request with the route[routename]. In case of internal errors, the
+               function returns false, otherwise the function exits the execution of
+               the script at that moment (return 0 behaviour).
+               This function works only if the ms_timer parameter has a value greater then 0.
+               </para>
+               <para>
+               The routename parameter can be a static string or a dynamic string
+               value with config variables.
+               </para>
+               <para>
+               The sleep parameter represent the number of milliseconds to suspend the
+               processing of a SIP request. Maximum value is 30000 (30 sec). The parameter can be
+               a static integer or a variable holding an integer.
+               </para>
+               <para>
+               Since the SIP request handling is resumed in another process,
+               the config file execution state is practically lost. Therefore beware
+               that the execution of config after resume will end once the
+               route[routename] is finished.
+               </para>
+               <para>
+               This function can be used from REQUEST_ROUTE.
+               </para>
+               <example>
+               <title><function>async_ms_route</function> usage</title>
+               <programlisting format="linespecific">
+...
+request_route {
+    ...
+    async_ms_route("RESUME", "250");
+    ...
+}
+route[RESUME] {
+   send_reply("404", "Not found");
+   exit;
+}
+...
 </programlisting>
            </example>
        </section>
@@ -167,6 +235,51 @@ exit;
            </example>
        </section>
 
+       <section id="async.f.async_ms_sleep">
+           <title>
+               <function moreinfo="none">async_ms_sleep(milliseconds)</function>
+           </title>
+           <para>
+               Simulate a sleep of 'milliseconds' and then continue the processing of SIP
+               request with the next action. In case of internal errors, the function
+               returns false.
+               This function works only if the ms_timer parameter has a value greater then 0.
+               </para>
+               <para>
+               The sleep parameter represent the number of milliseconds to suspend the
+               processing of SIP request. Maximum value is 30000 (30 sec). The parameter can be
+               a static integer or a variable holding an integer.
+               </para>
+               <para>
+               This function can be used from REQUEST_ROUTE.
+               </para>
+               <example>
+               <title><function>async_ms_sleep</function> usage</title>
+               <programlisting format="linespecific">
+...
+route[REQUESTSHAPER] {
+        $var(res) = http_connect("leakybucket", 
+                                "/add?key=$fd", $null, $null,"$avp(delay)");
+        $var(d) = $(avp(delay){s.int});
+       if ($var(d) > 0) {
+               # Delay the request by $avp(delay) ms
+               async_ms_sleep("$var(d)");
+               if (!t_relay()) {
+                       sl_reply_error();
+               }
+               exit;
+       } 
+       # No delay
+       if (!t_relay()) {
+               sl_reply_error();
+       }
+        exit;
+}
+...
+</programlisting>
+           </example>
+       </section>
+
        <section id="async.f.async_task_route">
            <title>
                <function moreinfo="none">async_task_route(routename)</function>