ec56945b5e7fca6c96fbf296f43e18b6e2afc764
[sip-router] / src / modules / evapi / evapi_dispatch.c
1 /**
2  * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20  *
21  */
22
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>

#include <ev.h>

#include "../../core/sr_module.h"
#include "../../core/dprint.h"
#include "../../core/ut.h"
#include "../../core/cfg/cfg_struct.h"
#include "../../core/kemi.h"
#include "../../core/fmsg.h"

#include "evapi_dispatch.h"
44
/* socket pair used to hand event messages over to the dispatcher:
 * _evapi_relay() writes message pointers on [1], the dispatcher event loop
 * watches [0] for reading */
static int _evapi_notify_sockets[2];
/* non-zero when the data exchanged with the clients is framed as netstrings */
static int _evapi_netstring_format = 1;

/* variables defined outside of this file */
extern str _evapi_event_callback;
extern int _evapi_dispatcher_pid;
extern int _evapi_max_clients;
51
52 #define EVAPI_IPADDR_SIZE       64
53 #define EVAPI_TAG_SIZE  64
54 #define CLIENT_BUFFER_SIZE      32768
55 typedef struct _evapi_client {
56         int connected;
57         int sock;
58         unsigned short af;
59         unsigned short src_port;
60         char src_addr[EVAPI_IPADDR_SIZE];
61         char tag[EVAPI_IPADDR_SIZE];
62         str  stag;
63         char rbuffer[CLIENT_BUFFER_SIZE];
64         unsigned int rpos;
65 } evapi_client_t;
66
/* environment attached to the faked SIP message while an event route runs */
typedef struct _evapi_env {
        /* non-zero when the environment is filled in; evapi_run_cfg_route()
         * refuses to run when it is 0 */
        int eset;
        /* index of the connection in the clients array, -1 when not set */
        int conidx;
        /* data received from the client (one frame in netstring mode) */
        str msg;
} evapi_env_t;
72
/* message passed to the dispatcher in order to be written to the clients */
typedef struct _evapi_msg {
        /* serialized data, written as it is on the client sockets */
        str data;
        /* when tag.s is not NULL, only the clients with the same tag
         * receive the message */
        str tag;
        /* non-zero: stop after the first client the message was written to */
        int unicast;
} evapi_msg_t;
78
#define EVAPI_MAX_CLIENTS       _evapi_max_clients

/* array of client slots, allocated by evapi_run_dispatcher() with
 * EVAPI_MAX_CLIENTS + 1 entries: the last one is used for error handling
 * (its read buffer drains sockets that match no slot), it is not a real
 * connected client */
static evapi_client_t *_evapi_clients = NULL;
83
/* event routes used by the module: for each of them the index in event_rt
 * (-1 when the route is not defined) and its name */
typedef struct _evapi_evroutes {
        /* evapi:connection-new */
        int con_new;
        str con_new_name;
        /* evapi:connection-closed */
        int con_closed;
        str con_closed_name;
        /* evapi:message-received */
        int msg_received;
        str msg_received_name;
} evapi_evroutes_t;

