Merge pull request #2013 from surendratiwari3/no_evapi_client_send_failed
[sip-router] / src / modules / evapi / evapi_dispatch.c
1 /**
2  * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <fcntl.h>
33
34 #include <ev.h>
35
36 #include "../../core/sr_module.h"
37 #include "../../core/dprint.h"
38 #include "../../core/ut.h"
39 #include "../../core/cfg/cfg_struct.h"
40 #include "../../core/kemi.h"
41 #include "../../core/fmsg.h"
42
43 #include "evapi_dispatch.h"
44
45 static int _evapi_notify_sockets[2];
46 static int _evapi_netstring_format = 1;
47
48 extern str _evapi_event_callback;
49 extern int _evapi_dispatcher_pid;
50 extern int _evapi_max_clients;
51
52 #define EVAPI_IPADDR_SIZE       64
53 #define EVAPI_TAG_SIZE  64
54 #define CLIENT_BUFFER_SIZE      32768
55 typedef struct _evapi_client {
56         int connected;
57         int sock;
58         unsigned short af;
59         unsigned short src_port;
60         char src_addr[EVAPI_IPADDR_SIZE];
61         char tag[EVAPI_IPADDR_SIZE];
62         str  stag;
63         char rbuffer[CLIENT_BUFFER_SIZE];
64         unsigned int rpos;
65 } evapi_client_t;
66
67 typedef struct _evapi_env {
68         int eset;
69         int conidx;
70         str msg;
71 } evapi_env_t;
72
73 typedef struct _evapi_msg {
74         str data;
75         str tag;
76         int unicast;
77 } evapi_msg_t;
78
79 #define EVAPI_MAX_CLIENTS       _evapi_max_clients
80
81 /* last one used for error handling, not a real connected client */
82 static evapi_client_t *_evapi_clients = NULL;
83
84 typedef struct _evapi_evroutes {
85         int con_new;
86         str con_new_name;
87         int con_closed;
88         str con_closed_name;
89         int msg_received;
90         str msg_received_name;
91 } evapi_evroutes_t;
92
93 static evapi_evroutes_t _evapi_rts;
94
95 /**
96  *
97  */
98 void evapi_env_reset(evapi_env_t *evenv)
99 {
100         if(evenv==0)
101                 return;
102         memset(evenv, 0, sizeof(evapi_env_t));
103         evenv->conidx = -1;
104 }
105
106 /**
107  *
108  */
109 void evapi_init_environment(int dformat)
110 {
111         memset(&_evapi_rts, 0, sizeof(evapi_evroutes_t));
112
113         _evapi_rts.con_new_name.s = "evapi:connection-new";
114         _evapi_rts.con_new_name.len = strlen(_evapi_rts.con_new_name.s);
115         _evapi_rts.con_new = route_lookup(&event_rt, "evapi:connection-new");
116         if (_evapi_rts.con_new < 0 || event_rt.rlist[_evapi_rts.con_new] == NULL)
117                 _evapi_rts.con_new = -1;
118
119         _evapi_rts.con_closed_name.s = "evapi:connection-closed";
120         _evapi_rts.con_closed_name.len = strlen(_evapi_rts.con_closed_name.s);
121         _evapi_rts.con_closed = route_lookup(&event_rt, "evapi:connection-closed");
122         if (_evapi_rts.con_closed < 0 || event_rt.rlist[_evapi_rts.con_closed] == NULL)
123                 _evapi_rts.con_closed = -1;
124
125         _evapi_rts.msg_received_name.s = "evapi:message-received";
126         _evapi_rts.msg_received_name.len = strlen(_evapi_rts.msg_received_name.s);
127         _evapi_rts.msg_received = route_lookup(&event_rt, "evapi:message-received");
128         if (_evapi_rts.msg_received < 0 || event_rt.rlist[_evapi_rts.msg_received] == NULL)
129                 _evapi_rts.msg_received = -1;
130
131         _evapi_netstring_format = dformat;
132 }
133
134 /**
135  *
136  */
137 int evapi_run_cfg_route(evapi_env_t *evenv, int rt, str *rtname)
138 {
139         int backup_rt;
140         struct run_act_ctx ctx;
141         sip_msg_t *fmsg;
142         sip_msg_t tmsg;
143         sr_kemi_eng_t *keng = NULL;
144
145         if(evenv==0 || evenv->eset==0) {
146                 LM_ERR("evapi env not set\n");
147                 return -1;
148         }
149
150         if((rt<0) && (_evapi_event_callback.s==NULL || _evapi_event_callback.len<=0))
151                 return 0;
152
153         fmsg = faked_msg_next();
154         memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
155         fmsg = &tmsg;
156         evapi_set_msg_env(fmsg, evenv);
157         backup_rt = get_route_type();
158         set_route_type(EVENT_ROUTE);
159         init_run_actions_ctx(&ctx);
160         if(rt>=0) {
161                 run_top_route(event_rt.rlist[rt], fmsg, 0);
162         } else {
163                 keng = sr_kemi_eng_get();
164                 if(keng!=NULL) {
165                         if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
166                                                 &_evapi_event_callback, rtname)<0) {
167                                 LM_ERR("error running event route kemi callback\n");
168                         }
169                 }
170         }
171         set_route_type(backup_rt);
172         evapi_set_msg_env(fmsg, NULL);
173         return 0;
174 }
175
176 /**
177  *
178  */
179 int evapi_close_connection(int cidx)
180 {
181         if(cidx<0 || cidx>=EVAPI_MAX_CLIENTS || _evapi_clients==NULL)
182                 return -1;
183         if(_evapi_clients[cidx].connected==1
184                         && _evapi_clients[cidx].sock >= 0) {
185                 close(_evapi_clients[cidx].sock);
186                 _evapi_clients[cidx].connected = 0;
187                 _evapi_clients[cidx].sock = -1;
188                 return 0;
189         }
190         return -2;
191 }
192
193 /**
194  *
195  */
196 int evapi_cfg_close(sip_msg_t *msg)
197 {
198         evapi_env_t *evenv;
199
200         if(msg==NULL)
201                 return -1;
202
203         evenv = evapi_get_msg_env(msg);
204
205         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
206                 return -1;
207         return evapi_close_connection(evenv->conidx);
208 }
209
210 /**
211  *
212  */
213 int evapi_set_tag(sip_msg_t* msg, str* stag)
214 {
215         evapi_env_t *evenv;
216
217         if(msg==NULL || stag==NULL || _evapi_clients==NULL)
218                 return -1;
219
220         evenv = evapi_get_msg_env(msg);
221
222         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
223                 return -1;
224
225         if(!(_evapi_clients[evenv->conidx].connected==1
226                         && _evapi_clients[evenv->conidx].sock >= 0)) {
227                 LM_ERR("connection not established\n");
228                 return -1;
229         }
230
231         if(stag->len>=EVAPI_TAG_SIZE) {
232                 LM_ERR("tag size too big: %d / %d\n", stag->len, EVAPI_TAG_SIZE);
233                 return -1;
234         }
235         _evapi_clients[evenv->conidx].stag.s = _evapi_clients[evenv->conidx].tag;
236         strncpy(_evapi_clients[evenv->conidx].stag.s, stag->s, stag->len);
237         _evapi_clients[evenv->conidx].stag.s[stag->len] = '\0';
238         _evapi_clients[evenv->conidx].stag.len = stag->len;
239         return 1;
240 }
241
242 /**
243  *
244  */
245 int evapi_init_notify_sockets(void)
246 {
247         if (socketpair(PF_UNIX, SOCK_STREAM, 0, _evapi_notify_sockets) < 0) {
248                 LM_ERR("opening notify stream socket pair\n");
249                 return -1;
250         }
251         LM_DBG("inter-process event notification sockets initialized: %d ~ %d\n",
252                         _evapi_notify_sockets[0], _evapi_notify_sockets[1]);
253         return 0;
254 }
255
256 /**
257  *
258  */
259 void evapi_close_notify_sockets_child(void)
260 {
261         LM_DBG("closing the notification socket used by children\n");
262         close(_evapi_notify_sockets[1]);
263         _evapi_notify_sockets[1] = -1;
264 }
265
266 /**
267  *
268  */
269 void evapi_close_notify_sockets_parent(void)
270 {
271         LM_DBG("closing the notification socket used by parent\n");
272         close(_evapi_notify_sockets[0]);
273         _evapi_notify_sockets[0] = -1;
274 }
275
276 /**
277  *
278  */
279 int evapi_dispatch_notify(evapi_msg_t *emsg)
280 {
281         int i;
282         int n;
283         int wlen;
284
285         if(_evapi_clients==NULL) {
286                 return 0;
287         }
288
289         n = 0;
290         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
291                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock>=0) {
292                         if(emsg->tag.s==NULL || (emsg->tag.len == _evapi_clients[i].stag.len
293                                                 && strncmp(_evapi_clients[i].stag.s,
294                                                                         emsg->tag.s, emsg->tag.len)==0)) {
295                                 wlen = write(_evapi_clients[i].sock, emsg->data.s,
296                                                 emsg->data.len);
297                                 if(wlen!=emsg->data.len) {
298                                         LM_DBG("failed to write all packet (%d out of %d) on socket"
299                                                         " %d index [%d]\n",
300                                                         wlen, emsg->data.len, _evapi_clients[i].sock, i);
301                                 }
302                                 n++;
303                                 if (emsg->unicast){
304                                         break;
305                                 }
306                         }
307                 }
308         }
309
310         LM_DBG("the message was sent to %d clients\n", n);
311
312         return n;
313 }
314
315 /**
316  *
317  */
318 void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
319 {
320         ssize_t rlen;
321         int i, k;
322         evapi_env_t evenv;
323         str frame;
324         char *sfp;
325         char *efp;
326
327         if(EV_ERROR & revents) {
328                 LM_ERR("received invalid event (%d)\n", revents);
329                 return;
330         }
331         if(_evapi_clients==NULL) {
332                 LM_ERR("no client structures\n");
333                 return;
334         }
335
336         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
337                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
338                         break;
339                 }
340         }
341         if(i==EVAPI_MAX_CLIENTS) {
342                 LM_ERR("cannot lookup client socket %d\n", watcher->fd);
343                 /* try to empty the socket anyhow */
344                 rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
345                 return;
346         }
347
348         /* read message from client */
349         rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
350                         CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);
351
352         if(rlen < 0) {
353                 LM_ERR("cannot read the client message\n");
354                 _evapi_clients[i].rpos = 0;
355                 return;
356         }
357
358
359         cfg_update();
360
361         evapi_env_reset(&evenv);
362         if(rlen == 0) {
363                 /* client is gone */
364                 evenv.eset = 1;
365                 evenv.conidx = i;
366                 evapi_run_cfg_route(&evenv, _evapi_rts.con_closed,
367                                 &_evapi_rts.con_closed_name);
368                 _evapi_clients[i].connected = 0;
369                 if(_evapi_clients[i].sock>=0) {
370                         close(_evapi_clients[i].sock);
371                 }
372                 _evapi_clients[i].sock = -1;
373                 _evapi_clients[i].rpos = 0;
374                 ev_io_stop(loop, watcher);
375                 free(watcher);
376                 LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
377                                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
378                 return;
379         }
380
381         _evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';
382
383         LM_DBG("{%d} [%s:%d] - received [%.*s] (%d) (%d)\n",
384                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
385                 (int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos,
386                 (int)rlen, (int)_evapi_clients[i].rpos);
387         evenv.conidx = i;
388         evenv.eset = 1;
389         if(_evapi_netstring_format) {
390                 /* netstring decapsulation */
391                 k = 0;
392                 while(k<_evapi_clients[i].rpos+rlen) {
393                         frame.len = 0;
394                         while(k<_evapi_clients[i].rpos+rlen) {
395                                 if(_evapi_clients[i].rbuffer[k]==' '
396                                                 || _evapi_clients[i].rbuffer[k]=='\t'
397                                                 || _evapi_clients[i].rbuffer[k]=='\r'
398                                                 || _evapi_clients[i].rbuffer[k]=='\n')
399                                         k++;
400                                 else break;
401                         }
402                         if(k==_evapi_clients[i].rpos+rlen) {
403                                 _evapi_clients[i].rpos = 0;
404                                 LM_DBG("empty content\n");
405                                 return;
406                         }
407                         /* pointer to start of whole frame */
408                         sfp = _evapi_clients[i].rbuffer + k;
409                         while(k<_evapi_clients[i].rpos+rlen) {
410                                 if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
411                                         frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
412                                 } else {
413                                         if(_evapi_clients[i].rbuffer[k]==':')
414                                                 break;
415                                         /* invalid character - discard the rest */
416                                         _evapi_clients[i].rpos = 0;
417                                         LM_DBG("invalid char when searching for size [%c] [%.*s] (%d) (%d)\n",
418                                                         _evapi_clients[i].rbuffer[k],
419                                                         (int)(_evapi_clients[i].rpos+rlen), _evapi_clients[i].rbuffer,
420                                                         (int)(_evapi_clients[i].rpos+rlen), k);
421                                         return;
422                                 }
423                                 k++;
424                         }
425                         if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
426                                 LM_DBG("invalid frame len: %d kpos: %d rpos: %u rlen: %lu\n",
427                                                 frame.len, k, _evapi_clients[i].rpos, rlen);
428                                 _evapi_clients[i].rpos = 0;
429                                 return;
430                         }
431                         if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
432                                 /* partial data - shift back in buffer and wait to read more */
433                                 efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
434                                 if(efp<=sfp) {
435                                         _evapi_clients[i].rpos = 0;
436                                         LM_DBG("weird - invalid size for residual data\n");
437                                         return;
438                                 }
439                                 _evapi_clients[i].rpos = (unsigned int)(efp-sfp);
440                                 if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
441                                         memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
442                                 } else {
443                                         for(k=0; k<_evapi_clients[i].rpos; k++) {
444                                                 _evapi_clients[i].rbuffer[k] = sfp[k];
445                                         }
446                                 }
447                                 LM_DBG("residual data [%.*s] (%d)\n",
448                                                 _evapi_clients[i].rpos, _evapi_clients[i].rbuffer,
449                                                 _evapi_clients[i].rpos);
450                                 return;
451                         }
452                         k++;
453                         frame.s = _evapi_clients[i].rbuffer + k;
454                         if(frame.s[frame.len]!=',') {
455                                 /* invalid data - discard and reset buffer */
456                                 LM_DBG("frame size mismatch the ending char (%c): [%.*s] (%d)\n",
457                                                 frame.s[frame.len], frame.len, frame.s, frame.len);
458                                 _evapi_clients[i].rpos = 0 ;
459                                 return;
460                         }
461                         frame.s[frame.len] = '\0';
462                         k += frame.len ;
463                         evenv.msg.s = frame.s;
464                         evenv.msg.len = frame.len;
465                         LM_DBG("executing event route for frame: [%.*s] (%d)\n",
466                                                 frame.len, frame.s, frame.len);
467                         evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
468                                         &_evapi_rts.msg_received_name);
469                         k++;
470                 }
471                 _evapi_clients[i].rpos = 0 ;
472         } else {
473                 evenv.msg.s = _evapi_clients[i].rbuffer;
474                 evenv.msg.len = rlen;
475                 evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
476                                 &_evapi_rts.msg_received_name);
477         }
478 }
479
480 /**
481  *
482  */
483 void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
484 {
485         struct sockaddr caddr;
486         socklen_t clen = sizeof(caddr);
487         int csock;
488         struct ev_io *evapi_client;
489         int i;
490         evapi_env_t evenv;
491         int optval;
492         socklen_t optlen;
493
494         if(_evapi_clients==NULL) {
495                 LM_ERR("no client structures\n");
496                 return;
497         }
498         evapi_client = (struct ev_io*) malloc (sizeof(struct ev_io));
499         if(evapi_client==NULL) {
500                 LM_ERR("no more memory\n");
501                 return;
502         }
503
504         if(EV_ERROR & revents) {
505                 LM_ERR("received invalid event\n");
506                 free(evapi_client);
507                 return;
508         }
509
510         cfg_update();
511
512         /* accept new client connection */
513         csock = accept(watcher->fd, (struct sockaddr *)&caddr, &clen);
514
515         if (csock < 0) {
516                 LM_ERR("cannot accept the client '%s' err='%d'\n", gai_strerror(csock), csock);
517                 free(evapi_client);
518                 return;
519         }
520         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
521                 if(_evapi_clients[i].connected==0) {
522                         if (caddr.sa_family == AF_INET) {
523                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in*)&caddr)->sin_port);
524                                 if(inet_ntop(AF_INET, &((struct sockaddr_in*)&caddr)->sin_addr,
525                                                         _evapi_clients[i].src_addr,
526                                                         EVAPI_IPADDR_SIZE)==NULL) {
527                                         LM_ERR("cannot convert ipv4 address\n");
528                                         close(csock);
529                                         free(evapi_client);
530                                         return;
531                                 }
532                         } else {
533                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in6*)&caddr)->sin6_port);
534                                 if(inet_ntop(AF_INET6, &((struct sockaddr_in6*)&caddr)->sin6_addr,
535                                                         _evapi_clients[i].src_addr,
536                                                         EVAPI_IPADDR_SIZE)==NULL) {
537                                         LM_ERR("cannot convert ipv6 address\n");
538                                         close(csock);
539                                         free(evapi_client);
540                                         return;
541                                 }
542                         }
543                         optval = 1;
544                         optlen = sizeof(optval);
545                         if(setsockopt(csock, SOL_SOCKET, SO_KEEPALIVE,
546                                                 &optval, optlen) < 0) {
547                                 LM_WARN("failed to enable keepalive on socket %d\n", csock);
548                         }
549                         _evapi_clients[i].connected = 1;
550                         _evapi_clients[i].sock = csock;
551                         _evapi_clients[i].af = caddr.sa_family;
552                         break;
553                 }
554         }
555         if(i>=EVAPI_MAX_CLIENTS) {
556                 LM_ERR("too many clients\n");
557                 close(csock);
558                 free(evapi_client);
559                 return;
560         }
561
562         LM_DBG("new connection - pos[%d] from: [%s:%d]\n", i,
563                         _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
564
565         evapi_env_reset(&evenv);
566         evenv.conidx = i;
567         evenv.eset = 1;
568         evapi_run_cfg_route(&evenv, _evapi_rts.con_new, &_evapi_rts.con_new_name);
569
570         if(_evapi_clients[i].connected == 0) {
571                 free(evapi_client);
572                 return;
573         }
574
575         /* start watcher to read messages from whatchers */
576         ev_io_init(evapi_client, evapi_recv_client, csock, EV_READ);
577         ev_io_start(loop, evapi_client);
578 }
579
580 /**
581  *
582  */
583 void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
584 {
585         evapi_msg_t *emsg = NULL;
586         int rlen;
587
588         if(EV_ERROR & revents) {
589                 perror("received invalid event\n");
590                 return;
591         }
592
593         cfg_update();
594
595         /* read message from client */
596         rlen = read(watcher->fd, &emsg, sizeof(evapi_msg_t*));
597
598         if(rlen != sizeof(evapi_msg_t*) || emsg==NULL) {
599                 LM_ERR("cannot read the sip worker message\n");
600                 return;
601         }
602
603         LM_DBG("received [%p] [%.*s] (%d)\n", emsg,
604                         emsg->data.len, emsg->data.s, emsg->data.len);
605         evapi_dispatch_notify(emsg);
606         shm_free(emsg);
607 }
608
609 /**
610  *
611  */
612 int evapi_run_dispatcher(char *laddr, int lport)
613 {
614         int evapi_srv_sock;
615         struct sockaddr_in evapi_srv_addr;
616         struct ev_loop *loop;
617         struct hostent *h = NULL;
618         struct ev_io io_server;
619         struct ev_io io_notify;
620         int yes_true = 1;
621         int fflags = 0;
622         int i;
623
624         LM_DBG("starting dispatcher processing\n");
625
626         _evapi_clients = (evapi_client_t*)malloc(sizeof(evapi_client_t)
627                         * (EVAPI_MAX_CLIENTS+1));
628         if(_evapi_clients==NULL) {
629                 LM_ERR("failed to allocate client structures\n");
630                 exit(-1);
631         }
632         memset(_evapi_clients, 0, sizeof(evapi_client_t) * EVAPI_MAX_CLIENTS);
633         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
634                 _evapi_clients[i].sock = -1;
635         }
636         loop = ev_default_loop(0);
637
638         if(loop==NULL) {
639                 LM_ERR("cannot get libev loop\n");
640                 return -1;
641         }
642
643         h = gethostbyname(laddr);
644         if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
645                 LM_ERR("cannot resolve local server address [%s]\n", laddr);
646                 return -1;
647         }
648         if(h->h_addrtype == AF_INET) {
649                 evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
650         } else {
651                 evapi_srv_sock = socket(PF_INET6, SOCK_STREAM, 0);
652         }
653         if( evapi_srv_sock < 0 )
654         {
655                 LM_ERR("cannot create server socket (family %d)\n", h->h_addrtype);
656                 return -1;
657         }
658         /* set non-blocking flag */
659         fflags = fcntl(evapi_srv_sock, F_GETFL);
660         if(fflags<0) {
661                 LM_ERR("failed to get the srv socket flags\n");
662                 close(evapi_srv_sock);
663                 return -1;
664         }
665         if (fcntl(evapi_srv_sock, F_SETFL, fflags | O_NONBLOCK)<0) {
666                 LM_ERR("failed to set srv socket flags\n");
667                 close(evapi_srv_sock);
668                 return -1;
669         }
670
671         bzero(&evapi_srv_addr, sizeof(evapi_srv_addr));
672         evapi_srv_addr.sin_family = h->h_addrtype;
673         evapi_srv_addr.sin_port   = htons((short)lport);
674         evapi_srv_addr.sin_addr  = *(struct in_addr*)h->h_addr;
675
676         /* Set SO_REUSEADDR option on listening socket so that we don't
677          * have to wait for connections in TIME_WAIT to go away before
678          * re-binding.
679          */
680
681         if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR,
682                 &yes_true, sizeof(int)) < 0) {
683                 LM_ERR("cannot set SO_REUSEADDR option on descriptor\n");
684                 close(evapi_srv_sock);
685                 return -1;
686         }
687
688         if (bind(evapi_srv_sock, (struct sockaddr*)&evapi_srv_addr,
689                                 sizeof(evapi_srv_addr)) < 0) {
690                 LM_ERR("cannot bind to local address and port [%s:%d]\n", laddr, lport);
691                 close(evapi_srv_sock);
692                 return -1;
693         }
694         if (listen(evapi_srv_sock, 4) < 0) {
695                 LM_ERR("listen error\n");
696                 close(evapi_srv_sock);
697                 return -1;
698         }
699         ev_io_init(&io_server, evapi_accept_client, evapi_srv_sock, EV_READ);
700         ev_io_start(loop, &io_server);
701         ev_io_init(&io_notify, evapi_recv_notify, _evapi_notify_sockets[0], EV_READ);
702         ev_io_start(loop, &io_notify);
703
704         while(1) {
705                 ev_loop (loop, 0);
706         }
707
708         return 0;
709 }
710
711 /**
712  *
713  */
714 int evapi_run_worker(int prank)
715 {
716         LM_DBG("started worker process: %d\n", prank);
717         while(1) {
718                 sleep(3);
719         }
720 }
721
722 /**
723  *
724  */
725 int _evapi_relay(str *evdata, str *ctag, int unicast)
726 {
727 #define EVAPI_RELAY_FORMAT "%d:%.*s,"
728
729         int len;
730         int sbsize;
731         int evapi_send_count;
732         evapi_msg_t *emsg;
733
734         LM_DBG("relaying event data [%.*s] (%d)\n",
735                         evdata->len, evdata->s, evdata->len);
736
737         sbsize = evdata->len;
738         len = sizeof(evapi_msg_t)
739                 + ((sbsize + 32 + ((ctag && ctag->len>0)?(ctag->len+2):0)) * sizeof(char));
740         emsg = (evapi_msg_t*)shm_malloc(len);
741         if(emsg==NULL) {
742                 LM_ERR("no more shared memory\n");
743                 return -1;
744         }
745         memset(emsg, 0, len);
746         emsg->data.s = (char*)emsg + sizeof(evapi_msg_t);
747         if(_evapi_netstring_format) {
748                 /* netstring encapsulation */
749                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
750                                 EVAPI_RELAY_FORMAT,
751                                 sbsize, evdata->len, evdata->s);
752         } else {
753                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
754                                 "%.*s",
755                                 evdata->len, evdata->s);
756         }
757         if(emsg->data.len<=0 || emsg->data.len>sbsize+32) {
758                 shm_free(emsg);
759                 LM_ERR("cannot serialize event\n");
760                 return -1;
761         }
762         if(ctag && ctag->len>0) {
763                 emsg->tag.s = emsg->data.s + sbsize + 32;
764                 strncpy(emsg->tag.s, ctag->s, ctag->len);
765                 emsg->tag.len = ctag->len;
766         }
767
768         if (unicast){
769                 emsg->unicast = unicast;
770         }
771
772         LM_DBG("sending [%p] [%.*s] (%d)\n", emsg, emsg->data.len, emsg->data.s,
773                         emsg->data.len);
774         if(_evapi_notify_sockets[1]!=-1) {
775                 len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*));
776                 if(len<=0) {
777                         shm_free(emsg);
778                         LM_ERR("failed to pass the pointer to evapi dispatcher\n");
779                         return -1;
780                 }
781         } else {
782                 cfg_update();
783                 LM_DBG("dispatching [%p] [%.*s] (%d)\n", emsg,
784                                 emsg->data.len, emsg->data.s, emsg->data.len);
785                 evapi_send_count = evapi_dispatch_notify(emsg);
786                 if (evapi_send_count == 0) {
787                         shm_free(emsg); 
788                         LM_ERR("no evapi client to send the message, failed to send message\n");
789                         return -1; 
790                 }
791                 shm_free(emsg);
792         }
793         return 0;
794 }
795
796 /**
797  *
798  */
799 int evapi_relay(str *evdata)
800 {
801         return _evapi_relay(evdata, NULL, 0);
802 }
803
804 /**
805  *
806  */
807 int evapi_relay_multicast(str *evdata, str *ctag){
808         return _evapi_relay(evdata, ctag, 0);
809 }
810
811 /**
812  *
813  */
814 int evapi_relay_unicast(str *evdata, str *ctag){
815         return _evapi_relay(evdata, ctag, 1);
816 }
817
818 #if 0
819 /**
820  *
821  */
822 int evapi_relay(str *event, str *data)
823 {
824 #define EVAPI_RELAY_FORMAT "%d:{\n \"event\":\"%.*s\",\n \"data\":%.*s\n},"
825
826         int len;
827         int sbsize;
828         str *sbuf;
829
830         LM_DBG("relaying event [%.*s] data [%.*s]\n",
831                         event->len, event->s, data->len, data->s);
832
833         sbsize = sizeof(EVAPI_RELAY_FORMAT) + event->len + data->len - 13;
834         sbuf = (str*)shm_malloc(sizeof(str) + ((sbsize+32) * sizeof(char)));
835         if(sbuf==NULL) {
836                 LM_ERR("no more shared memory\n");
837                 return -1;
838         }
839         sbuf->s = (char*)sbuf + sizeof(str);
840         sbuf->len = snprintf(sbuf->s, sbsize+32,
841                         EVAPI_RELAY_FORMAT,
842                         sbsize, event->len, event->s, data->len, data->s);
843         if(sbuf->len<=0 || sbuf->len>sbsize+32) {
844                 shm_free(sbuf);
845                 LM_ERR("cannot serialize event\n");
846                 return -1;
847         }
848
849         len = write(_evapi_notify_sockets[1], &sbuf, sizeof(str*));
850         if(len<=0) {
851                 LM_ERR("failed to pass the pointer to evapi dispatcher\n");
852                 return -1;
853         }
854         LM_DBG("sent [%p] [%.*s] (%d)\n", sbuf, sbuf->len, sbuf->s, sbuf->len);
855         return 0;
856 }
857 #endif
858
859 /**
860  *
861  */
862 int pv_parse_evapi_name(pv_spec_t *sp, str *in)
863 {
864         if(sp==NULL || in==NULL || in->len<=0)
865                 return -1;
866
867         switch(in->len)
868         {
869                 case 3:
870                         if(strncmp(in->s, "msg", 3)==0)
871                                 sp->pvp.pvn.u.isname.name.n = 1;
872                         else goto error;
873                 break;
874                 case 6:
875                         if(strncmp(in->s, "conidx", 6)==0)
876                                 sp->pvp.pvn.u.isname.name.n = 0;
877                         else goto error;
878                 break;
879                 case 7:
880                         if(strncmp(in->s, "srcaddr", 7)==0)
881                                 sp->pvp.pvn.u.isname.name.n = 2;
882                         else if(strncmp(in->s, "srcport", 7)==0)
883                                 sp->pvp.pvn.u.isname.name.n = 3;
884                         else goto error;
885                 break;
886                 default:
887                         goto error;
888         }
889         sp->pvp.pvn.type = PV_NAME_INTSTR;
890         sp->pvp.pvn.u.isname.type = 0;
891
892         return 0;
893
894 error:
895         LM_ERR("unknown PV msrp name %.*s\n", in->len, in->s);
896         return -1;
897 }
898
899 /**
900  *
901  */
902 int pv_get_evapi(sip_msg_t *msg, pv_param_t *param, pv_value_t *res)
903 {
904         evapi_env_t *evenv;
905
906         if(param==NULL || res==NULL)
907                 return -1;
908
909         if(_evapi_clients==NULL) {
910                 return pv_get_null(msg, param, res);
911         }
912         evenv = evapi_get_msg_env(msg);
913
914         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
915                 return pv_get_null(msg, param, res);
916
917         if(_evapi_clients[evenv->conidx].connected==0
918                         && _evapi_clients[evenv->conidx].sock < 0)
919                 return pv_get_null(msg, param, res);
920
921         switch(param->pvn.u.isname.name.n)
922         {
923                 case 0:
924                         return pv_get_sintval(msg, param, res, evenv->conidx);
925                 case 1:
926                         if(evenv->msg.s==NULL)
927                                 return pv_get_null(msg, param, res);
928                         return pv_get_strval(msg, param, res, &evenv->msg);
929                 case 2:
930                         return pv_get_strzval(msg, param, res,
931                                         _evapi_clients[evenv->conidx].src_addr);
932                 case 3:
933                         return pv_get_sintval(msg, param, res,
934                                         _evapi_clients[evenv->conidx].src_port);
935                 default:
936                         return pv_get_null(msg, param, res);
937         }
938
939         return 0;
940 }
941
942 /**
943  *
944  */
945 int pv_set_evapi(sip_msg_t *msg, pv_param_t *param, int op,
946                 pv_value_t *val)
947 {
948         return 0;
949 }