evapi: cast to void* when printing logs with pointer value
[sip-router] / src / modules / evapi / evapi_dispatch.c
1 /**
2  * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20  *
21  */
22
23 #include <stdio.h>
24 #include <unistd.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 #include <netinet/in.h>
31 #include <arpa/inet.h>
32 #include <fcntl.h>
33
34 #include <ev.h>
35
36 #include "../../core/sr_module.h"
37 #include "../../core/dprint.h"
38 #include "../../core/ut.h"
39 #include "../../core/cfg/cfg_struct.h"
40 #include "../../core/receive.h"
41 #include "../../core/kemi.h"
42 #include "../../core/fmsg.h"
43
44 #include "evapi_dispatch.h"
45
46 static int _evapi_notify_sockets[2];
47 static int _evapi_netstring_format = 1;
48
49 extern str _evapi_event_callback;
50 extern int _evapi_dispatcher_pid;
51 extern int _evapi_max_clients;
52
53 #define EVAPI_IPADDR_SIZE       64
54 #define EVAPI_TAG_SIZE  64
55 #define CLIENT_BUFFER_SIZE      32768
56 typedef struct _evapi_client {
57         int connected;
58         int sock;
59         unsigned short af;
60         unsigned short src_port;
61         char src_addr[EVAPI_IPADDR_SIZE];
62         char tag[EVAPI_IPADDR_SIZE];
63         str  stag;
64         char rbuffer[CLIENT_BUFFER_SIZE];
65         unsigned int rpos;
66 } evapi_client_t;
67
68 typedef struct _evapi_env {
69         int eset;
70         int conidx;
71         str msg;
72 } evapi_env_t;
73
74 typedef struct _evapi_msg {
75         str data;
76         str tag;
77         int unicast;
78 } evapi_msg_t;
79
80 #define EVAPI_MAX_CLIENTS       _evapi_max_clients
81
82 /* last one used for error handling, not a real connected client */
83 static evapi_client_t *_evapi_clients = NULL;
84
85 typedef struct _evapi_evroutes {
86         int con_new;
87         str con_new_name;
88         int con_closed;
89         str con_closed_name;
90         int msg_received;
91         str msg_received_name;
92 } evapi_evroutes_t;
93
94 static evapi_evroutes_t _evapi_rts;
95
96 /**
97  *
98  */
99 void evapi_env_reset(evapi_env_t *evenv)
100 {
101         if(evenv==0)
102                 return;
103         memset(evenv, 0, sizeof(evapi_env_t));
104         evenv->conidx = -1;
105 }
106
107 /**
108  *
109  */
110 void evapi_init_environment(int dformat)
111 {
112         memset(&_evapi_rts, 0, sizeof(evapi_evroutes_t));
113
114         _evapi_rts.con_new_name.s = "evapi:connection-new";
115         _evapi_rts.con_new_name.len = strlen(_evapi_rts.con_new_name.s);
116         _evapi_rts.con_new = route_lookup(&event_rt, "evapi:connection-new");
117         if (_evapi_rts.con_new < 0 || event_rt.rlist[_evapi_rts.con_new] == NULL)
118                 _evapi_rts.con_new = -1;
119
120         _evapi_rts.con_closed_name.s = "evapi:connection-closed";
121         _evapi_rts.con_closed_name.len = strlen(_evapi_rts.con_closed_name.s);
122         _evapi_rts.con_closed = route_lookup(&event_rt, "evapi:connection-closed");
123         if (_evapi_rts.con_closed < 0 || event_rt.rlist[_evapi_rts.con_closed] == NULL)
124                 _evapi_rts.con_closed = -1;
125
126         _evapi_rts.msg_received_name.s = "evapi:message-received";
127         _evapi_rts.msg_received_name.len = strlen(_evapi_rts.msg_received_name.s);
128         _evapi_rts.msg_received = route_lookup(&event_rt, "evapi:message-received");
129         if (_evapi_rts.msg_received < 0 || event_rt.rlist[_evapi_rts.msg_received] == NULL)
130                 _evapi_rts.msg_received = -1;
131
132         _evapi_netstring_format = dformat;
133 }
134
135 /**
136  *
137  */
138 int evapi_run_cfg_route(evapi_env_t *evenv, int rt, str *rtname)
139 {
140         int backup_rt;
141         struct run_act_ctx ctx;
142         sip_msg_t *fmsg;
143         sip_msg_t tmsg;
144         sr_kemi_eng_t *keng = NULL;
145
146         if(evenv==0 || evenv->eset==0) {
147                 LM_ERR("evapi env not set\n");
148                 return -1;
149         }
150
151         if((rt<0) && (_evapi_event_callback.s==NULL || _evapi_event_callback.len<=0))
152                 return 0;
153
154         if(faked_msg_get_new(&tmsg)<0) {
155                 LM_ERR("failed to get a new faked message\n");
156                 return -1;
157         }
158         fmsg = &tmsg;
159         evapi_set_msg_env(fmsg, evenv);
160         backup_rt = get_route_type();
161         set_route_type(EVENT_ROUTE);
162         init_run_actions_ctx(&ctx);
163         if(rt>=0) {
164                 run_top_route(event_rt.rlist[rt], fmsg, 0);
165         } else {
166                 keng = sr_kemi_eng_get();
167                 if(keng!=NULL) {
168                         if(sr_kemi_route(keng, fmsg, EVENT_ROUTE,
169                                                 &_evapi_event_callback, rtname)<0) {
170                                 LM_ERR("error running event route kemi callback\n");
171                         }
172                 }
173         }
174         set_route_type(backup_rt);
175         evapi_set_msg_env(fmsg, NULL);
176         /* free the structure -- it is a clone of faked msg */
177         free_sip_msg(fmsg);
178         ksr_msg_env_reset();
179         return 0;
180 }
181
182 /**
183  *
184  */
185 int evapi_close_connection(int cidx)
186 {
187         if(cidx<0 || cidx>=EVAPI_MAX_CLIENTS || _evapi_clients==NULL)
188                 return -1;
189         if(_evapi_clients[cidx].connected==1
190                         && _evapi_clients[cidx].sock >= 0) {
191                 close(_evapi_clients[cidx].sock);
192                 _evapi_clients[cidx].connected = 0;
193                 _evapi_clients[cidx].sock = -1;
194                 return 0;
195         }
196         return -2;
197 }
198
199 /**
200  *
201  */
202 int evapi_cfg_close(sip_msg_t *msg)
203 {
204         evapi_env_t *evenv;
205
206         if(msg==NULL)
207                 return -1;
208
209         evenv = evapi_get_msg_env(msg);
210
211         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
212                 return -1;
213         return evapi_close_connection(evenv->conidx);
214 }
215
216 /**
217  *
218  */
219 int evapi_set_tag(sip_msg_t* msg, str* stag)
220 {
221         evapi_env_t *evenv;
222
223         if(msg==NULL || stag==NULL || _evapi_clients==NULL)
224                 return -1;
225
226         evenv = evapi_get_msg_env(msg);
227
228         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
229                 return -1;
230
231         if(!(_evapi_clients[evenv->conidx].connected==1
232                         && _evapi_clients[evenv->conidx].sock >= 0)) {
233                 LM_ERR("connection not established\n");
234                 return -1;
235         }
236
237         if(stag->len>=EVAPI_TAG_SIZE) {
238                 LM_ERR("tag size too big: %d / %d\n", stag->len, EVAPI_TAG_SIZE);
239                 return -1;
240         }
241         _evapi_clients[evenv->conidx].stag.s = _evapi_clients[evenv->conidx].tag;
242         strncpy(_evapi_clients[evenv->conidx].stag.s, stag->s, stag->len);
243         _evapi_clients[evenv->conidx].stag.s[stag->len] = '\0';
244         _evapi_clients[evenv->conidx].stag.len = stag->len;
245         return 1;
246 }
247
248 /**
249  *
250  */
251 int evapi_init_notify_sockets(void)
252 {
253         if (socketpair(PF_UNIX, SOCK_STREAM, 0, _evapi_notify_sockets) < 0) {
254                 LM_ERR("opening notify stream socket pair\n");
255                 return -1;
256         }
257         LM_DBG("inter-process event notification sockets initialized: %d ~ %d\n",
258                         _evapi_notify_sockets[0], _evapi_notify_sockets[1]);
259         return 0;
260 }
261
262 /**
263  *
264  */
265 void evapi_close_notify_sockets_child(void)
266 {
267         LM_DBG("closing the notification socket used by children\n");
268         close(_evapi_notify_sockets[1]);
269         _evapi_notify_sockets[1] = -1;
270 }
271
272 /**
273  *
274  */
275 void evapi_close_notify_sockets_parent(void)
276 {
277         LM_DBG("closing the notification socket used by parent\n");
278         close(_evapi_notify_sockets[0]);
279         _evapi_notify_sockets[0] = -1;
280 }
281
282 /**
283  *
284  */
285 int evapi_dispatch_notify(evapi_msg_t *emsg)
286 {
287         int i;
288         int n;
289         int wlen;
290
291         if(_evapi_clients==NULL) {
292                 return 0;
293         }
294
295         n = 0;
296         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
297                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock>=0) {
298                         if(emsg->tag.s==NULL || (emsg->tag.len == _evapi_clients[i].stag.len
299                                                 && strncmp(_evapi_clients[i].stag.s,
300                                                                         emsg->tag.s, emsg->tag.len)==0)) {
301                                 wlen = write(_evapi_clients[i].sock, emsg->data.s,
302                                                 emsg->data.len);
303                                 if(wlen!=emsg->data.len) {
304                                         LM_DBG("failed to write all packet (%d out of %d) on socket"
305                                                         " %d index [%d]\n",
306                                                         wlen, emsg->data.len, _evapi_clients[i].sock, i);
307                                 }
308                                 n++;
309                                 if (emsg->unicast){
310                                         break;
311                                 }
312                         }
313                 }
314         }
315
316         LM_DBG("the message was sent to %d clients\n", n);
317
318         return n;
319 }
320
321 /**
322  *
323  */
324 void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
325 {
326         ssize_t rlen;
327         int i, k;
328         evapi_env_t evenv;
329         str frame;
330         char *sfp;
331         char *efp;
332
333         if(EV_ERROR & revents) {
334                 LM_ERR("received invalid event (%d)\n", revents);
335                 return;
336         }
337         if(_evapi_clients==NULL) {
338                 LM_ERR("no client structures\n");
339                 return;
340         }
341
342         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
343                 if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
344                         break;
345                 }
346         }
347         if(i==EVAPI_MAX_CLIENTS) {
348                 LM_ERR("cannot lookup client socket %d\n", watcher->fd);
349                 /* try to empty the socket anyhow */
350                 rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
351                 return;
352         }
353
354         /* read message from client */
355         rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
356                         CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);
357
358         if(rlen < 0) {
359                 LM_ERR("cannot read the client message\n");
360                 _evapi_clients[i].rpos = 0;
361                 return;
362         }
363
364
365         cfg_update();
366
367         evapi_env_reset(&evenv);
368         if(rlen == 0) {
369                 /* client is gone */
370                 evenv.eset = 1;
371                 evenv.conidx = i;
372                 evapi_run_cfg_route(&evenv, _evapi_rts.con_closed,
373                                 &_evapi_rts.con_closed_name);
374                 _evapi_clients[i].connected = 0;
375                 if(_evapi_clients[i].sock>=0) {
376                         close(_evapi_clients[i].sock);
377                 }
378                 _evapi_clients[i].sock = -1;
379                 _evapi_clients[i].rpos = 0;
380                 ev_io_stop(loop, watcher);
381                 free(watcher);
382                 LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
383                                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
384                 return;
385         }
386
387         _evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';
388
389         LM_DBG("{%d} [%s:%d] - received [%.*s] (%d) (%d)\n",
390                 i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
391                 (int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos,
392                 (int)rlen, (int)_evapi_clients[i].rpos);
393         evenv.conidx = i;
394         evenv.eset = 1;
395         if(_evapi_netstring_format) {
396                 /* netstring decapsulation */
397                 k = 0;
398                 while(k<_evapi_clients[i].rpos+rlen) {
399                         frame.len = 0;
400                         while(k<_evapi_clients[i].rpos+rlen) {
401                                 if(_evapi_clients[i].rbuffer[k]==' '
402                                                 || _evapi_clients[i].rbuffer[k]=='\t'
403                                                 || _evapi_clients[i].rbuffer[k]=='\r'
404                                                 || _evapi_clients[i].rbuffer[k]=='\n')
405                                         k++;
406                                 else break;
407                         }
408                         if(k==_evapi_clients[i].rpos+rlen) {
409                                 _evapi_clients[i].rpos = 0;
410                                 LM_DBG("empty content\n");
411                                 return;
412                         }
413                         /* pointer to start of whole frame */
414                         sfp = _evapi_clients[i].rbuffer + k;
415                         while(k<_evapi_clients[i].rpos+rlen) {
416                                 if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
417                                         frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
418                                 } else {
419                                         if(_evapi_clients[i].rbuffer[k]==':')
420                                                 break;
421                                         /* invalid character - discard the rest */
422                                         _evapi_clients[i].rpos = 0;
423                                         LM_DBG("invalid char when searching for size [%c] [%.*s] (%d) (%d)\n",
424                                                         _evapi_clients[i].rbuffer[k],
425                                                         (int)(_evapi_clients[i].rpos+rlen), _evapi_clients[i].rbuffer,
426                                                         (int)(_evapi_clients[i].rpos+rlen), k);
427                                         return;
428                                 }
429                                 k++;
430                         }
431                         if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
432                                 LM_DBG("invalid frame len: %d kpos: %d rpos: %u rlen: %lu\n",
433                                                 frame.len, k, _evapi_clients[i].rpos, rlen);
434                                 _evapi_clients[i].rpos = 0;
435                                 return;
436                         }
437                         if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
438                                 /* partial data - shift back in buffer and wait to read more */
439                                 efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
440                                 if(efp<=sfp) {
441                                         _evapi_clients[i].rpos = 0;
442                                         LM_DBG("weird - invalid size for residual data\n");
443                                         return;
444                                 }
445                                 _evapi_clients[i].rpos = (unsigned int)(efp-sfp);
446                                 if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
447                                         memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
448                                 } else {
449                                         for(k=0; k<_evapi_clients[i].rpos; k++) {
450                                                 _evapi_clients[i].rbuffer[k] = sfp[k];
451                                         }
452                                 }
453                                 LM_DBG("residual data [%.*s] (%d)\n",
454                                                 _evapi_clients[i].rpos, _evapi_clients[i].rbuffer,
455                                                 _evapi_clients[i].rpos);
456                                 return;
457                         }
458                         k++;
459                         frame.s = _evapi_clients[i].rbuffer + k;
460                         if(frame.s[frame.len]!=',') {
461                                 /* invalid data - discard and reset buffer */
462                                 LM_DBG("frame size mismatch the ending char (%c): [%.*s] (%d)\n",
463                                                 frame.s[frame.len], frame.len, frame.s, frame.len);
464                                 _evapi_clients[i].rpos = 0 ;
465                                 return;
466                         }
467                         frame.s[frame.len] = '\0';
468                         k += frame.len ;
469                         evenv.msg.s = frame.s;
470                         evenv.msg.len = frame.len;
471                         LM_DBG("executing event route for frame: [%.*s] (%d)\n",
472                                                 frame.len, frame.s, frame.len);
473                         evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
474                                         &_evapi_rts.msg_received_name);
475                         k++;
476                 }
477                 _evapi_clients[i].rpos = 0 ;
478         } else {
479                 evenv.msg.s = _evapi_clients[i].rbuffer;
480                 evenv.msg.len = rlen;
481                 evapi_run_cfg_route(&evenv, _evapi_rts.msg_received,
482                                 &_evapi_rts.msg_received_name);
483         }
484 }
485
486 /**
487  *
488  */
489 void evapi_accept_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
490 {
491         struct sockaddr caddr;
492         socklen_t clen = sizeof(caddr);
493         int csock;
494         struct ev_io *evapi_client;
495         int i;
496         evapi_env_t evenv;
497         int optval;
498         socklen_t optlen;
499
500         if(_evapi_clients==NULL) {
501                 LM_ERR("no client structures\n");
502                 return;
503         }
504         evapi_client = (struct ev_io*) malloc (sizeof(struct ev_io));
505         if(evapi_client==NULL) {
506                 LM_ERR("no more memory\n");
507                 return;
508         }
509
510         if(EV_ERROR & revents) {
511                 LM_ERR("received invalid event\n");
512                 free(evapi_client);
513                 return;
514         }
515
516         cfg_update();
517
518         /* accept new client connection */
519         csock = accept(watcher->fd, (struct sockaddr *)&caddr, &clen);
520
521         if (csock < 0) {
522                 LM_ERR("cannot accept the client '%s' err='%d'\n", gai_strerror(csock), csock);
523                 free(evapi_client);
524                 return;
525         }
526         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
527                 if(_evapi_clients[i].connected==0) {
528                         if (caddr.sa_family == AF_INET) {
529                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in*)&caddr)->sin_port);
530                                 if(inet_ntop(AF_INET, &((struct sockaddr_in*)&caddr)->sin_addr,
531                                                         _evapi_clients[i].src_addr,
532                                                         EVAPI_IPADDR_SIZE)==NULL) {
533                                         LM_ERR("cannot convert ipv4 address\n");
534                                         close(csock);
535                                         free(evapi_client);
536                                         return;
537                                 }
538                         } else {
539                                 _evapi_clients[i].src_port = ntohs(((struct sockaddr_in6*)&caddr)->sin6_port);
540                                 if(inet_ntop(AF_INET6, &((struct sockaddr_in6*)&caddr)->sin6_addr,
541                                                         _evapi_clients[i].src_addr,
542                                                         EVAPI_IPADDR_SIZE)==NULL) {
543                                         LM_ERR("cannot convert ipv6 address\n");
544                                         close(csock);
545                                         free(evapi_client);
546                                         return;
547                                 }
548                         }
549                         optval = 1;
550                         optlen = sizeof(optval);
551                         if(setsockopt(csock, SOL_SOCKET, SO_KEEPALIVE,
552                                                 &optval, optlen) < 0) {
553                                 LM_WARN("failed to enable keepalive on socket %d\n", csock);
554                         }
555                         _evapi_clients[i].connected = 1;
556                         _evapi_clients[i].sock = csock;
557                         _evapi_clients[i].af = caddr.sa_family;
558                         break;
559                 }
560         }
561         if(i>=EVAPI_MAX_CLIENTS) {
562                 LM_ERR("too many clients\n");
563                 close(csock);
564                 free(evapi_client);
565                 return;
566         }
567
568         LM_DBG("new connection - pos[%d] from: [%s:%d]\n", i,
569                         _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
570
571         evapi_env_reset(&evenv);
572         evenv.conidx = i;
573         evenv.eset = 1;
574         evapi_run_cfg_route(&evenv, _evapi_rts.con_new, &_evapi_rts.con_new_name);
575
576         if(_evapi_clients[i].connected == 0) {
577                 free(evapi_client);
578                 return;
579         }
580
581         /* start watcher to read messages from whatchers */
582         ev_io_init(evapi_client, evapi_recv_client, csock, EV_READ);
583         ev_io_start(loop, evapi_client);
584 }
585
586 /**
587  *
588  */
589 void evapi_recv_notify(struct ev_loop *loop, struct ev_io *watcher, int revents)
590 {
591         evapi_msg_t *emsg = NULL;
592         int rlen;
593
594         if(EV_ERROR & revents) {
595                 perror("received invalid event\n");
596                 return;
597         }
598
599         cfg_update();
600
601         /* read message from client */
602         rlen = read(watcher->fd, &emsg, sizeof(evapi_msg_t*));
603
604         if(rlen != sizeof(evapi_msg_t*) || emsg==NULL) {
605                 LM_ERR("cannot read the sip worker message\n");
606                 return;
607         }
608
609         LM_DBG("received [%p] [%.*s] (%d)\n", (void*)emsg,
610                         emsg->data.len, emsg->data.s, emsg->data.len);
611         evapi_dispatch_notify(emsg);
612         shm_free(emsg);
613 }
614
615 /**
616  *
617  */
618 int evapi_run_dispatcher(char *laddr, int lport)
619 {
620         int evapi_srv_sock;
621         struct sockaddr_in evapi_srv_addr;
622         struct ev_loop *loop;
623         struct hostent *h = NULL;
624         struct ev_io io_server;
625         struct ev_io io_notify;
626         int yes_true = 1;
627         int fflags = 0;
628         int i;
629
630         LM_DBG("starting dispatcher processing\n");
631
632         _evapi_clients = (evapi_client_t*)malloc(sizeof(evapi_client_t)
633                         * (EVAPI_MAX_CLIENTS+1));
634         if(_evapi_clients==NULL) {
635                 LM_ERR("failed to allocate client structures\n");
636                 exit(-1);
637         }
638         memset(_evapi_clients, 0, sizeof(evapi_client_t) * EVAPI_MAX_CLIENTS);
639         for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
640                 _evapi_clients[i].sock = -1;
641         }
642         loop = ev_default_loop(0);
643
644         if(loop==NULL) {
645                 LM_ERR("cannot get libev loop\n");
646                 return -1;
647         }
648
649         h = gethostbyname(laddr);
650         if (h == NULL || (h->h_addrtype != AF_INET && h->h_addrtype != AF_INET6)) {
651                 LM_ERR("cannot resolve local server address [%s]\n", laddr);
652                 return -1;
653         }
654         if(h->h_addrtype == AF_INET) {
655                 evapi_srv_sock = socket(PF_INET, SOCK_STREAM, 0);
656         } else {
657                 evapi_srv_sock = socket(PF_INET6, SOCK_STREAM, 0);
658         }
659         if( evapi_srv_sock < 0 )
660         {
661                 LM_ERR("cannot create server socket (family %d)\n", h->h_addrtype);
662                 return -1;
663         }
664         /* set non-blocking flag */
665         fflags = fcntl(evapi_srv_sock, F_GETFL);
666         if(fflags<0) {
667                 LM_ERR("failed to get the srv socket flags\n");
668                 close(evapi_srv_sock);
669                 return -1;
670         }
671         if (fcntl(evapi_srv_sock, F_SETFL, fflags | O_NONBLOCK)<0) {
672                 LM_ERR("failed to set srv socket flags\n");
673                 close(evapi_srv_sock);
674                 return -1;
675         }
676
677         bzero(&evapi_srv_addr, sizeof(evapi_srv_addr));
678         evapi_srv_addr.sin_family = h->h_addrtype;
679         evapi_srv_addr.sin_port   = htons(lport);
680         evapi_srv_addr.sin_addr  = *(struct in_addr*)h->h_addr;
681
682         /* Set SO_REUSEADDR option on listening socket so that we don't
683          * have to wait for connections in TIME_WAIT to go away before
684          * re-binding.
685          */
686
687         if(setsockopt(evapi_srv_sock, SOL_SOCKET, SO_REUSEADDR,
688                 &yes_true, sizeof(int)) < 0) {
689                 LM_ERR("cannot set SO_REUSEADDR option on descriptor\n");
690                 close(evapi_srv_sock);
691                 return -1;
692         }
693
694         if (bind(evapi_srv_sock, (struct sockaddr*)&evapi_srv_addr,
695                                 sizeof(evapi_srv_addr)) < 0) {
696                 LM_ERR("cannot bind to local address and port [%s:%d]\n", laddr, lport);
697                 close(evapi_srv_sock);
698                 return -1;
699         }
700         if (listen(evapi_srv_sock, 4) < 0) {
701                 LM_ERR("listen error\n");
702                 close(evapi_srv_sock);
703                 return -1;
704         }
705         ev_io_init(&io_server, evapi_accept_client, evapi_srv_sock, EV_READ);
706         ev_io_start(loop, &io_server);
707         ev_io_init(&io_notify, evapi_recv_notify, _evapi_notify_sockets[0], EV_READ);
708         ev_io_start(loop, &io_notify);
709
710         while(1) {
711                 ev_loop (loop, 0);
712         }
713
714         return 0;
715 }
716
717 /**
718  *
719  */
720 int evapi_run_worker(int prank)
721 {
722         LM_DBG("started worker process: %d\n", prank);
723         while(1) {
724                 sleep(3);
725         }
726 }
727
728 /**
729  *
730  */
731 int _evapi_relay(str *evdata, str *ctag, int unicast)
732 {
733 #define EVAPI_RELAY_FORMAT "%d:%.*s,"
734
735         int len;
736         int sbsize;
737         evapi_msg_t *emsg;
738
739         LM_DBG("relaying event data [%.*s] (%d)\n",
740                         evdata->len, evdata->s, evdata->len);
741
742         sbsize = evdata->len;
743         len = sizeof(evapi_msg_t)
744                 + ((sbsize + 32 + ((ctag && ctag->len>0)?(ctag->len+2):0)) * sizeof(char));
745         emsg = (evapi_msg_t*)shm_malloc(len);
746         if(emsg==NULL) {
747                 LM_ERR("no more shared memory\n");
748                 return -1;
749         }
750         memset(emsg, 0, len);
751         emsg->data.s = (char*)emsg + sizeof(evapi_msg_t);
752         if(_evapi_netstring_format) {
753                 /* netstring encapsulation */
754                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
755                                 EVAPI_RELAY_FORMAT,
756                                 sbsize, evdata->len, evdata->s);
757         } else {
758                 emsg->data.len = snprintf(emsg->data.s, sbsize+32,
759                                 "%.*s",
760                                 evdata->len, evdata->s);
761         }
762         if(emsg->data.len<=0 || emsg->data.len>sbsize+32) {
763                 shm_free(emsg);
764                 LM_ERR("cannot serialize event\n");
765                 return -1;
766         }
767         if(ctag && ctag->len>0) {
768                 emsg->tag.s = emsg->data.s + sbsize + 32;
769                 strncpy(emsg->tag.s, ctag->s, ctag->len);
770                 emsg->tag.len = ctag->len;
771         }
772
773         if (unicast){
774                 emsg->unicast = unicast;
775         }
776
777         LM_DBG("sending [%p] [%.*s] (%d)\n", (void*)emsg, emsg->data.len,
778                         emsg->data.s, emsg->data.len);
779         if(_evapi_notify_sockets[1]!=-1) {
780                 len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*));
781                 if(len<=0) {
782                         shm_free(emsg);
783                         LM_ERR("failed to pass the pointer to evapi dispatcher\n");
784                         return -1;
785                 }
786         } else {
787                 cfg_update();
788                 LM_DBG("dispatching [%p] [%.*s] (%d)\n", (void*)emsg,
789                                 emsg->data.len, emsg->data.s, emsg->data.len);
790                 if(evapi_dispatch_notify(emsg) == 0) {
791                         shm_free(emsg);
792                         LM_WARN("message not delivered - no client connected\n");
793                         return -1;
794                 }
795                 shm_free(emsg);
796         }
797         return 0;
798 }
799
800 /**
801  *
802  */
803 int evapi_relay(str *evdata)
804 {
805         return _evapi_relay(evdata, NULL, 0);
806 }
807
808 /**
809  *
810  */
811 int evapi_relay_multicast(str *evdata, str *ctag){
812         return _evapi_relay(evdata, ctag, 0);
813 }
814
815 /**
816  *
817  */
818 int evapi_relay_unicast(str *evdata, str *ctag){
819         return _evapi_relay(evdata, ctag, 1);
820 }
821
822 #if 0
823 /**
824  *
825  */
826 int evapi_relay(str *event, str *data)
827 {
828 #define EVAPI_RELAY_FORMAT "%d:{\n \"event\":\"%.*s\",\n \"data\":%.*s\n},"
829
830         int len;
831         int sbsize;
832         str *sbuf;
833
834         LM_DBG("relaying event [%.*s] data [%.*s]\n",
835                         event->len, event->s, data->len, data->s);
836
837         sbsize = sizeof(EVAPI_RELAY_FORMAT) + event->len + data->len - 13;
838         sbuf = (str*)shm_malloc(sizeof(str) + ((sbsize+32) * sizeof(char)));
839         if(sbuf==NULL) {
840                 LM_ERR("no more shared memory\n");
841                 return -1;
842         }
843         sbuf->s = (char*)sbuf + sizeof(str);
844         sbuf->len = snprintf(sbuf->s, sbsize+32,
845                         EVAPI_RELAY_FORMAT,
846                         sbsize, event->len, event->s, data->len, data->s);
847         if(sbuf->len<=0 || sbuf->len>sbsize+32) {
848                 shm_free(sbuf);
849                 LM_ERR("cannot serialize event\n");
850                 return -1;
851         }
852
853         len = write(_evapi_notify_sockets[1], &sbuf, sizeof(str*));
854         if(len<=0) {
855                 LM_ERR("failed to pass the pointer to evapi dispatcher\n");
856                 return -1;
857         }
858         LM_DBG("sent [%p] [%.*s] (%d)\n", (void*)sbuf, sbuf->len, sbuf->s, sbuf->len);
859         return 0;
860 }
861 #endif
862
863 /**
864  *
865  */
866 int pv_parse_evapi_name(pv_spec_t *sp, str *in)
867 {
868         if(sp==NULL || in==NULL || in->len<=0)
869                 return -1;
870
871         switch(in->len)
872         {
873                 case 3:
874                         if(strncmp(in->s, "msg", 3)==0)
875                                 sp->pvp.pvn.u.isname.name.n = 1;
876                         else goto error;
877                 break;
878                 case 6:
879                         if(strncmp(in->s, "conidx", 6)==0)
880                                 sp->pvp.pvn.u.isname.name.n = 0;
881                         else goto error;
882                 break;
883                 case 7:
884                         if(strncmp(in->s, "srcaddr", 7)==0)
885                                 sp->pvp.pvn.u.isname.name.n = 2;
886                         else if(strncmp(in->s, "srcport", 7)==0)
887                                 sp->pvp.pvn.u.isname.name.n = 3;
888                         else goto error;
889                 break;
890                 default:
891                         goto error;
892         }
893         sp->pvp.pvn.type = PV_NAME_INTSTR;
894         sp->pvp.pvn.u.isname.type = 0;
895
896         return 0;
897
898 error:
899         LM_ERR("unknown PV msrp name %.*s\n", in->len, in->s);
900         return -1;
901 }
902
903 /**
904  *
905  */
906 int pv_get_evapi(sip_msg_t *msg, pv_param_t *param, pv_value_t *res)
907 {
908         evapi_env_t *evenv;
909
910         if(param==NULL || res==NULL)
911                 return -1;
912
913         if(_evapi_clients==NULL) {
914                 return pv_get_null(msg, param, res);
915         }
916         evenv = evapi_get_msg_env(msg);
917
918         if(evenv==NULL || evenv->conidx<0 || evenv->conidx>=EVAPI_MAX_CLIENTS)
919                 return pv_get_null(msg, param, res);
920
921         if(_evapi_clients[evenv->conidx].connected==0
922                         && _evapi_clients[evenv->conidx].sock < 0)
923                 return pv_get_null(msg, param, res);
924
925         switch(param->pvn.u.isname.name.n)
926         {
927                 case 0:
928                         return pv_get_sintval(msg, param, res, evenv->conidx);
929                 case 1:
930                         if(evenv->msg.s==NULL)
931                                 return pv_get_null(msg, param, res);
932                         return pv_get_strval(msg, param, res, &evenv->msg);
933                 case 2:
934                         return pv_get_strzval(msg, param, res,
935                                         _evapi_clients[evenv->conidx].src_addr);
936                 case 3:
937                         return pv_get_sintval(msg, param, res,
938                                         _evapi_clients[evenv->conidx].src_port);
939                 default:
940                         return pv_get_null(msg, param, res);
941         }
942
943         return 0;
944 }
945
946 /**
947  *
948  */
949 int pv_set_evapi(sip_msg_t *msg, pv_param_t *param, int op,
950                 pv_value_t *val)
951 {
952         return 0;
953 }