/* filled in by evapi_init_environment() */
static evapi_evroutes_t _evapi_rts;
94
95 /**
96  *
97  */
98 void evapi_env_reset(evapi_env_t *evenv)
99 {
100         if(evenv==0)
101                 return;
102         memset(evenv, 0, sizeof(evapi_env_t));
103         evenv->conidx = -1;
104 }
105
106 /**
107  *
108  */
109 void evapi_init_environment(int dformat)
110 {
111         memset(&_evapi_rts, 0, sizeof(evapi_evroutes_t));
112
113         _evapi_rts.con_new_name.s = "evapi:connection-new";
114         _evapi_rts.con_new_name.len = strlen(_evapi_rts.con_new_name.s);
115         _evapi_rts.con_new = route_lookup(&event_rt, "evapi:connection-new");
116         if (_evapi_rts.con_new < 0 || event_rt.rlist[_evapi_rts.con_new] == NULL)
117                 _evapi_rts.con_new = -1;
118
119         _evapi_rts.con_closed_name.s = "evapi:connection-closed";
120         _evapi_rts.con_closed_name.len = strlen(_evapi_rts.con_closed_name.s);
121         _evapi_rts.con_closed = route_lookup(&event_rt, "evapi:connection-closed");
122         if (_evapi_rts.con_closed < 0 || event_rt.rlist[_evapi_rts.con_closed] == NULL)
123                 _evapi_rts.con_closed = -1;
124
125         _evapi_rts.msg_received_name.s = "evapi:message-received";
126         _evapi_rts.msg_received_name.len = strlen(_evapi_rts.msg_received_name.s);
127         _evapi_rts.msg_received = route_lookup(&event_rt, "evapi:message-received");
128         if (_evapi_rts.msg_received < 0 || event_rt.rlist[_evapi_rts.msg_received] == NULL)
129                 _evapi_rts.msg_received = -1;
130
131         _evapi_netstring_format = dformat;
132 }
133
134 /**
135  *
136  */
137 int evapi_run_cfg_route(evapi_env_t *evenv, int rt, str *rtname)
138 {
139         int backup_rt;
140         struct run_act_ctx ctx;
141         sip_msg_t *fmsg;
142         sip_msg_t tmsg;
143         sr_kemi_eng_t *keng = NULL;
144
145         if(evenv==0 || evenv->eset==0) {
146                 LM_ERR("evapi env not set\n");
147                 return -1;
148         }
149
150         if((rt<0) && (_evapi_event_callback.s==NULL || _evapi_event_callback.len<=0))
151                 return 0;
152
153         fmsg = faked_msg_next();
154         memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
155         fmsg = &tmsg;
156         evapi_set_msg_env(fmsg, evenv);
157         backup_rt = get_route_type();
158         set_route_type(EVENT_ROUTE);
159         init_run_actions_ctx(&ctx);
160         if(rt>=0) {
161                 run_top_route(event_rt.rlist[rt], fmsg, 0);
162         } else {
163                 keng = sr_kemi_eng_get();
164                 if(keng!=NULL) {
165                         if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
166                                                 &_evapi_event_callback, rtname)<0) {
167                                 LM_ERR("error running event route kemi callback\n");
168                         }
169                 }
170         }
171         set_route_type(backup_rt);
172         evapi_set_msg_env(fmsg, NULL);
173         return 0;
174 }
175
176 /**
177  *
178  */
179 int evapi_close_connection(int cidx)
180 {
181         if(cidx<0 || cidx>=EVAPI_MAX_CLIENTS || _evapi_clients==NULL)
182                 return -1;
183         if(_evapi_clients[cidx].connected==1
184                         && _evapi_clients[cidx].sock >= 0) {
185                 close(_evapi_clients[cidx].sock);
186                 _evapi_clients[cidx].connected = 0;
187                 _evapi_clients[cidx].sock = -1;
188                 return 0;
189         }
190         return -2;
191 }
192
193 /**
194  *
195  */
196 int evapi_cfg_close(sip_msg_t *msg)
197 {
198         evapi_env_t *evenv;
199
200         if(msg==NULL)
201                 return -1;
202
203         evenv = evapi_get_msg_env(msg);
204
205         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
206                 return -1;
207         return evapi_close_connection(evenv->conidx);
208 }
209
210 /**
211  *
212  */
213 int evapi_set_tag(sip_msg_t* msg, str* stag)
214 {
215         evapi_env_t *evenv;
216
217         if(msg==NULL || stag==NULL || _evapi_clients==NULL)
218                 return -1;
219
220         evenv = evapi_get_msg_env(msg);
221
222         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
223                 return -1;
224
225         if(!(_evapi_clients[evenv->conidx].connected==1
226                         && _evapi_clients[evenv->conidx].sock >= 0)) {
227                 LM_ERR("connection not established\n");
228                 return -1;
229         }
230
231         if(stag->len>=EVAPI_TAG_SIZE) {
232                 LM_ERR("tag size too big: %d / %d\n", stag->len, EVAPI_TAG_SIZE);
233                 return -1;
234         }
235         _evapi_clients[evenv->conidx].stag.s = _evapi_clients[evenv->conidx].tag;
236         strncpy(_evapi_clients[evenv->conidx].stag.s, stag->s, stag->len);
237         _evapi_clients[evenv->conidx].stag.s[stag->len] = '\0';
238         _evapi_clients[evenv->conidx].stag.len = stag->len;
239         return 1;
240 }
241
242 /**
243  *
244  */
245 int evapi_init_notify_sockets(void)
246 {
247         if (socketpair(PF_UNIX, SOCK_STREAM, 0, _evapi_notify_sockets) < 0) {
248                 LM_ERR("opening notify stream socket pair\n");
249                 return -1;
250         }
251         LM_DBG("inter-process event notification sockets initialized: %d ~ %d\n",
252                         _evapi_notify_sockets[0], _evapi_notify_sockets[1]);
253         return 0;
254 }
255
256 /**
257  *
258  */
259 void evapi_close_notify_sockets_child(void)
260 {
261         LM_DBG("closing the notification socket used by children\n");
262         close(_evapi_notify_sockets[1]);
263         _evapi_notify_sockets[1] = -1;
264 }
265
266 /**
267  *
268  */
269 void evapi_close_notify_sockets_parent(void)
270 {
271         LM_DBG("closing the notification socket used by parent\n");
272         close(_evapi_notify_sockets[0]);
273         _evapi_notify_sockets[0] = -1;
274 }
275
276 /**
277  *
278  */
279 int evapi_dispatch_notify(evapi_msg_t *emsg)
280 {
281         int i;
282         int n;
283         int wlen;
284
285         if(_evapi_clients==NULL) {
286                 return 0;
287         }
288
289         n = 0;
290         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
291                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock>=0) {
292                         if(emsg->tag.s==NULL || (emsg->tag.len == _evapi_clients[i].stag.len
293                                                 && strncmp(_evapi_clients[i].stag.s,
294                                                                         emsg->tag.s, emsg->tag.len)==0)) {
295                                 wlen = write(_evapi_clients[i].sock, emsg->data.s,
296                                                 emsg->data.len);
297                                 if(wlen!=emsg->data.len) {
298                                         LM_DBG("failed to write all packet (%d out of %d) on socket"
299                                                         " %d index [%d]\n",
300                                                         wlen, emsg->data.len, _evapi_clients[i].sock, i);
301                                 }
302                                 n++;
303                                 if (emsg->unicast){
304                                         break;
305                                 }
306                         }
307                 }
308         }
309
310         LM_DBG("the message was sent to %d clients\n", n);
311
312         return n;
313 }
314
315 /**
316  *
317  */
318 void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
319 {
320         ssize_t rlen;
321         int i, k;
322         evapi_env_t evenv;
323         str frame;
324         char *sfp;
325         char *efp;
326
327         if(EV_ERROR & revents) {
328                 LM_ERR("received invalid event (%d)\n", revents);
329                 return;
330         }
331         if(_evapi_clients==NULL) {
332                 LM_ERR("no client structures\n");
333                 return;
334         }
335
336         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
337                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
338                         break;
339                 }
340         }
341         if(i==EVAPI_MAX_CLIENTS) {
342                 LM_ERR("cannot lookup client socket %d\n", watcher->fd);
343                 /* try to empty the socket anyhow */
344                 rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
345                 return;
346         }
347
348         /* read message from client */
349         rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
350                         CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);
351
352         if(rlen < 0) {
353                 LM_ERR("cannot read the client message\n");
354                 _evapi_clients[i].rpos = 0;
355                 return;
356         }
357
358
359         cfg_update();
360
361         evapi_env_reset(&evenv);
362         if(rlen == 0) {
363                 /* client is gone */
364                 evenv.eset = 1;
365                 evenv.conidx = i;
366                 evapi_run_cfg_route(&evenv, _evapi_rts.con_closed,
367                                 &_evapi_rts.con_closed_name);
368                 _evapi_clients[i].connected = 0;
369                 if(_evapi_clients[i].sock>=0) {
370                         close(_evapi_clients[i].sock);
371                 }
372                 _evapi_clients[i].sock = -1;
373                 _evapi_clients[i].rpos = 0;
374                 ev_io_stop(loop, watcher);
375                 free(watcher);
376                 LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
377                                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
378                 return;
379         }
380
381         _evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';
382
383         LM_DBG("{%d} [%s:%d] - received [%.*s] (%d) (%d)\n",
384                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
385                 (int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos,
386                 (int)rlen, (int)_evapi_clients[i].rpos);
387         evenv.conidx = i;
388         evenv.eset = 1;
389         if(_evapi_netstring_format) {
390                 /* netstring decapsulation */
391                 k = 0;
392                 while(k<_evapi_clients[i].rpos+rlen) {
393                         frame.len = 0;
394                         while(k<_evapi_clients[i].rpos+rlen) {
395                                 if(_evapi_clients[i].rbuffer[k]==' '
396                                                 || _evapi_clients[i].rbuffer[k]=='\t'
397                                                 || _evapi_clients[i].rbuffer[k]=='\r'
398                                                 || _evapi_clients[i].rbuffer[k]=='\n')
399                                         k++;
400                                 else break;
401                         }
402                         if(k==_evapi_clients[i].rpos+rlen) {
403                                 _evapi_clients[i].rpos = 0;
404                                 LM_DBG("empty content\n");
405                                 return;
406                         }
407                         /* pointer to start of whole frame */
408                         sfp = _evapi_clients[i].rbuffer + k;
409                         while(k<_evapi_clients[i].rpos+rlen) {
410                                 if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
411                                         frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
412                                 } else {
413                                         if(_evapi_clients[i].rbuffer[k]==':')
414                                                 break;
415                                         /* invalid character - discard the rest */
416                                         _evapi_clients[i].rpos = 0;
417                                         LM_DBG("invalid char when searching for size [%c] [%.*s] (%d) (%d)\n",
418                                                         _evapi_clients[i].rbuffer[k],
419                                                         (int)(_evapi_clients[i].rpos+rlen), _evapi_clients[i].rbuffer,
420                                                         (int)(_evapi_clients[i].rpos+rlen), k);
421                                         return;
422                                 }
423                                 k++;
424                         }
425                         if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
426                                 LM_DBG("invalid frame len: %d kpos: %d rpos: %u rlen: %lu\n",
427                                                 frame.len, k, _evapi_clients[i].rpos, rlen);
428                                 _evapi_clients[i].rpos = 0;
429                                 return;
430                         }
431                         if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
432                                 /* partial data - shift back in buffer and wait to read more */
433                                 efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
434                                 if(efp<=sfp) {
435                                         _evapi_clients[i].rpos = 0;
436                                         LM_DBG("weird - invalid size for residual data\n");
437                                         return;
438                                 }
439                                 _evapi_clients[i].rpos = (unsigned int)(efp-sfp);
440                                 if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
441                                         memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
442                                 } else {
443                                         for(k=0; k<_evapi_clients[i].rpos; k++) {
444                                                 _evapi_clients[i].rbuffer[k] = sfp[k];
445                                         }
446                                 }
447                                 LM_DBG("residual data [%.*s] (%d)\n",
448                                                 _evapi_clients[i].rpos, _evapi_clients[i].rbuffer,
449                                                 _evapi_clients[i].rpos);
450                                 return;
451                         }
452                         k++;
453                         frame.s = _evapi_clients[i].rbuffer + k;
454                         if(frame.s[frame.len]!=',') {
455                                 /* invalid data - discard and reset buffer */
456                                 LM_DBG("frame size mismatch the ending char (%c): [%.*s] (%d)\n",
457                                                 frame.s[frame.len], frame.len, frame.s, frame.len);
458                                 _evapi_clients[i].rpos = 0 ;
459                                 return;
460                         }
461                         frame.s[frame.len] = '\0';
462                         k += frame.len ;
463                         evenv.msg.s = frame.s;
464                         evenv.msg.len = frame.len;
465                         LM_DBG("executing event route for frame: [%.*s] (%d)\n",
466                                                 frame.len, frame.s, frame.len);
467                         evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
468                                         &_evapi_rts.msg_received_name);
469                         k++;
470                 }
471                 _evapi_clients[i].rpos = 0 ;
472         } else {
473                 evenv.msg.s = _evapi_clients[i].rbuffer;
474                 evenv.msg.len = rlen;
475                 evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
476                                 &_evapi_rts.msg_received_name);
477         }
478 }
479
480 /**
481  *
482  */
483 void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
484 {
485         struct sockaddr caddr;
486         socklen_t clen = sizeof(caddr);
487         int csock;
488         struct ev_io *evapi_client;
489         int i;
490         evapi_env_t evenv;
491         int optval;
492         socklen_t optlen;
493
494         if(_evapi_clients==NULL) {
495                 LM_ERR("no client structures\n");
496                 return;
497         }
498         evapi_client = (struct ev_io*) malloc (sizeof(struct ev_io));
499         if(evapi_client==NULL) {
500                 LM_ERR("no more memory\n");
501                 return;
502         }
503
504         if(EV_ERROR & revents) {
505                 LM_ERR("received invalid event\n");
506                 free(evapi_client);
507                 return;
508         }
509
510         cfg_update();
511
512         /* accept new client connection */
513         csock = accept(watcher->fd, (struct sockaddr *)&caddr, &clen);
514
515         if (csock < 0) {
516                 LM_ERR("cannot accept the client '%s' err='%d'\n", gai_strerror(csock), csock);
517                 free(evapi_client);
518                 return;
519         }
520         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
521                 if(_evapi_clients[i].connected==0) {
522                         if (caddr.sa_family == AF_INET) {
523                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in*)&caddr)->sin_port);
524                                 if(inet_ntop(AF_INET, &((struct sockaddr_in*)&caddr)->sin_addr,
525                                                         _evapi_clients[i].src_addr,
526                                                         EVAPI_IPADDR_SIZE)==NULL) {
527                                         LM_ERR("cannot convert ipv4 address\n");
528                                         close(csock);
529                                         free(evapi_client);
530                                         return;
531                                 }
532                         } else {
533                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in6*)&caddr)->sin6_port);
534                                 if(inet_ntop(AF_INET6, &((struct sockaddr_in6*)&caddr)->sin6_addr,
535                                                         _evapi_clients[i].src_addr,
536                                                         EVAPI_IPADDR_SIZE)==NULL) {
537                                         LM_ERR("cannot convert ipv6 address\n");
538                                         close(csock);
539                                         free(evapi_client);
540                                         return;
541                                 }
542                         }
543                         optval = 1;
544                         optlen = sizeof(optval);
545                         if(setsockopt(csock, SOL_SOCKET, SO_KEEPALIVE,
546                                                 &optval, optlen) < 0) {
547                                 LM_WARN("failed to enable keepalive on socket %d\n", csock);
548                         }
549                         _evapi_clients[i].connected = 1;
550                         _evapi_clients[i].sock = csock;
551                         _evapi_clients[i].af = caddr.sa_family;
552                         break;
553                 }
554         }
555         if(i>=EVAPI_MAX_CLIENTS) {
556                 LM_ERR("too many clients\n");
557                 close(csock);
558                 free(evapi_client);
559                 return;
560         }
561
562         LM_DBG("new connection - pos[%d] from: [%s:%d]\n", i,
563                         _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
564
565         evapi_env_reset(&evenv);
566         evenv.conidx = i;
567         evenv.eset = 1;
568         evapi_run_cfg_route(&evenv, _evapi_rts.con_new, &_evapi_rts.con_new_name);
569
570         if(_evapi_clients[i].connected == 0) {
571                 free(evapi_client);
572                 return;
573         }
574
575         /* start watcher to read messages from whatchers */
576         ev_io_init(evapi_client, evapi_recv_client, csock, EV_READ);
577         ev_io_start(loop, evapi_client);
578 }
579
580 /**
581  *
582  */
583 void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
584 {
585         evapi_msg_t *emsg = NULL;
586         int rlen;
587
588         if(EV_ERROR & revents) {
589                 perror("received invalid event\n");
590                 return;
591         }
592
593         cfg_update();
594
595         /* read message from client */
596         rlen = read(watcher->fd, &emsg, sizeof(evapi_msg_t*));
597
598         if(rlen != sizeof(evapi_msg_t*) || emsg==NULL) {
599                 LM_ERR("cannot read the sip worker message\n");
600                 return;
601         }
602
603         LM_DBG("received [%p] [%.*s] (%d)\n", emsg,
604                         emsg->data.len, emsg->data.s, emsg->data.len);
605         evapi_dispatch_notify(emsg);
606         shm_free(emsg);
607 }
608
609 /**
610  *
611  */
612 int evapi_run_dispatcher(char *laddr, int lport)
613 {
614         int evapi_srv_sock;
615         struct sockaddr_in evapi_srv_addr;
616         struct ev_loop *loop;
617         struct hostent *h = NULL;
618         struct ev_io io_server;
619         struct ev_io io_notify;
620         int yes_true = 1;
621         int fflags = 0;
622         int i;
623
624         LM_DBG("starting dispatcher processing\n");
625
626         _evapi_clients = (evapi_client_t*)malloc(sizeof(evapi_client_t)
627                         * (EVAPI_MAX_CLIENTS+1));
628         if(_evapi_clients==NULL) {
629                 LM_ERR("failed to allocate client structures\n");
630                 exit(-1);
631         }
632         memset(_evapi_clients, 0, sizeof(evapi_client_t) * EVAPI_MAX_CLIENTS);
633         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
634                 _evapi_clients[i].sock = -1;
635         }
636         loop = ev_default_loop(0);
637
638         if(loop==NULL) {
639                 LM_ERR("cannot get libev loop\n");
640                 return -1;
641         }
642
643         h = gethostbyname(laddr);
644         if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
645                 LM_ERR("cannot resolve local server address [%s]\n", laddr);
646                 return -1;
647         }
648         if(h->h_addrtype == AF_INET) {
649                 evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
650         } else {
651                 evapi_srv_sock = socket(PF_INET6, SOCK_STREAM, 0);
652         }
653         if( evapi_srv_sock < 0 )
654         {
655                 LM_ERR("cannot create server socket (family %d)\n", h->h_addrtype);
656                 return -1;
657         }
658         /* set non-blocking flag */
659         fflags = fcntl(evapi_srv_sock, F_GETFL);
660         if(fflags<0) {
661                 LM_ERR("failed to get the srv socket flags\n");
662                 close(evapi_srv_sock);
663                 return -1;
664         }
665         if (fcntl(evapi_srv_sock, F_SETFL, fflags | O_NONBLOCK)<0) {
666                 LM_ERR("failed to set srv socket flags\n");
667                 close(evapi_srv_sock);
668                 return -1;
669         }
670
671         bzero(&evapi_srv_addr, sizeof(evapi_srv_addr));
672         evapi_srv_addr.sin_family = h->h_addrtype;
673         evapi_srv_addr.sin_port   = htons((short)lport);
674         evapi_srv_addr.sin_addr  = *(struct in_addr*)h->h_addr;
675
676         /* Set SO_REUSEADDR option on listening socket so that we don't
677          * have to wait for connections in TIME_WAIT to go away before
678          * re-binding.
679          */
680
681         if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR,
682                 &yes_true, sizeof(int)) < 0) {
683                 LM_ERR("cannot set SO_REUSEADDR option on descriptor\n");
684                 close(evapi_srv_sock);
685                 return -1;
686         }
687
688         if (bind(evapi_srv_sock, (struct sockaddr*)&evapi_srv_addr,
689                                 sizeof(evapi_srv_addr)) < 0) {
690                 LM_ERR("cannot bind to local address and port [%s:%d]\n", laddr, lport);
691                 close(evapi_srv_sock);
692                 return -1;
693         }
694         if (listen(evapi_srv_sock, 4) < 0) {
695                 LM_ERR("listen error\n");
696                 close(evapi_srv_sock);
697                 return -1;
698         }
699         ev_io_init(&io_server, evapi_accept_client, evapi_srv_sock, EV_READ);
700         ev_io_start(loop, &io_server);
701         ev_io_init(&io_notify, evapi_recv_notify, _evapi_notify_sockets[0], EV_READ);
702         ev_io_start(loop, &io_notify);
703
704         while(1) {
705                 ev_loop (loop, 0);
706         }
707
708         return 0;
709 }
710
/**
 * Main function of a worker process: after logging the rank it sleeps
 * in an endless loop (the function does not return).
 */
