core: explicit cast to (void*) for %p format printing
[sip-router] / src / core / async_task.c
1 /*
2  * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 /*!
19 * \file
20 * \brief Kamailio core :: Asynchronous tasks
21 * \ingroup core
22 * Module: \ref core
23 */
24
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <stdlib.h>
28 #include <string.h>
29
30 #include <sys/socket.h>
31 #include <sys/types.h>
32 #include <sys/un.h>
33 #include <netinet/in.h>
34 #include <arpa/inet.h>
35 #include <fcntl.h>
36 #include <errno.h>
37
38 #include "dprint.h"
39 #include "sr_module.h"
40 #include "ut.h"
41 #include "pt.h"
42 #include "cfg/cfg_struct.h"
43
44
45 #include "async_task.h"
46
/* number of async task worker processes; values <= 0 mean that no workers
 * are configured and the framework functions below return early */
static int _async_task_workers = 0;
/* socket pair used to hand over task pointers between processes:
 * [1] is written by async_task_push(), [0] is read by async_task_run() */
static int _async_task_sockets[2];
/* when non-zero, each worker passes this value to sleep_us() before
 * every receive attempt */
static int _async_task_usleep = 0;
/* when non-zero, async_task_init_sockets() sets O_NONBLOCK on the
 * write end ([1]) of the socket pair */
static int _async_nonblock = 0;

/* forward declaration - defined further below, called from
 * async_task_child_init() */
int async_task_run(int idx);
53
54 /**
55  *
56  */
57 int async_task_workers_get(void)
58 {
59         return _async_task_workers;
60 }
61
62 /**
63  *
64  */
65 int async_task_workers_active(void)
66 {
67         if(_async_task_workers<=0)
68                 return 0;
69
70         return 1;
71 }
72
73 /**
74  *
75  */
76 int async_task_init_sockets(void)
77 {
78         int val;
79
80         if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_task_sockets) < 0) {
81                 LM_ERR("opening tasks dgram socket pair\n");
82                 return -1;
83         }
84
85         if (_async_nonblock) {
86                 val = fcntl(_async_task_sockets[1], F_GETFL, 0);
87                 if(val<0) {
88                         LM_WARN("failed to get socket flags\n");
89                 } else {
90                         if(fcntl(_async_task_sockets[1], F_SETFL, val | O_NONBLOCK)<0) {
91                                 LM_WARN("failed to set socket nonblock flag\n");
92                         }
93                 }
94         }
95
96         LM_DBG("inter-process event notification sockets initialized\n");
97         return 0;
98 }
99
100 /**
101  *
102  */
103 void async_task_close_sockets_child(void)
104 {
105         LM_DBG("closing the notification socket used by children\n");
106         close(_async_task_sockets[1]);
107 }
108
109 /**
110  *
111  */
112 void async_task_close_sockets_parent(void)
113 {
114         LM_DBG("closing the notification socket used by parent\n");
115         close(_async_task_sockets[0]);
116 }
117
118 /**
119  *
120  */
121 int async_task_init(void)
122 {
123         LM_DBG("start initializing asynk task framework\n");
124         if(_async_task_workers<=0)
125                 return 0;
126
127         /* advertise new processes to core */
128         register_procs(_async_task_workers);
129
130         /* advertise new processes to cfg framework */
131         cfg_register_child(_async_task_workers);
132
133         return 0;
134 }
135
136 /**
137  *
138  */
139 int async_task_initialized(void)
140 {
141         if(_async_task_workers<=0)
142                 return 0;
143         return 1;
144 }
145
146 /**
147  *
148  */
149 int async_task_child_init(int rank)
150 {
151         int pid;
152         int i;
153
154         if(_async_task_workers<=0)
155                 return 0;
156
157         LM_DBG("child initializing asynk task framework\n");
158
159         if (rank==PROC_INIT) {
160                 if(async_task_init_sockets()<0) {
161                         LM_ERR("failed to initialize tasks sockets\n");
162                         return -1;
163                 }
164                 return 0;
165         }
166
167         if(rank>0) {
168                 async_task_close_sockets_parent();
169                 return 0;
170         }
171         if (rank!=PROC_MAIN)
172                 return 0;
173
174         for(i=0; i<_async_task_workers; i++) {
175                 pid=fork_process(PROC_RPC, "Async Task Worker", 1);
176                 if (pid<0)
177                         return -1; /* error */
178                 if(pid==0) {
179                         /* child */
180
181                         /* initialize the config framework */
182                         if (cfg_child_init())
183                                 return -1;
184                         /* main function for workers */
185                         if(async_task_run(i+1)<0) {
186                                 LM_ERR("failed to initialize task worker process: %d\n", i);
187                                 return -1;
188                         }
189                 }
190         }
191
192         return 0;
193 }
194
195 /**
196  *
197  */
198 int async_task_set_workers(int n)
199 {
200         if(_async_task_workers>0) {
201                 LM_WARN("task workers already set\n");
202                 return 0;
203         }
204         if(n<=0)
205                 return 0;
206
207         _async_task_workers = n;
208
209         return 0;
210 }
211
212 /**
213  *
214  */
215 int async_task_set_nonblock(int n)
216 {
217         if(n>0)
218                 _async_nonblock = 1;
219
220         return 0;
221 }
222
223 /**
224  *
225  */
226 int async_task_set_usleep(int n)
227 {
228         int v;
229
230         v = _async_task_usleep;
231         _async_task_usleep = n;
232
233         return v;
234 }
235
236 /**
237  *
238  */
239 int async_task_push(async_task_t *task)
240 {
241         int len;
242
243         if(_async_task_workers<=0) {
244                 LM_WARN("async task pushed, but no async workers - ignoring\n");
245                 return 0;
246         }
247
248         len = write(_async_task_sockets[1], &task, sizeof(async_task_t*));
249         if(len<=0) {
250                 LM_ERR("failed to pass the task to asynk workers\n");
251                 return -1;
252         }
253         LM_DBG("task sent [%p]\n", task);
254         return 0;
255 }
256
257 /**
258  *
259  */
260 int async_task_run(int idx)
261 {
262         async_task_t *ptask;
263         int received;
264
265         LM_DBG("async task worker %d ready\n", idx);
266
267         for( ; ; ) {
268                 if(unlikely(_async_task_usleep)) sleep_us(_async_task_usleep);
269                 if ((received = recvfrom(_async_task_sockets[0],
270                                                         &ptask, sizeof(async_task_t*),
271                                                         0, NULL, 0)) < 0) {
272                         LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno));
273                         continue;
274                 }
275                 if(received != sizeof(async_task_t*)) {
276                         LM_ERR("invalid task size %d\n", received);
277                         continue;
278                 }
279                 if(ptask->exec!=NULL) {
280                         LM_DBG("task executed [%p] (%p/%p)\n", (void*)ptask,
281                                         (void*)ptask->exec, (void*)ptask->param);
282                         ptask->exec(ptask->param);
283                 }
284                 shm_free(ptask);
285         }
286
287         return 0;
288 }