- changed sip_msg (new rcv member containing all the ips, ports, protocol)
[sip-router] / tcp_main.c
1 /*
2  * $Id$
3  *
4  * Copyright (C) 2001-2003 Fhg Fokus
5  *
6  * This file is part of ser, a free SIP server.
7  *
8  * ser is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * For a license to use the ser software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * ser is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27
28
29 #ifdef USE_TCP
30
31
32 #ifndef SHM_MEM
33 #error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
34 #endif
35
36 #include <sys/select.h>
37
38 #include <sys/time.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41
42 #include <unistd.h>
43
44 #include <errno.h>
45 #include <string.h>
46
47
48
49 #include "ip_addr.h"
50 #include "pass_fd.h"
51 #include "tcp_conn.h"
52 #include "globals.h"
53 #include "pt.h"
54 #include "mem/mem.h"
55 #include "mem/shm_mem.h"
56 #include "timer.h"
57 #include "tcp_server.h"
58 #include "tcp_init.h"
59
60
61
/* aliases selecting the pkg_* allocator for "local" allocations */
#define local_malloc	pkg_malloc
#define local_free		pkg_free

/* number of slots in the tcp_children[] table */
#define MAX_TCP_CHILDREN 100
66
/* Per-receiver bookkeeping: one entry is filled in by the parent in
 * tcp_init_children() for every forked tcp receiver. */