int evapi_run_worker(int prank)
{
        LM_DBG("started worker process: %d\n", prank);
        for(;;) {
                sleep(3);
        }
}
721
722 /**
723  *
724  */
725 int _evapi_relay(str *evdata, str *ctag, int unicast)
726 {
727 #define EVAPI_RELAY_FORMAT "%d:%.*s,"
728
729         int len;
730         int sbsize;
731         evapi_msg_t *emsg;
732
733         LM_DBG("relaying event data [%.*s] (%d)\n",
734                         evdata->len, evdata->s, evdata->len);
735
736         sbsize = evdata->len;
737         len = sizeof(evapi_msg_t)
738                 + ((sbsize + 32 + ((ctag && ctag->len>0)?(ctag->len+2):0)) * sizeof(char));
739         emsg = (evapi_msg_t*)shm_malloc(len);
740         if(emsg==NULL) {
741                 LM_ERR("no more shared memory\n");
742                 return -1;
743         }
744         memset(emsg, 0, len);
745         emsg->data.s = (char*)emsg + sizeof(evapi_msg_t);
746         if(_evapi_netstring_format) {
747                 /* netstring encapsulation */
748                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
749                                 EVAPI_RELAY_FORMAT,
750                                 sbsize, evdata->len, evdata->s);
751         } else {
752                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
753                                 "%.*s",
754                                 evdata->len, evdata->s);
755         }
756         if(emsg->data.len<=0 || emsg->data.len>sbsize+32) {
757                 shm_free(emsg);
758                 LM_ERR("cannot serialize event\n");
759                 return -1;
760         }
761         if(ctag && ctag->len>0) {
762                 emsg->tag.s = emsg->data.s + sbsize + 32;
763                 strncpy(emsg->tag.s, ctag->s, ctag->len);
764                 emsg->tag.len = ctag->len;
765         }
766
767         if (unicast){
768                 emsg->unicast = unicast;
769         }
770
771         LM_DBG("sending [%p] [%.*s] (%d)\n", emsg, emsg->data.len, emsg->data.s,
772                         emsg->data.len);
773         if(_evapi_notify_sockets[1]!=-1) {
774                 len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*));
775                 if(len<=0) {
776                         shm_free(emsg);
777                         LM_ERR("failed to pass the pointer to evapi dispatcher\n");
778                         return -1;
779                 }
780         } else {
781                 cfg_update();
782                 LM_DBG("dispatching [%p] [%.*s] (%d)\n", emsg,
783                                 emsg->data.len, emsg->data.s, emsg->data.len);
784                 if(evapi_dispatch_notify(emsg) == 0) {
785                         shm_free(emsg);
786                         LM_WARN("message not delivered - no client connected\n");
787                         return -1;
788                 }
789                 shm_free(emsg);
790         }
791         return 0;
792 }
793
794 /**
795  *
796  */
797 int evapi_relay(str *evdata)
798 {
799         return _evapi_relay(evdata, NULL, 0);
800 }
801
802 /**
803  *
804  */
805 int evapi_relay_multicast(str *evdata, str *ctag){
806         return _evapi_relay(evdata, ctag, 0);
807 }
808
809 /**
810  *
811  */
812 int evapi_relay_unicast(str *evdata, str *ctag){
813         return _evapi_relay(evdata, ctag, 1);
814 }
815
#if 0
/**
 * Disabled variant of evapi_relay() - excluded from compilation by the
 * '#if 0' guard. It wraps an event name and its data in a JSON-like
 * document sent as netstring and passes a pointer to the shared memory
 * buffer over the notification socket.
 */
