evapi: free the faked msg clone used for dispatcher event route
[sip-router] / src / modules / evapi / evapi_dispatch.c
1 /**
2  * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <fcntl.h>
33
34 #include <ev.h>
35
36 #include "../../core/sr_module.h"
37 #include "../../core/dprint.h"
38 #include "../../core/ut.h"
39 #include "../../core/cfg/cfg_struct.h"
40 #include "../../core/receive.h"
41 #include "../../core/kemi.h"
42 #include "../../core/fmsg.h"
43
44 #include "evapi_dispatch.h"
45
46 static int _evapi_notify_sockets[2];
47 static int _evapi_netstring_format = 1;
48
49 extern str _evapi_event_callback;
50 extern int _evapi_dispatcher_pid;
51 extern int _evapi_max_clients;
52
53 #define EVAPI_IPADDR_SIZE       64
54 #define EVAPI_TAG_SIZE  64
55 #define CLIENT_BUFFER_SIZE      32768
56 typedef struct _evapi_client {
57         int connected;
58         int sock;
59         unsigned short af;
60         unsigned short src_port;
61         char src_addr[EVAPI_IPADDR_SIZE];
62         char tag[EVAPI_IPADDR_SIZE];
63         str  stag;
64         char rbuffer[CLIENT_BUFFER_SIZE];
65         unsigned int rpos;
66 } evapi_client_t;
67
68 typedef struct _evapi_env {
69         int eset;
70         int conidx;
71         str msg;
72 } evapi_env_t;
73
74 typedef struct _evapi_msg {
75         str data;
76         str tag;
77         int unicast;
78 } evapi_msg_t;
79
80 #define EVAPI_MAX_CLIENTS       _evapi_max_clients
81
82 /* last one used for error handling, not a real connected client */
83 static evapi_client_t *_evapi_clients = NULL;
84
85 typedef struct _evapi_evroutes {
86         int con_new;
87         str con_new_name;
88         int con_closed;
89         str con_closed_name;
90         int msg_received;
91         str msg_received_name;
92 } evapi_evroutes_t;
93
94 static evapi_evroutes_t _evapi_rts;
95
96 /**
97  *
98  */
99 void evapi_env_reset(evapi_env_t *evenv)
100 {
101         if(evenv==0)
102                 return;
103         memset(evenv, 0, sizeof(evapi_env_t));
104         evenv->conidx = -1;
105 }
106
107 /**
108  *
109  */
110 void evapi_init_environment(int dformat)
111 {
112         memset(&_evapi_rts, 0, sizeof(evapi_evroutes_t));
113
114         _evapi_rts.con_new_name.s = "evapi:connection-new";
115         _evapi_rts.con_new_name.len = strlen(_evapi_rts.con_new_name.s);
116         _evapi_rts.con_new = route_lookup(&event_rt, "evapi:connection-new");
117         if (_evapi_rts.con_new < 0 || event_rt.rlist[_evapi_rts.con_new] == NULL)
118                 _evapi_rts.con_new = -1;
119
120         _evapi_rts.con_closed_name.s = "evapi:connection-closed";
121         _evapi_rts.con_closed_name.len = strlen(_evapi_rts.con_closed_name.s);
122         _evapi_rts.con_closed = route_lookup(&event_rt, "evapi:connection-closed");
123         if (_evapi_rts.con_closed < 0 || event_rt.rlist[_evapi_rts.con_closed] == NULL)
124                 _evapi_rts.con_closed = -1;
125
126         _evapi_rts.msg_received_name.s = "evapi:message-received";
127         _evapi_rts.msg_received_name.len = strlen(_evapi_rts.msg_received_name.s);
128         _evapi_rts.msg_received = route_lookup(&event_rt, "evapi:message-received");
129         if (_evapi_rts.msg_received < 0 || event_rt.rlist[_evapi_rts.msg_received] == NULL)
130                 _evapi_rts.msg_received = -1;
131
132         _evapi_netstring_format = dformat;
133 }
134
135 /**
136  *
137  */
138 int evapi_run_cfg_route(evapi_env_t *evenv, int rt, str *rtname)
139 {
140         int backup_rt;
141         struct run_act_ctx ctx;
142         sip_msg_t *fmsg;
143         sip_msg_t tmsg;
144         sr_kemi_eng_t *keng = NULL;
145
146         if(evenv==0 || evenv->eset==0) {
147                 LM_ERR("evapi env not set\n");
148                 return -1;
149         }
150
151         if((rt<0) && (_evapi_event_callback.s==NULL || _evapi_event_callback.len<=0))
152                 return 0;
153
154         fmsg = faked_msg_next();
155         memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
156         fmsg = &tmsg;
157         evapi_set_msg_env(fmsg, evenv);
158         backup_rt = get_route_type();
159         set_route_type(EVENT_ROUTE);
160         init_run_actions_ctx(&ctx);
161         if(rt>=0) {
162                 run_top_route(event_rt.rlist[rt], fmsg, 0);
163         } else {
164                 keng = sr_kemi_eng_get();
165                 if(keng!=NULL) {
166                         if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
167                                                 &_evapi_event_callback, rtname)<0) {
168                                 LM_ERR("error running event route kemi callback\n");
169                         }
170                 }
171         }
172         set_route_type(backup_rt);
173         evapi_set_msg_env(fmsg, NULL);
174         /* free the structure -- it is a clone of faked msg */
175         free_sip_msg(fmsg);
176         ksr_msg_env_reset();
177         return 0;
178 }
179
180 /**
181  *
182  */
183 int evapi_close_connection(int cidx)
184 {
185         if(cidx<0 || cidx>=EVAPI_MAX_CLIENTS || _evapi_clients==NULL)
186                 return -1;
187         if(_evapi_clients[cidx].connected==1
188                         && _evapi_clients[cidx].sock >= 0) {
189                 close(_evapi_clients[cidx].sock);
190                 _evapi_clients[cidx].connected = 0;
191                 _evapi_clients[cidx].sock = -1;
192                 return 0;
193         }
194         return -2;
195 }
196
197 /**
198  *
199  */
200 int evapi_cfg_close(sip_msg_t *msg)
201 {
202         evapi_env_t *evenv;
203
204         if(msg==NULL)
205                 return -1;
206
207         evenv = evapi_get_msg_env(msg);
208
209         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
210                 return -1;
211         return evapi_close_connection(evenv->conidx);
212 }
213
214 /**
215  *
216  */
217 int evapi_set_tag(sip_msg_t* msg, str* stag)
218 {
219         evapi_env_t *evenv;
220
221         if(msg==NULL || stag==NULL || _evapi_clients==NULL)
222                 return -1;
223
224         evenv = evapi_get_msg_env(msg);
225
226         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
227                 return -1;
228
229         if(!(_evapi_clients[evenv->conidx].connected==1
230                         && _evapi_clients[evenv->conidx].sock >= 0)) {
231                 LM_ERR("connection not established\n");
232                 return -1;
233         }
234
235         if(stag->len>=EVAPI_TAG_SIZE) {
236                 LM_ERR("tag size too big: %d / %d\n", stag->len, EVAPI_TAG_SIZE);
237                 return -1;
238         }
239         _evapi_clients[evenv->conidx].stag.s = _evapi_clients[evenv->conidx].tag;
240         strncpy(_evapi_clients[evenv->conidx].stag.s, stag->s, stag->len);
241         _evapi_clients[evenv->conidx].stag.s[stag->len] = '\0';
242         _evapi_clients[evenv->conidx].stag.len = stag->len;
243         return 1;
244 }
245
246 /**
247  *
248  */
249 int evapi_init_notify_sockets(void)
250 {
251         if (socketpair(PF_UNIX, SOCK_STREAM, 0, _evapi_notify_sockets) < 0) {
252                 LM_ERR("opening notify stream socket pair\n");
253                 return -1;
254         }
255         LM_DBG("inter-process event notification sockets initialized: %d ~ %d\n",
256                         _evapi_notify_sockets[0], _evapi_notify_sockets[1]);
257         return 0;
258 }
259
260 /**
261  *
262  */
263 void evapi_close_notify_sockets_child(void)
264 {
265         LM_DBG("closing the notification socket used by children\n");
266         close(_evapi_notify_sockets[1]);
267         _evapi_notify_sockets[1] = -1;
268 }
269
270 /**
271  *
272  */
273 void evapi_close_notify_sockets_parent(void)
274 {
275         LM_DBG("closing the notification socket used by parent\n");
276         close(_evapi_notify_sockets[0]);
277         _evapi_notify_sockets[0] = -1;
278 }
279
280 /**
281  *
282  */
283 int evapi_dispatch_notify(evapi_msg_t *emsg)
284 {
285         int i;
286         int n;
287         int wlen;
288
289         if(_evapi_clients==NULL) {
290                 return 0;
291         }
292
293         n = 0;
294         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
295                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock>=0) {
296                         if(emsg->tag.s==NULL || (emsg->tag.len == _evapi_clients[i].stag.len
297                                                 && strncmp(_evapi_clients[i].stag.s,
298                                                                         emsg->tag.s, emsg->tag.len)==0)) {
299                                 wlen = write(_evapi_clients[i].sock, emsg->data.s,
300                                                 emsg->data.len);
301                                 if(wlen!=emsg->data.len) {
302                                         LM_DBG("failed to write all packet (%d out of %d) on socket"
303                                                         " %d index [%d]\n",
304                                                         wlen, emsg->data.len, _evapi_clients[i].sock, i);
305                                 }
306                                 n++;
307                                 if (emsg->unicast){
308                                         break;
309                                 }
310                         }
311                 }
312         }
313
314         LM_DBG("the message was sent to %d clients\n", n);
315
316         return n;
317 }
318
319 /**
320  *
321  */
322 void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
323 {
324         ssize_t rlen;
325         int i, k;
326         evapi_env_t evenv;
327         str frame;
328         char *sfp;
329         char *efp;
330
331         if(EV_ERROR & revents) {
332                 LM_ERR("received invalid event (%d)\n", revents);
333                 return;
334         }
335         if(_evapi_clients==NULL) {
336                 LM_ERR("no client structures\n");
337                 return;
338         }
339
340         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
341                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
342                         break;
343                 }
344         }
345         if(i==EVAPI_MAX_CLIENTS) {
346                 LM_ERR("cannot lookup client socket %d\n", watcher->fd);
347                 /* try to empty the socket anyhow */
348                 rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
349                 return;
350         }
351
352         /* read message from client */
353         rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
354                         CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);
355
356         if(rlen < 0) {
357                 LM_ERR("cannot read the client message\n");
358                 _evapi_clients[i].rpos = 0;
359                 return;
360         }
361
362
363         cfg_update();
364
365         evapi_env_reset(&evenv);
366         if(rlen == 0) {
367                 /* client is gone */
368                 evenv.eset = 1;
369                 evenv.conidx = i;
370                 evapi_run_cfg_route(&evenv, _evapi_rts.con_closed,
371                                 &_evapi_rts.con_closed_name);
372                 _evapi_clients[i].connected = 0;
373                 if(_evapi_clients[i].sock>=0) {
374                         close(_evapi_clients[i].sock);
375                 }
376                 _evapi_clients[i].sock = -1;
377                 _evapi_clients[i].rpos = 0;
378                 ev_io_stop(loop, watcher);
379                 free(watcher);
380                 LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
381                                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
382                 return;
383         }
384
385         _evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';
386
387         LM_DBG("{%d} [%s:%d] - received [%.*s] (%d) (%d)\n",
388                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
389                 (int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos,
390                 (int)rlen, (int)_evapi_clients[i].rpos);
391         evenv.conidx = i;
392         evenv.eset = 1;
393         if(_evapi_netstring_format) {
394                 /* netstring decapsulation */
395                 k = 0;
396                 while(k<_evapi_clients[i].rpos+rlen) {
397                         frame.len = 0;
398                         while(k<_evapi_clients[i].rpos+rlen) {
399                                 if(_evapi_clients[i].rbuffer[k]==' '
400                                                 || _evapi_clients[i].rbuffer[k]=='\t'
401                                                 || _evapi_clients[i].rbuffer[k]=='\r'
402                                                 || _evapi_clients[i].rbuffer[k]=='\n')
403                                         k++;
404                                 else break;
405                         }
406                         if(k==_evapi_clients[i].rpos+rlen) {
407                                 _evapi_clients[i].rpos = 0;
408                                 LM_DBG("empty content\n");
409                                 return;
410                         }
411                         /* pointer to start of whole frame */
412                         sfp = _evapi_clients[i].rbuffer + k;
413                         while(k<_evapi_clients[i].rpos+rlen) {
414                                 if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
415                                         frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
416                                 } else {
417                                         if(_evapi_clients[i].rbuffer[k]==':')
418                                                 break;
419                                         /* invalid character - discard the rest */
420                                         _evapi_clients[i].rpos = 0;
421                                         LM_DBG("invalid char when searching for size [%c] [%.*s] (%d) (%d)\n",
422                                                         _evapi_clients[i].rbuffer[k],
423                                                         (int)(_evapi_clients[i].rpos+rlen), _evapi_clients[i].rbuffer,
424                                                         (int)(_evapi_clients[i].rpos+rlen), k);
425                                         return;
426                                 }
427                                 k++;
428                         }
429                         if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
430                                 LM_DBG("invalid frame len: %d kpos: %d rpos: %u rlen: %lu\n",
431                                                 frame.len, k, _evapi_clients[i].rpos, rlen);
432                                 _evapi_clients[i].rpos = 0;
433                                 return;
434                         }
435                         if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
436                                 /* partial data - shift back in buffer and wait to read more */
437                                 efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
438                                 if(efp<=sfp) {
439                                         _evapi_clients[i].rpos = 0;
440                                         LM_DBG("weird - invalid size for residual data\n");
441                                         return;
442                                 }
443                                 _evapi_clients[i].rpos = (unsigned int)(efp-sfp);
444                                 if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
445                                         memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
446                                 } else {
447                                         for(k=0; k<_evapi_clients[i].rpos; k++) {
448                                                 _evapi_clients[i].rbuffer[k] = sfp[k];
449                                         }
450                                 }
451                                 LM_DBG("residual data [%.*s] (%d)\n",
452                                                 _evapi_clients[i].rpos, _evapi_clients[i].rbuffer,
453                                                 _evapi_clients[i].rpos);
454                                 return;
455                         }
456                         k++;
457                         frame.s = _evapi_clients[i].rbuffer + k;
458                         if(frame.s[frame.len]!=',') {
459                                 /* invalid data - discard and reset buffer */
460                                 LM_DBG("frame size mismatch the ending char (%c): [%.*s] (%d)\n",
461                                                 frame.s[frame.len], frame.len, frame.s, frame.len);
462                                 _evapi_clients[i].rpos = 0 ;
463                                 return;
464                         }
465                         frame.s[frame.len] = '\0';
466                         k += frame.len ;
467                         evenv.msg.s = frame.s;
468                         evenv.msg.len = frame.len;
469                         LM_DBG("executing event route for frame: [%.*s] (%d)\n",
470                                                 frame.len, frame.s, frame.len);
471                         evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
472                                         &_evapi_rts.msg_received_name);
473                         k++;
474                 }
475                 _evapi_clients[i].rpos = 0 ;
476         } else {
477                 evenv.msg.s = _evapi_clients[i].rbuffer;
478                 evenv.msg.len = rlen;
479                 evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
480                                 &_evapi_rts.msg_received_name);
481         }
482 }
483
484 /**
485  *
486  */
487 void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
488 {
489         struct sockaddr caddr;
490         socklen_t clen = sizeof(caddr);
491         int csock;
492         struct ev_io *evapi_client;
493         int i;
494         evapi_env_t evenv;
495         int optval;
496         socklen_t optlen;
497
498         if(_evapi_clients==NULL) {
499                 LM_ERR("no client structures\n");
500                 return;
501         }
502         evapi_client = (struct ev_io*) malloc (sizeof(struct ev_io));
503         if(evapi_client==NULL) {
504                 LM_ERR("no more memory\n");
505                 return;
506         }
507
508         if(EV_ERROR & revents) {
509                 LM_ERR("received invalid event\n");
510                 free(evapi_client);
511                 return;
512         }
513
514         cfg_update();
515
516         /* accept new client connection */
517         csock = accept(watcher->fd, (struct sockaddr *)&caddr, &clen);
518
519         if (csock < 0) {
520                 LM_ERR("cannot accept the client '%s' err='%d'\n", gai_strerror(csock), csock);
521                 free(evapi_client);
522                 return;
523         }
524         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
525                 if(_evapi_clients[i].connected==0) {
526                         if (caddr.sa_family == AF_INET) {
527                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in*)&caddr)->sin_port);
528                                 if(inet_ntop(AF_INET, &((struct sockaddr_in*)&caddr)->sin_addr,
529                                                         _evapi_clients[i].src_addr,
530                                                         EVAPI_IPADDR_SIZE)==NULL) {
531                                         LM_ERR("cannot convert ipv4 address\n");
532                                         close(csock);
533                                         free(evapi_client);
534                                         return;
535                                 }
536                         } else {
537                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in6*)&caddr)->sin6_port);
538                                 if(inet_ntop(AF_INET6, &((struct sockaddr_in6*)&caddr)->sin6_addr,
539                                                         _evapi_clients[i].src_addr,
540                                                         EVAPI_IPADDR_SIZE)==NULL) {
541                                         LM_ERR("cannot convert ipv6 address\n");
542                                         close(csock);
543                                         free(evapi_client);
544                                         return;
545                                 }
546                         }
547                         optval = 1;
548                         optlen = sizeof(optval);
549                         if(setsockopt(csock, SOL_SOCKET, SO_KEEPALIVE,
550                                                 &optval, optlen) < 0) {
551                                 LM_WARN("failed to enable keepalive on socket %d\n", csock);
552                         }
553                         _evapi_clients[i].connected = 1;
554                         _evapi_clients[i].sock = csock;
555                         _evapi_clients[i].af = caddr.sa_family;
556                         break;
557                 }
558         }
559         if(i>=EVAPI_MAX_CLIENTS) {
560                 LM_ERR("too many clients\n");
561                 close(csock);
562                 free(evapi_client);
563                 return;
564         }
565
566         LM_DBG("new connection - pos[%d] from: [%s:%d]\n", i,
567                         _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
568
569         evapi_env_reset(&evenv);
570         evenv.conidx = i;
571         evenv.eset = 1;
572         evapi_run_cfg_route(&evenv, _evapi_rts.con_new, &_evapi_rts.con_new_name);
573
574         if(_evapi_clients[i].connected == 0) {
575                 free(evapi_client);
576                 return;
577         }
578
579         /* start watcher to read messages from whatchers */
580         ev_io_init(evapi_client, evapi_recv_client, csock, EV_READ);
581         ev_io_start(loop, evapi_client);
582 }
583
584 /**
585  *
586  */
587 void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
588 {
589         evapi_msg_t *emsg = NULL;
590         int rlen;
591
592         if(EV_ERROR & revents) {
593                 perror("received invalid event\n");
594                 return;
595         }
596
597         cfg_update();
598
599         /* read message from client */
600         rlen = read(watcher->fd, &emsg, sizeof(evapi_msg_t*));
601
602         if(rlen != sizeof(evapi_msg_t*) || emsg==NULL) {
603                 LM_ERR("cannot read the sip worker message\n");
604                 return;
605         }
606
607         LM_DBG("received [%p] [%.*s] (%d)\n", emsg,
608                         emsg->data.len, emsg->data.s, emsg->data.len);
609         evapi_dispatch_notify(emsg);
610         shm_free(emsg);
611 }
612
613 /**
614  *
615  */
616 int evapi_run_dispatcher(char *laddr, int lport)
617 {
618         int evapi_srv_sock;
619         struct sockaddr_in evapi_srv_addr;
620         struct ev_loop *loop;
621         struct hostent *h = NULL;
622         struct ev_io io_server;
623         struct ev_io io_notify;
624         int yes_true = 1;
625         int fflags = 0;
626         int i;
627
628         LM_DBG("starting dispatcher processing\n");
629
630         _evapi_clients = (evapi_client_t*)malloc(sizeof(evapi_client_t)
631                         * (EVAPI_MAX_CLIENTS+1));
632         if(_evapi_clients==NULL) {
633                 LM_ERR("failed to allocate client structures\n");
634                 exit(-1);
635         }
636         memset(_evapi_clients, 0, sizeof(evapi_client_t) * EVAPI_MAX_CLIENTS);
637         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
638                 _evapi_clients[i].sock = -1;
639         }
640         loop = ev_default_loop(0);
641
642         if(loop==NULL) {
643                 LM_ERR("cannot get libev loop\n");
644                 return -1;
645         }
646
647         h = gethostbyname(laddr);
648         if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
649                 LM_ERR("cannot resolve local server address [%s]\n", laddr);
650                 return -1;
651         }
652         if(h->h_addrtype == AF_INET) {
653                 evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
654         } else {
655                 evapi_srv_sock = socket(PF_INET6, SOCK_STREAM, 0);
656         }
657         if( evapi_srv_sock < 0 )
658         {
659                 LM_ERR("cannot create server socket (family %d)\n", h->h_addrtype);
660                 return -1;
661         }
662         /* set non-blocking flag */
663         fflags = fcntl(evapi_srv_sock, F_GETFL);
664         if(fflags<0) {
665                 LM_ERR("failed to get the srv socket flags\n");
666                 close(evapi_srv_sock);
667                 return -1;
668         }
669         if (fcntl(evapi_srv_sock, F_SETFL, fflags | O_NONBLOCK)<0) {
670                 LM_ERR("failed to set srv socket flags\n");
671                 close(evapi_srv_sock);
672                 return -1;
673         }
674
675         bzero(&evapi_srv_addr, sizeof(evapi_srv_addr));
676         evapi_srv_addr.sin_family = h->h_addrtype;
677         evapi_srv_addr.sin_port   = htons((short)lport);
678         evapi_srv_addr.sin_addr  = *(struct in_addr*)h->h_addr;
679
680         /* Set SO_REUSEADDR option on listening socket so that we don't
681          * have to wait for connections in TIME_WAIT to go away before
682          * re-binding.
683          */
684
685         if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR,
686                 &yes_true, sizeof(int)) < 0) {
687                 LM_ERR("cannot set SO_REUSEADDR option on descriptor\n");
688                 close(evapi_srv_sock);
689                 return -1;
690         }
691
692         if (bind(evapi_srv_sock, (struct sockaddr*)&evapi_srv_addr,
693                                 sizeof(evapi_srv_addr)) < 0) {
694                 LM_ERR("cannot bind to local address and port [%s:%d]\n", laddr, lport);
695                 close(evapi_srv_sock);
696                 return -1;
697         }
698         if (listen(evapi_srv_sock, 4) < 0) {
699                 LM_ERR("listen error\n");
700                 close(evapi_srv_sock);
701                 return -1;
702         }
703         ev_io_init(&io_server, evapi_accept_client, evapi_srv_sock, EV_READ);
704         ev_io_start(loop, &io_server);
705         ev_io_init(&io_notify, evapi_recv_notify, _evapi_notify_sockets[0], EV_READ);
706         ev_io_start(loop, &io_notify);
707
708         while(1) {
709                 ev_loop (loop, 0);
710         }
711
712         return 0;
713 }
714
715 /**
716  *
717  */
718 int evapi_run_worker(int prank)
719 {
720         LM_DBG("started worker process: %d\n", prank);
721         while(1) {
722                 sleep(3);
723         }
724 }
725
726 /**
727  *
728  */
729 int _evapi_relay(str *evdata, str *ctag, int unicast)
730 {
731 #define EVAPI_RELAY_FORMAT "%d:%.*s,"
732
733         int len;
734         int sbsize;
735         evapi_msg_t *emsg;
736
737         LM_DBG("relaying event data [%.*s] (%d)\n",
738                         evdata->len, evdata->s, evdata->len);
739
740         sbsize = evdata->len;
741         len = sizeof(evapi_msg_t)
742                 + ((sbsize + 32 + ((ctag && ctag->len>0)?(ctag->len+2):0)) * sizeof(char));
743         emsg = (evapi_msg_t*)shm_malloc(len);
744         if(emsg==NULL) {
745                 LM_ERR("no more shared memory\n");
746                 return -1;
747         }
748         memset(emsg, 0, len);
749         emsg->data.s = (char*)emsg + sizeof(evapi_msg_t);
750         if(_evapi_netstring_format) {
751                 /* netstring encapsulation */
752                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
753                                 EVAPI_RELAY_FORMAT,
754                                 sbsize, evdata->len, evdata->s);
755         } else {
756                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
757                                 "%.*s",
758                                 evdata->len, evdata->s);
759         }
760         if(emsg->data.len<=0 || emsg->data.len>sbsize+32) {
761                 shm_free(emsg);
762                 LM_ERR("cannot serialize event\n");
763                 return -1;
764         }
765         if(ctag && ctag->len>0) {
766                 emsg->tag.s = emsg->data.s + sbsize + 32;
767                 strncpy(emsg->tag.s, ctag->s, ctag->len);
768                 emsg->tag.len = ctag->len;
769         }
770
771         if (unicast){
772                 emsg->unicast = unicast;
773         }
774
775         LM_DBG("sending [%p] [%.*s] (%d)\n", emsg, emsg->data.len, emsg->data.s,
776                         emsg->data.len);
777         if(_evapi_notify_sockets[1]!=-1) {
778                 len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*));
779                 if(len<=0) {
780                         shm_free(emsg);
781                         LM_ERR("failed to pass the pointer to evapi dispatcher\n");
782                         return -1;
783                 }
784         } else {
785                 cfg_update();
786                 LM_DBG("dispatching [%p] [%.*s] (%d)\n", emsg,
787                                 emsg->data.len, emsg->data.s, emsg->data.len);
788                 if(evapi_dispatch_notify(emsg) == 0) {
789                         shm_free(emsg);
790                         LM_WARN("message not delivered - no client connected\n");
791                         return -1;
792                 }
793                 shm_free(emsg);
794         }
795         return 0;
796 }
797
798 /**
799  *
800  */
801 int evapi_relay(str *evdata)
802 {
803         return _evapi_relay(evdata, NULL, 0);
804 }
805
806 /**
807  *
808  */
809 int evapi_relay_multicast(str *evdata, str *ctag){
810         return _evapi_relay(evdata, ctag, 0);
811 }
812
813 /**
814  *
815  */
816 int evapi_relay_unicast(str *evdata, str *ctag){
817         return _evapi_relay(evdata, ctag, 1);
818 }
819
820 #if 0
821 /**
822  *
823  */
824 int evapi_relay(str *event, str *data)
825 {
826 #define EVAPI_RELAY_FORMAT "%d:{\n \"event\":\"%.*s\",\n \"data\":%.*s\n},"
827
828         int len;
829         int sbsize;
830         str *sbuf;
831
832         LM_DBG("relaying event [%.*s] data [%.*s]\n",
833                         event->len, event->s, data->len, data->s);
834
835         sbsize = sizeof(EVAPI_RELAY_FORMAT) + event->len + data->len - 13;
836         sbuf = (str*)shm_malloc(sizeof(str) + ((sbsize+32) * sizeof(char)));
837         if(sbuf==NULL) {
838                 LM_ERR("no more shared memory\n");
839                 return -1;
840         }
841         sbuf->s = (char*)sbuf + sizeof(str);
842         sbuf->len = snprintf(sbuf->s, sbsize+32,
843                         EVAPI_RELAY_FORMAT,
844                         sbsize, event->len, event->s, data->len, data->s);
845         if(sbuf->len<=0 || sbuf->len>sbsize+32) {
846                 shm_free(sbuf);
847                 LM_ERR("cannot serialize event\n");
848                 return -1;
849         }
850
851         len = write(_evapi_notify_sockets[1], &sbuf, sizeof(str*));
852         if(len<=0) {
853                 LM_ERR("failed to pass the pointer to evapi dispatcher\n");
854                 return -1;
855         }
856         LM_DBG("sent [%p] [%.*s] (%d)\n", sbuf, sbuf->len, sbuf->s, sbuf->len);
857         return 0;
858 }
859 #endif
860
861 /**
862  *
863  */
864 int pv_parse_evapi_name(pv_spec_t *sp, str *in)
865 {
866         if(sp==NULL || in==NULL || in->len<=0)
867                 return -1;
868
869         switch(in->len)
870         {
871                 case 3:
872                         if(strncmp(in->s, "msg", 3)==0)
873                                 sp->pvp.pvn.u.isname.name.n = 1;
874                         else goto error;
875                 break;
876                 case 6:
877                         if(strncmp(in->s, "conidx", 6)==0)
878                                 sp->pvp.pvn.u.isname.name.n = 0;
879                         else goto error;
880                 break;
881                 case 7:
882                         if(strncmp(in->s, "srcaddr", 7)==0)
883                                 sp->pvp.pvn.u.isname.name.n = 2;
884                         else if(strncmp(in->s, "srcport", 7)==0)
885                                 sp->pvp.pvn.u.isname.name.n = 3;
886                         else goto error;
887                 break;
888                 default:
889                         goto error;
890         }
891         sp->pvp.pvn.type = PV_NAME_INTSTR;
892         sp->pvp.pvn.u.isname.type = 0;
893
894         return 0;
895
896 error:
897         LM_ERR("unknown PV msrp name %.*s\n", in->len, in->s);
898         return -1;
899 }
900
901 /**
902  *
903  */
904 int pv_get_evapi(sip_msg_t *msg, pv_param_t *param, pv_value_t *res)
905 {
906         evapi_env_t *evenv;
907
908         if(param==NULL || res==NULL)
909                 return -1;
910
911         if(_evapi_clients==NULL) {
912                 return pv_get_null(msg, param, res);
913         }
914         evenv = evapi_get_msg_env(msg);
915
916         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
917                 return pv_get_null(msg, param, res);
918
919         if(_evapi_clients[evenv->conidx].connected==0
920                         && _evapi_clients[evenv->conidx].sock < 0)
921                 return pv_get_null(msg, param, res);
922
923         switch(param->pvn.u.isname.name.n)
924         {
925                 case 0:
926                         return pv_get_sintval(msg, param, res, evenv->conidx);
927                 case 1:
928                         if(evenv->msg.s==NULL)
929                                 return pv_get_null(msg, param, res);
930                         return pv_get_strval(msg, param, res, &evenv->msg);
931                 case 2:
932                         return pv_get_strzval(msg, param, res,
933                                         _evapi_clients[evenv->conidx].src_addr);
934                 case 3:
935                         return pv_get_sintval(msg, param, res,
936                                         _evapi_clients[evenv->conidx].src_port);
937                 default:
938                         return pv_get_null(msg, param, res);
939         }
940
941         return 0;
942 }
943
944 /**
945  *
946  */
947 int pv_set_evapi(sip_msg_t *msg, pv_param_t *param, int op,
948                 pv_value_t *val)
949 {
950         return 0;
951 }