struct tcp_child {
	pid_t pid;      /* pid returned by fork() for this receiver */
	int unix_sock;  /* unix sock fd, copied from pt */
	int busy;       /* incremented by send2child() for every connection
	                   handed over, decremented by tcp_main_loop() when the
	                   child reports back */
	int n_reqs;     /* number of requests serviced so far */
};
73
74
75
76 struct tcp_connection** conn_list=0;
77 struct tcp_child tcp_children[MAX_TCP_CHILDREN];
78 static int connection_id=1; /*  unique for each connection, used for 
79                                                                 quickly finding the corresponding connection
80                                                                 for a reply */
81 int unix_tcp_sock;
82
83
84
85 struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
86                                                                         struct socket_info* ba)
87 {
88         struct tcp_connection *c;
89         
90
91         c=(struct tcp_connection*)shm_malloc(sizeof(struct tcp_connection));
92         if (c==0){
93                 LOG(L_ERR, "ERROR: tcpconn_add: mem. allocation failure\n");
94                 goto error;
95         }
96         c->s=sock;
97         c->fd=sock;
98         c->rcv.src_su=*su;
99         
100         c->refcnt=0;
101         su2ip_addr(&c->rcv.src_ip, su);
102         c->rcv.src_port=su_getport(su);
103         c->rcv.proto=PROTO_TCP;
104         c->rcv.bind_address=ba;
105         if (ba){
106                 c->rcv.dst_ip=ba->address;
107                 c->rcv.dst_port=ba->port_no;
108         }
109         init_tcp_req(&c->req);
110         c->timeout=get_ticks()+TCP_CON_TIMEOUT;
111         c->id=connection_id++;
112         c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
113         c->rcv.proto_reserved2=0;
114         return c;
115         
116 error:
117         return 0;
118 }
119
120
121
122 struct tcp_connection* tcpconn_connect(union sockaddr_union* server)
123 {
124         int s;
125
126         s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
127         if (s<0){
128                 LOG(L_ERR, "ERROR: tcpconn_connect: socket: (%d) %s\n",
129                                 errno, strerror(errno));
130                 goto error;
131         }
132         if (connect(s, &server->s, sockaddru_len(*server))<0){
133                 LOG(L_ERR, "ERROR: tcpconn_connect: connect: (%d) %s\n",
134                                 errno, strerror(errno));
135                 goto error;
136         }
137         return tcpconn_new(s, server, 0); /*FIXME: set sock idx! */
138 error:
139         return 0;
140 }
141
142
143
144 struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
145 {
146         TCPCONN_LOCK;
147         /* add it at the begining of the list*/
148         if (c) tcpconn_listadd(*conn_list, c, next, prev);
149         TCPCONN_UNLOCK;
150         return c;
151 }
152
153
154
155 void tcpconn_rm(struct tcp_connection* c)
156 {
157         TCPCONN_LOCK;
158         tcpconn_listrm(*conn_list, c, next, prev);
159         TCPCONN_UNLOCK;
160         shm_free(c);
161 }
162
163
164 /* finds a connection, if id=0 uses the ip addr & port */
165 struct tcp_connection* tcpconn_find(int id, struct ip_addr* ip, int port)
166 {
167
168         struct tcp_connection *c;
169         
170         DBG("tcpconn_find: %d ",id ); print_ip(ip); DBG(" %d\n", ntohs(port));
171         for (c=*conn_list; c; c=c->next){
172                 DBG("c=%p, c->id=%d, ip=",c, c->id);
173                 print_ip(&c->rcv.src_ip);
174                 DBG(" port=%d\n", ntohs(c->rcv.src_port));
175                 if (id){
176                         if (id==c->id) return c;
177                 }else if (ip && (port==c->rcv.src_port)&&
178                                         (ip_addr_cmp(ip, &c->rcv.src_ip)))
179                         return c;
180         }
181         return 0;
182 }
183
184
185
186 struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port)
187 {
188         struct tcp_connection* c;
189         TCPCONN_LOCK;
190         c=tcpconn_find(id, ip, port);
191         if (c) c->refcnt++;
192         TCPCONN_UNLOCK;
193         return c;
194 }
195
196
197
198 void tcpconn_put(struct tcp_connection* c)
199 {
200         c->refcnt--; /* FIXME: atomic_dec */
201 }
202
203
204
205 /* finds a tcpconn & sends on it */
206 int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
207 {
208         struct tcp_connection *c;
209         struct ip_addr ip;
210         int port;
211         long response[2];
212         int n;
213         
214         port=0;
215         if (to){
216                 su2ip_addr(&ip, to);
217                 port=su_getport(to);
218                 c=tcpconn_get(id, &ip, port); /* lock ;inc refcnt; unlock */
219         }else if (id){
220                 c=tcpconn_get(id, 0, 0);
221         }else{
222                 LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
223                 return -1;
224         }
225         
226         if (id){
227                 if (c==0) {
228                         if (to){
229                                 c=tcpconn_get(0, &ip, port); /* try again w/o id */
230                                 goto no_id;
231                         }else{
232                                 LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
233                                                 id);
234                                 return -1;
235                         }
236                 }else goto get_fd;
237         }
238 no_id:
239                 if (c==0){
240                         DBG("tcp_send: no open tcp connection found, opening new one\n");
241                         /* create tcp connection */
242                         if ((c=tcpconn_connect(to))==0){
243                                 LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
244                                 return 0;
245                         }
246                         c->refcnt++;
247                         
248                         /* send the new tcpconn to "tcp main" */
249                         response[0]=(long)c;
250                         response[1]=CONN_NEW;
251                         n=write(unix_tcp_sock, response, sizeof(response));
252                         n=send_fd(unix_tcp_sock, &c, sizeof(c), c->s);
253                         goto send_it;
254                 }
255 get_fd:
256                         DBG("tcp_send: tcp connection found, acquiring fd\n");
257                         /* get the fd */
258                         response[0]=(long)c;
259                         response[1]=CONN_GET_FD;
260                         n=write(unix_tcp_sock, response, sizeof(response));
261                         DBG("tcp_send, c= %p, n=%d\n", c, n);
262                         n=receive_fd(unix_tcp_sock, &c, sizeof(c), &c->fd);
263                         DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, c->fd);
264                 
265         
266         
267 send_it:
268         DBG("tcp_send: sending...\n");
269         n=write(c->fd, buf, len);
270         DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, c->fd);
271         close(c->fd);
272         tcpconn_put(c); /* release c (lock; dec refcnt; unlock) */
273         return n;
274 }
275
276
277
278 /* very ineficient for now, use hashtable some day - FIXME*/
279 void tcpconn_timeout(fd_set* set)
280 {
281         struct tcp_connection *c, *next;
282         int ticks;;
283         
284         
285         ticks=get_ticks();
286         c=*conn_list;
287         while(c){
288                 next=c->next;
289                 if ((c->refcnt==0) && (ticks>c->timeout)) {
290                         DBG("tcpconn_timeout: timeout for %p (%d > %d)\n",
291                                         c, ticks, c->timeout);
292                         if (c->s>0) {
293                                 FD_CLR(c->s, set);
294                                 close(c->s);
295                         }
296                         tcpconn_rm(c);
297                 }
298                 c=next;
299         }
300 }
301
302
303
304 int tcp_init(struct socket_info* sock_info)
305 {
306         union sockaddr_union* addr;
307         
308         addr=&sock_info->su;
309         sock_info->proto=PROTO_TCP;
310         if (init_su(addr, &sock_info->address, htons(sock_info->port_no))<0){
311                 LOG(L_ERR, "ERROR: tcp_init: could no init sockaddr_union\n");
312                 goto error;
313         }
314         sock_info->socket=socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
315         if (sock_info->socket==-1){
316                 LOG(L_ERR, "ERROR: tcp_init: socket: %s\n", strerror(errno));
317                 goto error;
318         }
319         if (bind(sock_info->socket, &addr->s, sockaddru_len(*addr))==-1){
320                 LOG(L_ERR, "ERROR: tcp_init: bind(%x, %p, %d) on %s: %s\n",
321                                 sock_info->socket, &addr->s, 
322                                 sockaddru_len(*addr),
323                                 sock_info->address_str.s,
324                                 strerror(errno));
325                 goto error;
326         }
327         if (listen(sock_info->socket, 10)==-1){
328                 LOG(L_ERR, "ERROR: tcp_init: listen(%x, %p, %d) on %s: %s\n",
329                                 sock_info->socket, &addr->s, 
330                                 sockaddru_len(*addr),
331                                 sock_info->address_str.s,
332                                 strerror(errno));
333                 goto error;
334         }
335         
336         return 0;
337 error:
338         if (sock_info->socket!=-1){
339                 close(sock_info->socket);
340                 sock_info->socket=-1;
341         }
342         return -1;
343 }
344
345
346
347 static int send2child(struct tcp_connection* tcpconn)
348 {
349         int i;
350         int min_busy;
351         int idx;
352         
353         min_busy=tcp_children[0].busy;
354         idx=0;
355         for (i=0; i<tcp_children_no; i++){
356                 if (!tcp_children[i].busy){
357                         idx=i;
358                         min_busy=0;
359                         break;
360                         return 0;
361                 }else if (min_busy>tcp_children[i].busy){
362                         min_busy=tcp_children[i].busy;
363                         idx=i;
364                 }
365         }
366         
367         tcp_children[idx].busy++;
368         tcp_children[idx].n_reqs++;
369         tcpconn->refcnt++;
370         if (min_busy){
371                 LOG(L_WARN, "WARNING: send2child:no free tcp receiver, "
372                                 " connection passed to the least busy one (%d)\n",
373                                 min_busy);
374         }
375         DBG("send2child: to child %d, %ld\n", idx, (long)tcpconn);
376         send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
377                         tcpconn->s);
378         
379         return 0; /* just to fix a warning*/
380 }
381
382
383 void tcp_main_loop()
384 {
385         int r;
386         int n;
387         fd_set master_set;
388         fd_set sel_set;
389         int maxfd;
390         int new_sock;
391         union sockaddr_union su;
392         struct tcp_connection* tcpconn;
393         long response[2];
394         int cmd;
395         int bytes;
396         socklen_t su_len;
397         struct timeval timeout;
398
399         /*init */
400         maxfd=0;
401         FD_ZERO(&master_set);
402         /* set all the listen addresses */
403         for (r=0; r<sock_no; r++){
404                 if ((tcp_info[r].proto==PROTO_TCP) &&(tcp_info[r].socket!=-1)){
405                         FD_SET(tcp_info[r].socket, &master_set);
406                         if (tcp_info[r].socket>maxfd) maxfd=tcp_info[r].socket;
407                 }
408         }
409         /* set all the unix sockets used for child comm */
410         for (r=1; r<process_no; r++){
411                 if (pt[r].unix_sock>0){ /* we can't have 0, we never close it!*/
412                         FD_SET(pt[r].unix_sock, &master_set);
413                         if (pt[r].unix_sock>maxfd) maxfd=pt[r].unix_sock;
414                 }
415         }
416         
417         
418         /* main loop*/
419         
420         while(1){
421                 sel_set=master_set;
422                 timeout.tv_sec=TCP_MAIN_SELECT_TIMEOUT;
423                 timeout.tv_usec=0;
424                 n=select(maxfd+1, &sel_set, 0 ,0 , &timeout);
425                 if (n<0){
426                         if (errno==EINTR) continue; /* just a signal */
427                         /* errors */
428                         LOG(L_ERR, "ERROR: tcp_main_loop: select:(%d) %s\n", errno,
429                                         strerror(errno));
430                         n=0;
431                 }
432                 
433                 for (r=0; r<sock_no && n; r++){
434                         if ((FD_ISSET(tcp_info[r].socket, &sel_set))){
435                                 /* got a connection on r */
436                                 su_len=sizeof(su);
437                                 new_sock=accept(tcp_info[r].socket, &(su.s), &su_len);
438                                 n--;
439                                 if (new_sock<0){
440                                         LOG(L_ERR,  "WARNING: tcp_main_loop: error while accepting"
441                                                         " connection(%d): %s\n", errno, strerror(errno));
442                                         continue;
443                                 }
444                                 
445                                 /* add socket to list */
446                                 tcpconn=tcpconn_new(new_sock, &su, &tcp_info[r]);
447                                 if (tcpconn){
448                                         tcpconn_add(tcpconn);
449                                         DBG("tcp_main_loop: new connection: %p %d\n",
450                                                 tcpconn, tcpconn->s);
451                                         /* pass it to a child */
452                                         if(send2child(tcpconn)<0){
453                                                 LOG(L_ERR,"ERROR: tcp_main_loop: no children "
454                                                                 "available\n");
455                                                 close(tcpconn->s);
456                                                 tcpconn_rm(tcpconn);
457                                         }
458                                 }
459                         }
460                 }
461                 
462                 /* check all the read fds (from the tcpconn list) */
463                 
464                 for(tcpconn=*conn_list; tcpconn && n; tcpconn=tcpconn->next){
465                         if ((tcpconn->refcnt==0)&&(FD_ISSET(tcpconn->s, &sel_set))){
466                                 /* new data available */
467                                 n--;
468                                 /* pass it to child, so remove it from select list */
469                                 DBG("tcp_main_loop: data available on %p %d\n",
470                                                 tcpconn, tcpconn->s);
471                                 FD_CLR(tcpconn->s, &master_set);
472                                 if (send2child(tcpconn)<0){
473                                         LOG(L_ERR,"ERROR: tcp_main_loop: no children available\n");
474                                         close(tcpconn->s);
475                                         tcpconn_rm(tcpconn);
476                                 }
477                         }
478                 }
479                 
480                 /* check unix sockets & listen | destroy connections */
481                 /* start from 1, the "main" process does not transmit anything*/
482                 for (r=1; r<process_no && n; r++){
483                         if ( (pt[r].unix_sock>0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
484                                 /* (we can't have a fd==0, 0 i s never closed )*/
485                                 n--;
486                                 /* errno==EINTR !!! TODO*/
487 read_again:
488                                 bytes=read(pt[r].unix_sock, response, sizeof(response));
489                                 if (bytes==0){
490                                         /* EOF -> bad, child has died */
491                                         LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
492                                         /* don't listen on it any more */
493                                         FD_CLR(pt[r].unix_sock, &master_set);
494                                         /*exit(-1)*/;
495                                 }else if (bytes<0){
496                                         if (errno==EINTR) goto read_again;
497                                         else{
498                                                 LOG(L_CRIT, "ERROR: tcp_main_loop: read from child: "
499                                                                 " %s\n", strerror(errno));
500                                                 /* try to continue ? */
501                                         }
502                                 }
503                                         
504                                 DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
505                                                 response[0], response[1], r, pt[r].pid);
506                                 cmd=response[1];
507                                 switch(cmd){
508                                         case CONN_RELEASE:
509                                                 if (pt[r].idx>=0){
510                                                         tcp_children[pt[r].idx].busy--;
511                                                 }else{
512                                                         LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
513                                                 }
514                                                 tcpconn=(struct tcp_connection*)response[0];
515                                                 if (tcpconn){
516                                                         tcpconn->refcnt--;
517                                                         DBG("tcp_main_loop: %p refcnt= %d\n", 
518                                                                         tcpconn, tcpconn->refcnt);
519                                                                 FD_SET(tcpconn->s, &master_set);
520                                                                 if (maxfd<tcpconn->s) maxfd=tcpconn->s;
521                                                                 /* update the timeout*/
522                                                                 tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
523                                                 }
524                                                 break;
525                                         case CONN_ERROR:
526                                         case CONN_DESTROY:
527                                         case CONN_EOF:
528                                                 if (pt[r].idx>=0){
529                                                         tcp_children[pt[r].idx].busy--;
530                                                 }else{
531                                                         LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
532                                                 }
533                                                 tcpconn=(struct tcp_connection*)response[0];
534                                                 if (tcpconn){
535                                                         tcpconn->refcnt--;
536                                                         if (tcpconn->refcnt==0){
537                                                                 DBG("tcp_main_loop: destroying connection\n");
538                                                                 close(tcpconn->s);
539                                                                 tcpconn_rm(tcpconn);
540                                                         }else{
541                                                                 DBG("tcp_main_loop: delaying ...\n");
542                                                         }
543                                                 }
544                                                 break;
545                                         case CONN_GET_FD:
546                                                 /* send the requested FD  */
547                                                 tcpconn=(struct tcp_connection*)response[0];
548                                                 /* WARNING: take care of setting refcnt properly to
549                                                  * avoid race condition */
550                                                 if (tcpconn){
551                                                         send_fd(pt[r].unix_sock, &tcpconn,
552                                                                         sizeof(tcpconn), tcpconn->s);
553                                                 }else{
554                                                         LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
555                                                 }
556                                                 break;
557                                         case CONN_NEW:
558                                                 /* update the fd in the requested tcpconn*/
559                                                 tcpconn=(struct tcp_connection*)response[0];
560                                                 /* WARNING: take care of setting refcnt properly to
561                                                  * avoid race condition */
562                                                 if (tcpconn){
563                                                         receive_fd(pt[r].unix_sock, &tcpconn,
564                                                                                 sizeof(tcpconn), &tcpconn->s);
565                                                         /* add tcpconn to the list*/
566                                                         tcpconn_add(tcpconn);
567                                                         FD_SET(tcpconn->s, &master_set);
568                                                         if (maxfd<tcpconn->s) maxfd=tcpconn->s;
569                                                         /* update the timeout*/
570                                                         tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
571                                                 }else{
572                                                         LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
573                                                 }
574                                                 break;
575                                         default:
576                                                         LOG(L_CRIT, "BUG: tcp_main_loop: unknown cmd %d\n",
577                                                                         cmd);
578                                 }
579                         }
580                 }
581                 
582                 /* remove old connections */
583                 tcpconn_timeout(&master_set);
584         
585         }
586 }
587
588
589
590 int init_tcp()
591 {
592         /* allocate list head*/
593         conn_list=shm_malloc(sizeof(struct tcp_connection*));
594         if (conn_list==0){
595                 LOG(L_CRIT, "ERROR: init_tcp: memory allocation failure\n");
596                 goto error;
597         }
598         *conn_list=0;
599         return 0;
600 error:
601                 return -1;
602 }
603
604
605
606 /* starts the tcp processes */
607 int tcp_init_children()
608 {
609         int r;
610         int sockfd[2];
611         pid_t pid;
612         
613         
614         /* create the tcp sock_info structures */
615         /* copy the sockets --moved to main_loop*/
616         
617         /* fork children & create the socket pairs*/
618         for(r=0; r<tcp_children_no; r++){
619                 if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd)<0){
620                         LOG(L_ERR, "ERROR: tcp_main: socketpair failed: %s\n",
621                                         strerror(errno));
622                         goto error;
623                 }
624                 
625                 process_no++;
626                 pid=fork();
627                 if (pid<0){
628                         LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
629                                         strerror(errno));
630                         goto error;
631                 }else if (pid>0){
632                         /* parent */
633                         close(sockfd[1]);
634                         tcp_children[r].pid=pid;
635                         tcp_children[r].busy=0;
636                         tcp_children[r].n_reqs=0;
637                         tcp_children[r].unix_sock=sockfd[0];
638                         pt[process_no].pid=pid;
639                         pt[process_no].unix_sock=sockfd[0];
640                         pt[process_no].idx=r;
641                         strncpy(pt[process_no].desc, "tcp receiver", MAX_PT_DESC);
642                 }else{
643                         /* child */
644                         close(sockfd[0]);
645                         unix_tcp_sock=sockfd[1];
646                         tcp_receive_loop(sockfd[1]);
647                 }
648         }
649         return 0;
650 error:
651         return -1;
652 }
653
654 #endif