int evapi_relay(str *event, str *data)
{
#define EVAPI_RELAY_FORMAT "%d:{\n \"event\":\"%.*s\",\n \"data\":%.*s\n},"

        int len;
        int sbsize;
        str *sbuf;

        LM_DBG("relaying event [%.*s] data [%.*s]\n",
                        event->len, event->s, data->len, data->s);

        /* NOTE(review): the '- 13' presumably compensates for the conversion
         * specifiers of the format - verify before enabling this code */
        sbsize = sizeof(EVAPI_RELAY_FORMAT) + event->len + data->len - 13;
        /* the str structure is followed by the buffer it points to */
        sbuf = (str*)shm_malloc(sizeof(str) + ((sbsize+32) * sizeof(char)));
        if(sbuf==NULL) {
                LM_ERR("no more shared memory\n");
                return -1;
        }
        sbuf->s = (char*)sbuf + sizeof(str);
        sbuf->len = snprintf(sbuf->s, sbsize+32,
                        EVAPI_RELAY_FORMAT,
                        sbsize, event->len, event->s, data->len, data->s);
        if(sbuf->len<=0 || sbuf->len>sbsize+32) {
                shm_free(sbuf);
                LM_ERR("cannot serialize event\n");
                return -1;
        }

        len = write(_evapi_notify_sockets[1], &sbuf, sizeof(str*));
        if(len<=0) {
                /* NOTE(review): sbuf is not freed on this error path */
                LM_ERR("failed to pass the pointer to evapi dispatcher\n");
                return -1;
        }
        LM_DBG("sent [%p] [%.*s] (%d)\n", sbuf, sbuf->len, sbuf->s, sbuf->len);
        return 0;
}
#endif
856
857 /**
858  *
859  */
860 int pv_parse_evapi_name(pv_spec_t *sp, str *in)
861 {
862         if(sp==NULL || in==NULL || in->len<=0)
863                 return -1;
864
865         switch(in->len)
866         {
867                 case 3:
868                         if(strncmp(in->s, "msg", 3)==0)
869                                 sp->pvp.pvn.u.isname.name.n = 1;
870                         else goto error;
871                 break;
872                 case 6:
873                         if(strncmp(in->s, "conidx", 6)==0)
874                                 sp->pvp.pvn.u.isname.name.n = 0;
875                         else goto error;
876                 break;
877                 case 7:
878                         if(strncmp(in->s, "srcaddr", 7)==0)
879                                 sp->pvp.pvn.u.isname.name.n = 2;
880                         else if(strncmp(in->s, "srcport", 7)==0)
881                                 sp->pvp.pvn.u.isname.name.n = 3;
882                         else goto error;
883                 break;
884                 default:
885                         goto error;
886         }
887         sp->pvp.pvn.type = PV_NAME_INTSTR;
888         sp->pvp.pvn.u.isname.type = 0;
889
890         return 0;
891
892 error:
893         LM_ERR("unknown PV msrp name %.*s\n", in->len, in->s);
894         return -1;
895 }
896
897 /**
898  *
899  */
900 int pv_get_evapi(sip_msg_t *msg, pv_param_t *param, pv_value_t *res)
901 {
902         evapi_env_t *evenv;
903
904         if(param==NULL || res==NULL)
905                 return -1;
906
907         if(_evapi_clients==NULL) {
908                 return pv_get_null(msg, param, res);
909         }
910         evenv = evapi_get_msg_env(msg);
911
912         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
913                 return pv_get_null(msg, param, res);
914
915         if(_evapi_clients[evenv->conidx].connected==0
916                         && _evapi_clients[evenv->conidx].sock < 0)
917                 return pv_get_null(msg, param, res);
918
919         switch(param->pvn.u.isname.name.n)
920         {
921                 case 0:
922                         return pv_get_sintval(msg, param, res, evenv->conidx);
923                 case 1:
924                         if(evenv->msg.s==NULL)
925                                 return pv_get_null(msg, param, res);
926                         return pv_get_strval(msg, param, res, &evenv->msg);
927                 case 2:
928                         return pv_get_strzval(msg, param, res,
929                                         _evapi_clients[evenv->conidx].src_addr);
930                 case 3:
931                         return pv_get_sintval(msg, param, res,
932                                         _evapi_clients[evenv->conidx].src_port);
933                 default:
934                         return pv_get_null(msg, param, res);
935         }
936
937         return 0;
938 }
939
940 /**
941  *
942  */
943 int pv_set_evapi(sip_msg_t *msg, pv_param_t *param, int op,
944                 pv_value_t *val)
945 {
946         return 0;
947 }