tcp: config option for the async write block size
[sip-router] / tcp_main.c
1 /*
2  * $Id$
3  *
4  * Copyright (C) 2001-2003 FhG Fokus
5  *
6  * This file is part of ser, a free SIP server.
7  *
8  * ser is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * For a license to use the ser software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * ser is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 /*
28  * History:
29  * --------
30  *  2002-11-29  created by andrei
31  *  2002-12-11  added tcp_send (andrei)
32  *  2003-01-20  locking fixes, hashtables (andrei)
33  *  2003-02-20  s/lock_t/gen_lock_t/ to avoid a conflict on solaris (andrei)
34  *  2003-02-25  Nagle is disabled if -DDISABLE_NAGLE (andrei)
 *  2003-03-29  SO_REUSEADDR before calling bind to allow
 *              server restart, Nagle set on the (hopefully)
 *              correct socket (jiri)
 *  2003-03-31  always try to find the corresponding tcp listen socket for
 *               a temp. socket and store it in *->bind_address: added
 *               find_tcp_si, modified tcpconn_connect (andrei)
41  *  2003-04-14  set sockopts to TOS low delay (andrei)
42  *  2003-06-30  moved tcp new connect checking & handling to
43  *               handle_new_connect (andrei)
44  *  2003-07-09  tls_close called before closing the tcp connection (andrei)
45  *  2003-10-24  converted to the new socket_info lists (andrei)
46  *  2003-10-27  tcp port aliases support added (andrei)
47  *  2003-11-04  always lock before manipulating refcnt; sendchild
48  *              does not inc refcnt by itself anymore (andrei)
49  *  2003-11-07  different unix sockets are used for fd passing
50  *              to/from readers/writers (andrei)
51  *  2003-11-17  handle_new_connect & tcp_connect will close the 
52  *              new socket if tcpconn_new return 0 (e.g. out of mem) (andrei)
53  *  2003-11-28  tcp_blocking_write & tcp_blocking_connect added (andrei)
54  *  2004-11-08  dropped find_tcp_si and replaced with find_si (andrei)
55  *  2005-06-07  new tcp optimized code, supports epoll (LT), sigio + real time
56  *               signals, poll & select (andrei)
57  *  2005-06-26  *bsd kqueue support (andrei)
58  *  2005-07-04  solaris /dev/poll support (andrei)
59  *  2005-07-08  tcp_max_connections, tcp_connection_lifetime, don't accept
60  *               more connections if tcp_max_connections is exceeded (andrei)
61  *  2005-10-21  cleanup all the open connections on exit
 *              decrement the no. of open connections on timeout too (andrei)
 *  2006-01-30  queue send_fd request and execute them at the end of the
63  *              poll loop  (#ifdef) (andrei)
64  *              process all children requests, before attempting to send
65  *              them new stuff (fixes some deadlocks) (andrei)
66  *  2006-02-03  timers are run only once per s (andrei)
67  *              tcp children fds can be non-blocking; send fds are queued on
68  *              EAGAIN; lots of bug fixes (andrei)
69  *  2006-02-06  better tcp_max_connections checks, tcp_connections_no moved to
70  *              shm (andrei)
71  *  2006-04-12  tcp_send() changed to use struct dest_info (andrei)
72  *  2006-11-02  switched to atomic ops for refcnt, locking improvements 
73  *               (andrei)
74  *  2006-11-04  switched to raw ticks (to fix conversion errors which could
75  *               result in inf. lifetime) (andrei)
76  *  2007-07-25  tcpconn_connect can now bind the socket on a specified
77  *                source addr/port (andrei)
78  *  2007-07-26   tcp_send() and tcpconn_get() can now use a specified source
79  *                addr./port (andrei)
80  *  2007-08-23   getsockname() for INADDR_ANY(SI_IS_ANY) sockets (andrei)
81  *  2007-08-27   split init_sock_opt into a lightweight init_sock_opt_accept() 
82  *               used when accepting connections and init_sock_opt used for 
83  *               connect/ new sockets (andrei)
 *  2007-11-22  always add the connection & clear the corresponding flags
 *               before io_watch_add-ing its fd - it's safer this way (andrei)
86  *  2007-11-26  improved tcp timers: switched to local_timer (andrei)
87  *  2007-11-27  added send fd cache and reader fd reuse (andrei)
88  *  2007-11-28  added support for TCP_DEFER_ACCEPT, KEEPALIVE, KEEPINTVL,
89  *               KEEPCNT, QUICKACK, SYNCNT, LINGER2 (andrei)
90  *  2007-12-04  support for queueing write requests (andrei)
91  *  2007-12-12  destroy connection asap on wbuf. timeout (andrei)
92  *  2007-12-13  changed the refcnt and destroy scheme, now refcnt is 1 if
93  *                linked into the hash tables (was 0) (andrei)
94  *  2007-12-21  support for pending connects (connections are added to the
95  *               hash immediately and writes on them are buffered) (andrei)
96  *  2008-02-05  handle POLLRDHUP (if supported), POLLERR and
97  *               POLLHUP (andrei)
98  *              on write error check if there's still data in the socket 
99  *               read buffer and process it first (andrei)
100  *  2009-02-26  direct blacklist support (andrei)
101  */
102
103
104 #ifdef USE_TCP
105
106
107 #ifndef SHM_MEM
108 #error "shared memory support needed (add -DSHM_MEM to Makefile.defs)"
109 #endif
110
111 #define HANDLE_IO_INLINE
112 #include "io_wait.h" /* include first to make sure the needed features are
113                                                 turned on (e.g. _GNU_SOURCE for POLLRDHUP) */
114
115 #include <sys/time.h>
116 #include <sys/types.h>
117 #include <sys/select.h>
118 #include <sys/socket.h>
119 #ifdef HAVE_FILIO_H
120 #include <sys/filio.h> /* needed on solaris 2.x for FIONREAD */
121 #elif defined __OS_solaris
122 #define BSD_COMP  /* needed on older solaris for FIONREAD */
123 #endif /* HAVE_FILIO_H / __OS_solaris */
124 #include <sys/ioctl.h>  /* ioctl() used on write error */
125 #include <netinet/in.h>
126 #include <netinet/in_systm.h>
127 #include <netinet/ip.h>
128 #include <netinet/tcp.h>
129 #include <sys/uio.h>  /* writev*/
130 #include <netdb.h>
131 #include <stdlib.h> /*exit() */
132
133 #include <unistd.h>
134
135 #include <errno.h>
136 #include <string.h>
137
138 #ifdef HAVE_SELECT
139 #include <sys/select.h>
140 #endif
141 #include <sys/poll.h>
142
143
144 #include "ip_addr.h"
145 #include "pass_fd.h"
146 #include "tcp_conn.h"
147 #include "globals.h"
148 #include "pt.h"
149 #include "locking.h"
150 #include "mem/mem.h"
151 #include "mem/shm_mem.h"
152 #include "timer.h"
153 #include "sr_module.h"
154 #include "tcp_server.h"
155 #include "tcp_init.h"
156 #include "tsend.h"
157 #include "timer_ticks.h"
158 #include "local_timer.h"
159 #ifdef CORE_TLS
160 #include "tls/tls_server.h"
161 #define tls_loaded() 1
162 #else
163 #include "tls_hooks_init.h"
164 #include "tls_hooks.h"
165 #endif /* CORE_TLS*/
166 #ifdef USE_DST_BLACKLIST
167 #include "dst_blacklist.h"
168 #endif /* USE_DST_BLACKLIST */
169
170 #include "tcp_info.h"
171 #include "tcp_options.h"
172 #include "ut.h"
173 #include "cfg/cfg_struct.h"
174
175 #define local_malloc pkg_malloc
176 #define local_free   pkg_free
177
178 #include <fcntl.h> /* must be included after io_wait.h if SIGIO_RT is used */
179
180
/* Builds done with -DNO_MSG_DONTWAIT on systems that lack MSG_DONTWAIT get
 * a 0 (no-op) flag instead ("should work inside tcp_main"). */
#if defined(NO_MSG_DONTWAIT) && !defined(MSG_DONTWAIT)
#define MSG_DONTWAIT 0
#endif /* NO_MSG_DONTWAIT && !MSG_DONTWAIT */


/* Do not hand a newly accepted connection to a tcp child right away:
 * wait until some data shows up on it first. */
#define TCP_PASS_NEW_CONNECTION_ON_DATA

/* pending connections backlog (presumably the listen() backlog; the user
 * of this macro is outside this view) */
#define TCP_LISTEN_BACKLOG 1024

/* On EAGAIN, queue "send fd" requests instead of sending them right away. */
#define SEND_FD_QUEUE
#define TCP_CHILD_NON_BLOCKING

#ifdef SEND_FD_QUEUE
/* the send fd queue needs non-blocking tcp child fds: make sure that stays
 * true even if the unconditional definition above is ever dropped */
#ifndef TCP_CHILD_NON_BLOCKING
#define TCP_CHILD_NON_BLOCKING
#endif
#define MAX_SEND_FD_QUEUE_SIZE	tcp_main_max_fd_no  /* maximum size */
#define SEND_FD_QUEUE_SIZE		128                 /* initial size */
#define MAX_SEND_FD_RETRIES		96                  /* FIXME: not used for now */
#define SEND_FD_QUEUE_TIMEOUT	MS_TO_TICKS(2000)   /* 2 s */
#endif /* SEND_FD_QUEUE */

/* local_timer_run() may run at most once per TCPCONN_TIMEOUT_MIN_RUN
 * ticks, i.e. once per tick */
#define TCPCONN_TIMEOUT_MIN_RUN 1
#define TCPCONN_WAIT_TIMEOUT 1 /* 1 tick */

#ifdef TCP_ASYNC
/* points to the total number of bytes queued in all the connection write
 * queues (updated atomically by the _wbufq_* helpers) */
static unsigned int* tcp_total_wq=0;
#endif /* TCP_ASYNC */
213
214
215 enum fd_types { F_NONE, F_SOCKINFO /* a tcp_listen fd */,
216                                 F_TCPCONN, F_TCPCHILD, F_PROC };
217
218
219 #ifdef TCP_FD_CACHE
220
221 #define TCP_FD_CACHE_SIZE 8
222
223 struct fd_cache_entry{
224         struct tcp_connection* con;
225         int id;
226         int fd;
227 };
228
229
230 static struct fd_cache_entry fd_cache[TCP_FD_CACHE_SIZE];
231 #endif /* TCP_FD_CACHE */
232
233 static int is_tcp_main=0;
234
235
236 enum poll_types tcp_poll_method=0; /* by default choose the best method */
237 int tcp_main_max_fd_no=0;
238 int tcp_max_connections=DEFAULT_TCP_MAX_CONNECTIONS;
239
240 static union sockaddr_union tcp_source_ipv4_addr; /* saved bind/srv v4 addr. */
241 static union sockaddr_union* tcp_source_ipv4=0;
242 #ifdef USE_IPV6
243 static union sockaddr_union tcp_source_ipv6_addr; /* saved bind/src v6 addr. */
244 static union sockaddr_union* tcp_source_ipv6=0;
245 #endif
246
247 static int* tcp_connections_no=0; /* current open connections */
248
249 /* connection hash table (after ip&port) , includes also aliases */
250 struct tcp_conn_alias** tcpconn_aliases_hash=0;
251 /* connection hash table (after connection id) */
252 struct tcp_connection** tcpconn_id_hash=0;
253 gen_lock_t* tcpconn_lock=0;
254
255 struct tcp_child* tcp_children;
256 static int* connection_id=0; /*  unique for each connection, used for 
257                                                                 quickly finding the corresponding connection
258                                                                 for a reply */
259 int unix_tcp_sock;
260
261 static int tcp_proto_no=-1; /* tcp protocol number as returned by
262                                                            getprotobyname */
263
264 static io_wait_h io_h;
265
266 static struct local_timer tcp_main_ltimer;
267 static ticks_t tcp_main_prev_ticks;
268
269
270 static ticks_t tcpconn_main_timeout(ticks_t , struct timer_ln* , void* );
271
272 inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
273                                                                                 struct ip_addr* l_ip, int l_port,
274                                                                                 int flags);
275
276
277
278 /* sets source address used when opening new sockets and no source is specified
279  *  (by default the address is choosen by the kernel)
280  * Should be used only on init.
281  * returns -1 on error */
282 int tcp_set_src_addr(struct ip_addr* ip)
283 {
284         switch (ip->af){
285                 case AF_INET:
286                         ip_addr2su(&tcp_source_ipv4_addr, ip, 0);
287                         tcp_source_ipv4=&tcp_source_ipv4_addr;
288                         break;
289                 #ifdef USE_IPV6
290                 case AF_INET6:
291                         ip_addr2su(&tcp_source_ipv6_addr, ip, 0);
292                         tcp_source_ipv6=&tcp_source_ipv6_addr;
293                         break;
294                 #endif
295                 default:
296                         return -1;
297         }
298         return 0;
299 }
300
301
302
/* Applies the keepalive settings from the tcp cfg group to socket s:
 *  - SO_KEEPALIVE is enabled if `keepalive` is set;
 *  - TCP_KEEPINTVL, TCP_KEEPIDLE and TCP_KEEPCNT are set to `keepintvl`,
 *    `keepidle` and `keepcnt`, each only when its value is non-zero.
 * Each option is compiled in only if its HAVE_* macro is defined.
 * Returns -1 (and skips the remaining options) only if enabling
 * SO_KEEPALIVE fails; the other failures are just logged. Returns 0
 * otherwise. */
static inline int init_sock_keepalive(int s)
{
	int val;
	
#ifdef HAVE_SO_KEEPALIVE
	val=1;
	if (cfg_get(tcp, tcp_cfg, keepalive) &&
			setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to enable"
					" SO_KEEPALIVE: %s\n", strerror(errno));
		return -1;
	}
#endif /* HAVE_SO_KEEPALIVE */
#ifdef HAVE_TCP_KEEPINTVL
	val=cfg_get(tcp, tcp_cfg, keepintvl);
	if (val &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" keepalive probes interval: %s\n", strerror(errno));
	}
#endif /* HAVE_TCP_KEEPINTVL */
#ifdef HAVE_TCP_KEEPIDLE
	val=cfg_get(tcp, tcp_cfg, keepidle);
	if (val &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" keepalive idle interval: %s\n", strerror(errno));
	}
#endif /* HAVE_TCP_KEEPIDLE */
#ifdef HAVE_TCP_KEEPCNT
	val=cfg_get(tcp, tcp_cfg, keepcnt);
	if (val &&
			setsockopt(s, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val))<0){
		LOG(L_WARN, "WARNING: init_sock_keepalive: failed to set"
					" maximum keepalive count: %s\n", strerror(errno));
	}
#endif /* HAVE_TCP_KEEPCNT */
	return 0;
}
347
348
349
350 /* set all socket/fd options for new sockets (e.g. before connect): 
351  *  disable nagle, tos lowdelay, reuseaddr, non-blocking
352  *
353  * return -1 on error */
354 static int init_sock_opt(int s)
355 {
356         int flags;
357         int optval;
358         
359 #ifdef DISABLE_NAGLE
360         flags=1;
361         if ( (tcp_proto_no!=-1) && (setsockopt(s, tcp_proto_no , TCP_NODELAY,
362                                         &flags, sizeof(flags))<0) ){
363                 LOG(L_WARN, "WARNING: init_sock_opt: could not disable Nagle: %s\n",
364                                 strerror(errno));
365         }
366 #endif
367         /* tos*/
368         optval = tos;
369         if (setsockopt(s, IPPROTO_IP, IP_TOS, (void*)&optval,sizeof(optval)) ==-1){
370                 LOG(L_WARN, "WARNING: init_sock_opt: setsockopt tos: %s\n",
371                                 strerror(errno));
372                 /* continue since this is not critical */
373         }
374 #if  !defined(TCP_DONT_REUSEADDR) 
375         optval=1;
376         if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
377                                                 (void*)&optval, sizeof(optval))==-1){
378                 LOG(L_ERR, "ERROR: setsockopt SO_REUSEADDR %s\n",
379                                 strerror(errno));
380                 /* continue, not critical */
381         }
382 #endif /* !TCP_DONT_REUSEADDR */
383 #ifdef HAVE_TCP_SYNCNT
384         if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){
385                 if (setsockopt(s, IPPROTO_TCP, TCP_SYNCNT, &optval,
386                                                 sizeof(optval))<0){
387                         LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
388                                                 " maximum SYN retr. count: %s\n", strerror(errno));
389                 }
390         }
391 #endif
392 #ifdef HAVE_TCP_LINGER2
393         if ((optval=cfg_get(tcp, tcp_cfg, linger2))){
394                 if (setsockopt(s, IPPROTO_TCP, TCP_LINGER2, &optval,
395                                                 sizeof(optval))<0){
396                         LOG(L_WARN, "WARNING: init_sock_opt: failed to set"
397                                                 " maximum LINGER2 timeout: %s\n", strerror(errno));
398                 }
399         }
400 #endif
401 #ifdef HAVE_TCP_QUICKACK
402         if (cfg_get(tcp, tcp_cfg, delayed_ack)){
403                 optval=0; /* reset quick ack => delayed ack */
404                 if (setsockopt(s, IPPROTO_TCP, TCP_QUICKACK, &optval,
405                                                 sizeof(optval))<0){
406                         LOG(L_WARN, "WARNING: init_sock_opt: failed to reset"
407                                                 " TCP_QUICKACK: %s\n", strerror(errno));
408                 }
409         }
410 #endif /* HAVE_TCP_QUICKACK */
411         init_sock_keepalive(s);
412         
413         /* non-blocking */
414         flags=fcntl(s, F_GETFL);
415         if (flags==-1){
416                 LOG(L_ERR, "ERROR: init_sock_opt: fnctl failed: (%d) %s\n",
417                                 errno, strerror(errno));
418                 goto error;
419         }
420         if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
421                 LOG(L_ERR, "ERROR: init_sock_opt: fcntl: set non-blocking failed:"
422                                 " (%d) %s\n", errno, strerror(errno));
423                 goto error;
424         }
425         return 0;
426 error:
427         return -1;
428 }
429
430
431
432 /* set all socket/fd options for "accepted" sockets 
433  *  only nonblocking is set since the rest is inherited from the
434  *  "parent" (listening) socket
435  *  Note: setting O_NONBLOCK is required on linux but it's not needed on
436  *        BSD and possibly solaris (where the flag is inherited from the 
437  *        parent socket). However since there is no standard document 
438  *        requiring a specific behaviour in this case it's safer to always set
439  *        it (at least for now)  --andrei
440  *  TODO: check on which OSes  O_NONBLOCK is inherited and make this 
441  *        function a nop.
442  *
443  * return -1 on error */
444 static int init_sock_opt_accept(int s)
445 {
446         int flags;
447         
448         /* non-blocking */
449         flags=fcntl(s, F_GETFL);
450         if (flags==-1){
451                 LOG(L_ERR, "ERROR: init_sock_opt_accept: fnctl failed: (%d) %s\n",
452                                 errno, strerror(errno));
453                 goto error;
454         }
455         if (fcntl(s, F_SETFL, flags|O_NONBLOCK)==-1){
456                 LOG(L_ERR, "ERROR: init_sock_opt_accept: "
457                                         "fcntl: set non-blocking failed: (%d) %s\n",
458                                         errno, strerror(errno));
459                 goto error;
460         }
461         return 0;
462 error:
463         return -1;
464 }
465
466
467
468 /* blocking connect on a non-blocking fd; it will timeout after
469  * tcp_connect_timeout 
470  * if BLOCKING_USE_SELECT and HAVE_SELECT are defined it will internally
471  * use select() instead of poll (bad if fd > FD_SET_SIZE, poll is preferred)
472  */
473 static int tcp_blocking_connect(int fd, int type,
474                                                                 const struct sockaddr *servaddr,
475                                                                 socklen_t addrlen)
476 {
477         int n;
478 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
479         fd_set sel_set;
480         fd_set orig_set;
481         struct timeval timeout;
482 #else
483         struct pollfd pf;
484 #endif
485         int elapsed;
486         int to;
487         int ticks;
488         int err;
489         unsigned int err_len;
490         int poll_err;
491         
492         poll_err=0;
493         to=cfg_get(tcp, tcp_cfg, connect_timeout_s);
494         ticks=get_ticks();
495 again:
496         n=connect(fd, servaddr, addrlen);
497         if (n==-1){
498                 if (errno==EINTR){
499                         elapsed=(get_ticks()-ticks)*TIMER_TICK;
500                         if (elapsed<to)         goto again;
501                         else goto error_timeout;
502                 }
503                 if (errno!=EINPROGRESS && errno!=EALREADY){
504 #ifdef USE_DST_BLACKLIST
505                         if (cfg_get(core, core_cfg, use_dst_blacklist))
506                                 switch(errno){
507                                         case ECONNREFUSED:
508                                         case ENETUNREACH:
509                                         case ETIMEDOUT:
510                                         case ECONNRESET:
511                                         case EHOSTUNREACH:
512                                                 dst_blacklist_su(BLST_ERR_CONNECT, type,
513                                                                                  (union sockaddr_union*)servaddr, 0);
514                                                 break;
515                                 }
516 #endif /* USE_DST_BLACKLIST */
517                         LOG(L_ERR, "ERROR: tcp_blocking_connect %s: (%d) %s\n",
518                                         su2a((union sockaddr_union*)servaddr, addrlen),
519                                         errno, strerror(errno));
520                         goto error;
521                 }
522         }else goto end;
523         
524         /* poll/select loop */
525 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
526                 FD_ZERO(&orig_set);
527                 FD_SET(fd, &orig_set);
528 #else
529                 pf.fd=fd;
530                 pf.events=POLLOUT;
531 #endif
532         while(1){
533                 elapsed=(get_ticks()-ticks)*TIMER_TICK;
534                 if (elapsed<to)
535                         to-=elapsed;
536                 else 
537                         goto error_timeout;
538 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
539                 sel_set=orig_set;
540                 timeout.tv_sec=to;
541                 timeout.tv_usec=0;
542                 n=select(fd+1, 0, &sel_set, 0, &timeout);
543 #else
544                 n=poll(&pf, 1, to*1000);
545 #endif
546                 if (n<0){
547                         if (errno==EINTR) continue;
548                         LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll/select failed:"
549                                         " (%d) %s\n",
550                                         su2a((union sockaddr_union*)servaddr, addrlen),
551                                         errno, strerror(errno));
552                         goto error;
553                 }else if (n==0) /* timeout */ continue;
554 #if defined(HAVE_SELECT) && defined(BLOCKING_USE_SELECT)
555                 if (FD_ISSET(fd, &sel_set))
556 #else
557                 if (pf.revents&(POLLERR|POLLHUP|POLLNVAL)){ 
558                         LOG(L_ERR, "ERROR: tcp_blocking_connect %s: poll error: "
559                                         "flags %x\n",
560                                         su2a((union sockaddr_union*)servaddr, addrlen),
561                                         pf.revents);
562                         poll_err=1;
563                 }
564 #endif
565                 {
566                         err_len=sizeof(err);
567                         getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &err_len);
568                         if ((err==0) && (poll_err==0)) goto end;
569                         if (err!=EINPROGRESS && err!=EALREADY){
570                                 LOG(L_ERR, "ERROR: tcp_blocking_connect %s: SO_ERROR (%d) "
571                                                 "%s\n",
572                                                 su2a((union sockaddr_union*)servaddr, addrlen),
573                                                 err, strerror(err));
574                                 goto error;
575                         }
576                 }
577         }
578 error_timeout:
579         /* timeout */
580 #ifdef USE_DST_BLACKLIST
581         if (cfg_get(core, core_cfg, use_dst_blacklist))
582                 dst_blacklist_su(BLST_ERR_CONNECT, type,
583                                                         (union sockaddr_union*)servaddr, 0);
584 #endif /* USE_DST_BLACKLIST */
585         LOG(L_ERR, "ERROR: tcp_blocking_connect %s: timeout %d s elapsed "
586                                 "from %d s\n", su2a((union sockaddr_union*)servaddr, addrlen),
587                                 elapsed, cfg_get(tcp, tcp_cfg, connect_timeout_s));
588 error:
589         return -1;
590 end:
591         return 0;
592 }
593
594
595
/* Forward declaration of the static non-blocking write helper defined
 * later in this file; the write-queue code below (wbufq_run()) calls it
 * before its definition. */
inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
									char* buf, int len);
598
599
600 #ifdef TCP_ASYNC
601
602
/* Write queue emptiness tests for a connection pointer. Both read
 * con->wbuf_q.first without any locking ("unsafe" versions); presumably,
 * as for the other unsafe _wbufq_* helpers, the caller should hold the
 * connection write lock for a consistent answer. Both evaluate to 0 or 1. */
#define _wbufq_empty(con)     (!((con)->wbuf_q.first))
#define _wbufq_non_empty(con) (!_wbufq_empty(con))
607
608
609 /* unsafe version, call while holding the connection write lock */
610 inline static int _wbufq_add(struct  tcp_connection* c, char* data, 
611                                                         unsigned int size)
612 {
613         struct tcp_wbuffer_queue* q;
614         struct tcp_wbuffer* wb;
615         unsigned int last_free;
616         unsigned int wb_size;
617         unsigned int crt_size;
618         ticks_t t;
619         
620         q=&c->wbuf_q;
621         t=get_ticks_raw();
622         if (unlikely(   ((q->queued+size)>cfg_get(tcp, tcp_cfg, tcpconn_wq_max)) ||
623                                         ((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max)) ||
624                                         (q->first &&
625                                         TICKS_LT(q->wr_timeout, t)) )){
626                 LOG(L_ERR, "ERROR: wbufq_add(%d bytes): write queue full or timeout "
627                                         " (%d, total %d, last write %d s ago)\n",
628                                         size, q->queued, *tcp_total_wq,
629                                         TICKS_TO_S(t-q->wr_timeout-
630                                                 cfg_get(tcp, tcp_cfg, tcp_wq_timeout)));
631 #ifdef USE_DST_BLACKLIST
632                 if (q->first && TICKS_LT(q->wr_timeout, t) &&
633                                 cfg_get(core, core_cfg, use_dst_blacklist)){
634                         ERR("blacklisting, state=%d\n", c->state);
635                         dst_blacklist_su((c->state==S_CONN_CONNECT)?  BLST_ERR_CONNECT:
636                                                                         BLST_ERR_SEND,
637                                                                 c->rcv.proto, &c->rcv.src_su, 0);
638                 }
639 #endif /* USE_DST_BLACKLIST */
640                 goto error;
641         }
642         
643         if (unlikely(q->last==0)){
644                 wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
645                 wb=shm_malloc(sizeof(*wb)+wb_size-1);
646                 if (unlikely(wb==0))
647                         goto error;
648                 wb->b_size=wb_size;
649                 wb->next=0;
650                 q->last=wb;
651                 q->first=wb;
652                 q->last_used=0;
653                 q->offset=0;
654                 q->wr_timeout=get_ticks_raw()+
655                         ((c->state==S_CONN_CONNECT)?
656                                         S_TO_TICKS(cfg_get(tcp, tcp_cfg, connect_timeout_s)):
657                                         cfg_get(tcp, tcp_cfg, tcp_wq_timeout));
658         }else{
659                 wb=q->last;
660         }
661         
662         while(size){
663                 last_free=wb->b_size-q->last_used;
664                 if (last_free==0){
665                         wb_size=MAX_unsigned(cfg_get(tcp, tcp_cfg, wq_blk_size), size);
666                         wb=shm_malloc(sizeof(*wb)+wb_size-1);
667                         if (unlikely(wb==0))
668                                 goto error;
669                         wb->b_size=wb_size;
670                         wb->next=0;
671                         q->last->next=wb;
672                         q->last=wb;
673                         q->last_used=0;
674                         last_free=wb->b_size;
675                 }
676                 crt_size=MIN_unsigned(last_free, size);
677                 memcpy(wb->buf+q->last_used, data, crt_size);
678                 q->last_used+=crt_size;
679                 size-=crt_size;
680                 data+=crt_size;
681                 q->queued+=crt_size;
682                 atomic_add_int((int*)tcp_total_wq, crt_size);
683         }
684         return 0;
685 error:
686         return -1;
687 }
688
689
690
691 /* unsafe version, call while holding the connection write lock
692  * inserts data at the beginning, it ignores the max queue size checks and
693  * the timeout (use sparingly)
694  * Note: it should never be called on a write buffer after wbufq_run() */
695 inline static int _wbufq_insert(struct  tcp_connection* c, char* data, 
696                                                         unsigned int size)
697 {
698         struct tcp_wbuffer_queue* q;
699         struct tcp_wbuffer* wb;
700         
701         q=&c->wbuf_q;
702         if (likely(q->first==0)) /* if empty, use wbufq_add */
703                 return _wbufq_add(c, data, size);
704         
705         if (unlikely((*tcp_total_wq+size)>cfg_get(tcp, tcp_cfg, tcp_wq_max))){
706                 LOG(L_ERR, "ERROR: wbufq_insert(%d bytes): write queue full"
707                                         " (%d, total %d, last write %d s ago)\n",
708                                         size, q->queued, *tcp_total_wq,
709                                         TICKS_TO_S(get_ticks_raw()-q->wr_timeout-
710                                                                         cfg_get(tcp, tcp_cfg, tcp_wq_timeout)));
711                 goto error;
712         }
713         if (unlikely(q->offset)){
714                 LOG(L_CRIT, "BUG: wbufq_insert: non-null offset %d (bad call, should"
715                                 "never be called after the wbufq_run())\n", q->offset);
716                 goto error;
717         }
718         if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
719                 /* one block with enough space in it for size bytes */
720                 memmove(q->first->buf+size, q->first->buf, size);
721                 memcpy(q->first->buf, data, size);
722                 q->last_used+=size;
723         }else{
724                 /* create a size bytes block directly */
725                 wb=shm_malloc(sizeof(*wb)+size-1);
726                 if (unlikely(wb==0))
727                         goto error;
728                 wb->b_size=size;
729                 /* insert it */
730                 wb->next=q->first;
731                 q->first=wb;
732                 memcpy(wb->buf, data, size);
733         }
734         
735         q->queued+=size;
736         atomic_add_int((int*)tcp_total_wq, size);
737         return 0;
738 error:
739         return -1;
740 }
741
742
743
744 /* unsafe version, call while holding the connection write lock */
745 inline static void _wbufq_destroy( struct  tcp_wbuffer_queue* q)
746 {
747         struct tcp_wbuffer* wb;
748         struct tcp_wbuffer* next_wb;
749         int unqueued;
750         
751         unqueued=0;
752         if (likely(q->first)){
753                 wb=q->first;
754                 do{
755                         next_wb=wb->next;
756                         unqueued+=(wb==q->last)?q->last_used:wb->b_size;
757                         if (wb==q->first)
758                                 unqueued-=q->offset;
759                         shm_free(wb);
760                         wb=next_wb;
761                 }while(wb);
762         }
763         memset(q, 0, sizeof(*q));
764         atomic_add_int((int*)tcp_total_wq, -unqueued);
765 }
766
767
768
769 /* tries to empty the queue  (safe version, c->write_lock must not be hold)
770  * returns -1 on error, bytes written on success (>=0) 
771  * if the whole queue is emptied => sets *empty*/
772 inline static int wbufq_run(int fd, struct tcp_connection* c, int* empty)
773 {
774         struct tcp_wbuffer_queue* q;
775         struct tcp_wbuffer* wb;
776         int n;
777         int ret;
778         int block_size;
779         char* buf;
780         
781         *empty=0;
782         ret=0;
783         lock_get(&c->write_lock);
784         q=&c->wbuf_q;
785         while(q->first){
786                 block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
787                                                 q->offset;
788                 buf=q->first->buf+q->offset;
789                 n=_tcpconn_write_nb(fd, c, buf, block_size);
790                 if (likely(n>0)){
791                         ret+=n;
792                         if (likely(n==block_size)){
793                                 wb=q->first;
794                                 q->first=q->first->next; 
795                                 shm_free(wb);
796                                 q->offset=0;
797                                 q->queued-=block_size;
798                                 atomic_add_int((int*)tcp_total_wq, -block_size);
799                         }else{
800                                 q->offset+=n;
801                                 q->queued-=n;
802                                 atomic_add_int((int*)tcp_total_wq, -n);
803                                 break;
804                         }
805                 }else{
806                         if (n<0){
807                                 /* EINTR is handled inside _tcpconn_write_nb */
808                                 if (!(errno==EAGAIN || errno==EWOULDBLOCK)){
809 #ifdef USE_DST_BLACKLIST
810                                         if (cfg_get(core, core_cfg, use_dst_blacklist))
811                                                 switch(errno){
812                                                         case ENETUNREACH:
813                                                         case ECONNRESET:
814                                                         /*case EHOSTUNREACH: -- not posix */
815                                                                 dst_blacklist_su((c->state==S_CONN_CONNECT)?
816                                                                                                                 BLST_ERR_CONNECT:
817                                                                                                                 BLST_ERR_SEND,
818                                                                                                                 c->rcv.proto,
819                                                                                                                 &c->rcv.src_su, 0);
820                                                                 break;
821                                                 }
822 #endif /* USE_DST_BLACKLIST */
823                                         ret=-1;
824                                         LOG(L_ERR, "ERROR: wbuf_runq: %s [%d]\n",
825                                                 strerror(errno), errno);
826                                 }
827                         }
828                         break;
829                 }
830         }
831         if (likely(q->first==0)){
832                 q->last=0;
833                 q->last_used=0;
834                 q->offset=0;
835                 *empty=1;
836         }
837         lock_release(&c->write_lock);
838         if (likely(ret>0)){
839                 q->wr_timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, tcp_wq_timeout);
840                 if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT))
841                         c->state=S_CONN_OK;
842         }
843         return ret;
844 }
845
846 #endif /* TCP_ASYNC */
847
848
849
#if 0
/* blocking write even on non-blocking sockets 
 * if TCP_TIMEOUT will return with error */
/* NOTE(review): compiled out (#if 0). It reads the plain variable
 *  tcp_send_timeout and calls get_ticks(), while the live code in this file
 *  reads tcp settings through cfg_get(tcp, tcp_cfg, ...) and uses
 *  get_ticks_raw(); check that these names still exist before re-enabling.
 *  The c parameter is not used.
 * Behaviour: send()s buf (retrying on EINTR). On a partial write or on
 *  EAGAIN/EWOULDBLOCK it waits with select() until fd is writable
 *  (each select() call waits at most tcp_send_timeout seconds) and then
 *  sends the rest. If a select() times out and get_ticks() advanced by at
 *  least tcp_send_timeout meanwhile, it gives up.
 * returns the initial len on success, -1 on send/select error or timeout */
static int tcp_blocking_write(struct tcp_connection* c, int fd, char* buf,
								unsigned int len)
{
	int n;
	fd_set sel_set;
	struct timeval timeout;
	int ticks;
	int initial_len;
	
	initial_len=len;
again:
	
	n=send(fd, buf, len,
#ifdef HAVE_MSG_NOSIGNAL
			MSG_NOSIGNAL
#else
			0
#endif
		);
	if (n<0){
		if (errno==EINTR)	goto again;
		else if (errno!=EAGAIN && errno!=EWOULDBLOCK){
			LOG(L_ERR, "tcp_blocking_write: failed to send: (%d) %s\n",
					errno, strerror(errno));
			goto error;
		}
		/* EAGAIN/EWOULDBLOCK: fall through and wait for writability */
	}else if (n<len){
		/* partial write */
		buf+=n;
		len-=n;
	}else{
		/* success: full write */
		goto end;
	}
	/* wait until fd becomes writable again (or until the timeout) */
	while(1){
		FD_ZERO(&sel_set);
		FD_SET(fd, &sel_set);
		timeout.tv_sec=tcp_send_timeout;
		timeout.tv_usec=0;
		ticks=get_ticks();
		n=select(fd+1, 0, &sel_set, 0, &timeout);
		if (n<0){
			if (errno==EINTR) continue; /* signal, ignore */
			LOG(L_ERR, "ERROR: tcp_blocking_write: select failed: "
					" (%d) %s\n", errno, strerror(errno));
			goto error;
		}else if (n==0){
			/* timeout */
			if (get_ticks()-ticks>=tcp_send_timeout){
				LOG(L_ERR, "ERROR: tcp_blocking_write: send timeout (%d)\n",
						tcp_send_timeout);
				goto error;
			}
			continue;
		}
		if (FD_ISSET(fd, &sel_set)){
			/* we can write again */
			goto again;
		}
	}
error:
		return -1;
end:
		return initial_len;
}
#endif
919
920
921
922 struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
923                                                                         union sockaddr_union* local_addr,
924                                                                         struct socket_info* ba, int type, 
925                                                                         int state)
926 {
927         struct tcp_connection *c;
928         int rd_b_size;
929         
930         rd_b_size=cfg_get(tcp, tcp_cfg, rd_buf_size);
931         c=shm_malloc(sizeof(struct tcp_connection) + rd_b_size);
932         if (c==0){
933                 LOG(L_ERR, "ERROR: tcpconn_new: mem. allocation failure\n");
934                 goto error;
935         }
936         memset(c, 0, sizeof(struct tcp_connection)); /* zero init (skip rd buf)*/
937         c->s=sock;
938         c->fd=-1; /* not initialized */
939         if (lock_init(&c->write_lock)==0){
940                 LOG(L_ERR, "ERROR: tcpconn_new: init lock failed\n");
941                 goto error;
942         }
943         
944         c->rcv.src_su=*su;
945         
946         atomic_set(&c->refcnt, 0);
947         local_timer_init(&c->timer, tcpconn_main_timeout, c, 0);
948         su2ip_addr(&c->rcv.src_ip, su);
949         c->rcv.src_port=su_getport(su);
950         c->rcv.bind_address=ba;
951         if (likely(local_addr)){
952                 su2ip_addr(&c->rcv.dst_ip, local_addr);
953                 c->rcv.dst_port=su_getport(local_addr);
954         }else if (ba){
955                 c->rcv.dst_ip=ba->address;
956                 c->rcv.dst_port=ba->port_no;
957         }
958         print_ip("tcpconn_new: new tcp connection: ", &c->rcv.src_ip, "\n");
959         DBG(     "tcpconn_new: on port %d, type %d\n", c->rcv.src_port, type);
960         init_tcp_req(&c->req, (char*)c+sizeof(struct tcp_connection), rd_b_size);
961         c->id=(*connection_id)++;
962         c->rcv.proto_reserved1=0; /* this will be filled before receive_message*/
963         c->rcv.proto_reserved2=0;
964         c->state=state;
965         c->extra_data=0;
966 #ifdef USE_TLS
967         if (type==PROTO_TLS){
968                 if (tls_tcpconn_init(c, sock)==-1) goto error;
969         }else
970 #endif /* USE_TLS*/
971         {
972                 c->type=PROTO_TCP;
973                 c->rcv.proto=PROTO_TCP;
974                 c->timeout=get_ticks_raw()+cfg_get(tcp, tcp_cfg, con_lifetime);
975         }
976         
977         return c;
978         
979 error:
980         if (c) shm_free(c);
981         return 0;
982 }
983
984
985
986 /* do the actual connect, set sock. options a.s.o
987  * returns socket on success, -1 on error
988  * sets also *res_local_addr, res_si and state (S_CONN_CONNECT for an
989  * unfinished connect and S_CONN_OK for a finished one)*/
990 inline static int tcp_do_connect(       union sockaddr_union* server,
991                                                                         union sockaddr_union* from,
992                                                                         int type,
993                                                                         union sockaddr_union* res_local_addr,
994                                                                         struct socket_info** res_si,
995                                                                         enum tcp_conn_states *state
996                                                                         )
997 {
998         int s;
999         union sockaddr_union my_name;
1000         socklen_t my_name_len;
1001         struct ip_addr ip;
1002 #ifdef TCP_ASYNC
1003         int n;
1004 #endif /* TCP_ASYNC */
1005
1006         s=socket(AF2PF(server->s.sa_family), SOCK_STREAM, 0);
1007         if (unlikely(s==-1)){
1008                 LOG(L_ERR, "ERROR: tcp_do_connect %s: socket: (%d) %s\n",
1009                                 su2a(server, sizeof(*server)), errno, strerror(errno));
1010                 goto error;
1011         }
1012         if (init_sock_opt(s)<0){
1013                 LOG(L_ERR, "ERROR: tcp_do_connect %s: init_sock_opt failed\n",
1014                                         su2a(server, sizeof(*server)));
1015                 goto error;
1016         }
1017         
1018         if (unlikely(from && bind(s, &from->s, sockaddru_len(*from)) != 0)){
1019                 LOG(L_WARN, "WARNING: tcp_do_connect: binding to source address"
1020                                         " %s failed: %s [%d]\n", su2a(from, sizeof(*from)),
1021                                         strerror(errno), errno);
1022         }
1023         *state=S_CONN_OK;
1024 #ifdef TCP_ASYNC
1025         if (likely(cfg_get(tcp, tcp_cfg, async))){
1026 again:
1027                 n=connect(s, &server->s, sockaddru_len(*server));
1028                 if (unlikely(n==-1)){
1029                         if (errno==EINTR) goto again;
1030                         if (likely(errno==EINPROGRESS))
1031                                 *state=S_CONN_CONNECT;
1032                         else if (errno!=EALREADY){
1033 #ifdef USE_DST_BLACKLIST
1034                                 if (cfg_get(core, core_cfg, use_dst_blacklist))
1035                                         switch(errno){
1036                                                 case ECONNREFUSED:
1037                                                 case ENETUNREACH:
1038                                                 case ETIMEDOUT:
1039                                                 case ECONNRESET:
1040                                                 case EHOSTUNREACH:
1041                                                         dst_blacklist_su(BLST_ERR_CONNECT, type, server,
1042                                                                                                 0);
1043                                                         break;
1044                                 }
1045 #endif /* USE_DST_BLACKLIST */
1046                                 LOG(L_ERR, "ERROR: tcp_do_connect: connect %s: (%d) %s\n",
1047                                                         su2a(server, sizeof(*server)),
1048                                                         errno, strerror(errno));
1049                                 goto error;
1050                         }
1051                 }
1052         }else{
1053 #endif /* TCP_ASYNC */
1054                 if (tcp_blocking_connect(s, type, &server->s,
1055                                                                         sockaddru_len(*server))<0){
1056                         LOG(L_ERR, "ERROR: tcp_do_connect: tcp_blocking_connect %s"
1057                                                 " failed\n", su2a(server, sizeof(*server)));
1058                         goto error;
1059                 }
1060 #ifdef TCP_ASYNC
1061         }
1062 #endif /* TCP_ASYNC */
1063         if (from){
1064                 su2ip_addr(&ip, from);
1065                 if (!ip_addr_any(&ip))
1066                         /* we already know the source ip, skip the sys. call */
1067                         goto find_socket;
1068         }
1069         my_name_len=sizeof(my_name);
1070         if (unlikely(getsockname(s, &my_name.s, &my_name_len)!=0)){
1071                 LOG(L_ERR, "ERROR: tcp_do_connect: getsockname failed: %s(%d)\n",
1072                                 strerror(errno), errno);
1073                 *res_si=0;
1074                 goto error;
1075         }
1076         from=&my_name; /* update from with the real "from" address */
1077         su2ip_addr(&ip, &my_name);
1078 find_socket:
1079 #ifdef USE_TLS
1080         if (unlikely(type==PROTO_TLS))
1081                 *res_si=find_si(&ip, 0, PROTO_TLS);
1082         else
1083 #endif
1084                 *res_si=find_si(&ip, 0, PROTO_TCP);
1085         
1086         if (unlikely(*res_si==0)){
1087                 LOG(L_WARN, "WARNING: tcp_do_connect %s: could not find corresponding"
1088                                 " listening socket for %s, using default...\n",
1089                                         su2a(server, sizeof(*server)), ip_addr2a(&ip));
1090                 if (server->s.sa_family==AF_INET) *res_si=sendipv4_tcp;
1091 #ifdef USE_IPV6
1092                 else *res_si=sendipv6_tcp;
1093 #endif
1094         }
1095         *res_local_addr=*from;
1096         return s;
1097 error:
1098         if (s!=-1) close(s);
1099         return -1;
1100 }
1101
1102
1103
1104 struct tcp_connection* tcpconn_connect( union sockaddr_union* server, 
1105                                                                                 union sockaddr_union* from,
1106                                                                                 int type)
1107 {
1108         int s;
1109         struct socket_info* si;
1110         union sockaddr_union my_name;
1111         struct tcp_connection* con;
1112         enum tcp_conn_states state;
1113
1114         s=-1;
1115         
1116         if (*tcp_connections_no >= cfg_get(tcp, tcp_cfg, max_connections)){
1117                 LOG(L_ERR, "ERROR: tcpconn_connect: maximum number of connections"
1118                                         " exceeded (%d/%d)\n",
1119                                         *tcp_connections_no,
1120                                         cfg_get(tcp, tcp_cfg, max_connections));
1121                 goto error;
1122         }
1123         s=tcp_do_connect(server, from, type, &my_name, &si, &state);
1124         if (s==-1){
1125                 LOG(L_ERR, "ERROR: tcp_do_connect %s: failed (%d) %s\n",
1126                                 su2a(server, sizeof(*server)), errno, strerror(errno));
1127                 goto error;
1128         }
1129         con=tcpconn_new(s, server, &my_name, si, type, state);
1130         if (con==0){
1131                 LOG(L_ERR, "ERROR: tcp_connect %s: tcpconn_new failed, closing the "
1132                                  " socket\n", su2a(server, sizeof(*server)));
1133                 goto error;
1134         }
1135         return con;
1136         /*FIXME: set sock idx! */
1137 error:
1138         if (s!=-1) close(s); /* close the opened socket */
1139         return 0;
1140 }
1141
1142
1143
1144 #ifdef TCP_CONNECT_WAIT
1145 int tcpconn_finish_connect( struct tcp_connection* c,
1146                                                                                                 union sockaddr_union* from)
1147 {
1148         int s;
1149         int r;
1150         union sockaddr_union local_addr;
1151         struct socket_info* si;
1152         enum tcp_conn_states state;
1153         struct tcp_conn_alias* a;
1154         int new_conn_alias_flags;
1155         
1156         s=tcp_do_connect(&c->rcv.src_su, from, c->type, &local_addr, &si, &state);
1157         if (unlikely(s==-1)){
1158                 LOG(L_ERR, "ERROR: tcpconn_finish_connect %s: tcp_do_connect for %p"
1159                                         " failed\n", su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
1160                                         c);
1161                 return -1;
1162         }
1163         c->rcv.bind_address=si;
1164         su2ip_addr(&c->rcv.dst_ip, &local_addr);
1165         c->rcv.dst_port=su_getport(&local_addr);
1166         /* update aliases if needed */
1167         if (likely(from==0)){
1168                 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
1169                 /* add aliases */
1170                 TCPCONN_LOCK;
1171                 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
1172                                                                                                         new_conn_alias_flags);
1173                 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
1174                                                                         c->rcv.dst_port, new_conn_alias_flags);
1175                 TCPCONN_UNLOCK;
1176         }else if (su_cmp(from, &local_addr)!=1){
1177                 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
1178                 TCPCONN_LOCK;
1179                         /* remove all the aliases except the first one and re-add them
1180                          * (there shouldn't be more then the 3 default aliases at this 
1181                          * stage) */
1182                         for (r=1; r<c->aliases; r++){
1183                                 a=&c->con_aliases[r];
1184                                 tcpconn_listrm(tcpconn_aliases_hash[a->hash], a, next, prev);
1185                         }
1186                         c->aliases=1;
1187                         /* add the local_ip:0 and local_ip:local_port aliases */
1188                         _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
1189                                                                                                 0, new_conn_alias_flags);
1190                         _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
1191                                                                         c->rcv.dst_port, new_conn_alias_flags);
1192                 TCPCONN_UNLOCK;
1193         }
1194         
1195         return s;
1196 }
1197 #endif /* TCP_CONNECT_WAIT */
1198
1199
1200
1201 /* adds a tcp connection to the tcpconn hashes
1202  * Note: it's called _only_ from the tcp_main process */
1203 inline static struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
1204 {
1205         struct ip_addr zero_ip;
1206         int new_conn_alias_flags;
1207
1208         if (likely(c)){
1209                 ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
1210                 c->id_hash=tcp_id_hash(c->id);
1211                 c->aliases=0;
1212                 new_conn_alias_flags=cfg_get(tcp, tcp_cfg, new_conn_alias_flags);
1213                 TCPCONN_LOCK;
1214                 c->flags|=F_CONN_HASHED;
1215                 /* add it at the begining of the list*/
1216                 tcpconn_listadd(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
1217                 /* set the aliases */
1218                 /* first alias is for (peer_ip, peer_port, 0 ,0) -- for finding
1219                  *  any connection to peer_ip, peer_port
1220                  * the second alias is for (peer_ip, peer_port, local_addr, 0) -- for
1221                  *  finding any conenction to peer_ip, peer_port from local_addr 
1222                  * the third alias is for (peer_ip, peer_port, local_addr, local_port) 
1223                  *   -- for finding if a fully specified connection exists */
1224                 _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &zero_ip, 0,
1225                                                                                                         new_conn_alias_flags);
1226                 if (likely(c->rcv.dst_ip.af && ! ip_addr_any(&c->rcv.dst_ip))){
1227                         _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip, 0,
1228                                                                                                         new_conn_alias_flags);
1229                         _tcpconn_add_alias_unsafe(c, c->rcv.src_port, &c->rcv.dst_ip,
1230                                                                         c->rcv.dst_port, new_conn_alias_flags);
1231                 }
1232                 /* ignore add_alias errors, there are some valid cases when one
1233                  *  of the add_alias would fail (e.g. first add_alias for 2 connections
1234                  *   with the same destination but different src. ip*/
1235                 TCPCONN_UNLOCK;
1236                 DBG("tcpconn_add: hashes: %d:%d:%d, %d\n",
1237                                                                                                 c->con_aliases[0].hash,
1238                                                                                                 c->con_aliases[1].hash,
1239                                                                                                 c->con_aliases[2].hash,
1240                                                                                                 c->id_hash);
1241                 return c;
1242         }else{
1243                 LOG(L_CRIT, "tcpconn_add: BUG: null connection pointer\n");
1244                 return 0;
1245         }
1246 }
1247
1248
1249 static inline void _tcpconn_detach(struct tcp_connection *c)
1250 {
1251         int r;
1252         tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
1253         /* remove all the aliases */
1254         for (r=0; r<c->aliases; r++)
1255                 tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash], 
1256                                                 &c->con_aliases[r], next, prev);
1257 }
1258
1259
1260
/* frees c together with what it owns: the pending write queue (TCP_ASYNC),
 * the write lock and, for PROTO_TLS connections, the tls state.
 * It does not touch the connection hashes: c must already be unlinked
 * (see _tcpconn_detach()). shm_free() is the last step; c must not be used
 * afterwards. */
static inline void _tcpconn_free(struct tcp_connection* c)
{
#ifdef TCP_ASYNC
	/* drop any still-unsent queued data (also fixes *tcp_total_wq) */
	if (unlikely(_wbufq_non_empty(c)))
		_wbufq_destroy(&c->wbuf_q);
#endif
	lock_destroy(&c->write_lock);
#ifdef USE_TLS
	if (unlikely(c->type==PROTO_TLS)) tls_tcpconn_clean(c);
#endif
	shm_free(c);
}
1273
1274
1275
/* unsafe tcpconn_rm version (nolocks): unlinks c from the id and alias
 * hashes and then frees it (see _tcpconn_free()).
 * NOTE(review): presumably the caller must already hold TCPCONN_LOCK
 * (tcpconn_rm() does the unlinking under that lock) -- confirm */
void _tcpconn_rm(struct tcp_connection* c)
{
	_tcpconn_detach(c); /* must come first: c is freed right after */
	_tcpconn_free(c);
}
1282
1283
1284
1285 void tcpconn_rm(struct tcp_connection* c)
1286 {
1287         int r;
1288         TCPCONN_LOCK;
1289         tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
1290         /* remove all the aliases */
1291         for (r=0; r<c->aliases; r++)
1292                 tcpconn_listrm(tcpconn_aliases_hash[c->con_aliases[r].hash], 
1293                                                 &c->con_aliases[r], next, prev);
1294         TCPCONN_UNLOCK;
1295         lock_destroy(&c->write_lock);
1296 #ifdef USE_TLS
1297         if ((c->type==PROTO_TLS)&&(c->extra_data)) tls_tcpconn_clean(c);
1298 #endif
1299         shm_free(c);
1300 }
1301
1302
1303 /* finds a connection, if id=0 uses the ip addr, port, local_ip and local port
1304  *  (host byte order) and tries to find the connection that matches all of
1305  *   them. Wild cards can be used for local_ip and local_port (a 0 filled
1306  *   ip address and/or a 0 local port).
1307  * WARNING: unprotected (locks) use tcpconn_get unless you really
1308  * know what you are doing */
1309 struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port,
1310                                                                                 struct ip_addr* l_ip, int l_port)
1311 {
1312
1313         struct tcp_connection *c;
1314         struct tcp_conn_alias* a;
1315         unsigned hash;
1316         int is_local_ip_any;
1317         
1318 #ifdef EXTRA_DEBUG
1319         DBG("tcpconn_find: %d  port %d\n",id, port);
1320         if (ip) print_ip("tcpconn_find: ip ", ip, "\n");
1321 #endif
1322         if (likely(id)){
1323                 hash=tcp_id_hash(id);
1324                 for (c=tcpconn_id_hash[hash]; c; c=c->id_next){
1325 #ifdef EXTRA_DEBUG
1326                         DBG("c=%p, c->id=%d, port=%d\n",c, c->id, c->rcv.src_port);
1327                         print_ip("ip=", &c->rcv.src_ip, "\n");
1328 #endif
1329                         if ((id==c->id)&&(c->state!=S_CONN_BAD)) return c;
1330                 }
1331         }else if (likely(ip)){
1332                 hash=tcp_addr_hash(ip, port, l_ip, l_port);
1333                 is_local_ip_any=ip_addr_any(l_ip);
1334                 for (a=tcpconn_aliases_hash[hash]; a; a=a->next){
1335 #ifdef EXTRA_DEBUG
1336                         DBG("a=%p, c=%p, c->id=%d, alias port= %d port=%d\n", a, a->parent,
1337                                         a->parent->id, a->port, a->parent->rcv.src_port);
1338                         print_ip("ip=",&a->parent->rcv.src_ip,"\n");
1339 #endif
1340                         if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
1341                                         ((l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
1342                                         (ip_addr_cmp(ip, &a->parent->rcv.src_ip)) &&
1343                                         (is_local_ip_any ||
1344                                                 ip_addr_cmp(l_ip, &a->parent->rcv.dst_ip))
1345                                 )
1346                                 return a->parent;
1347                 }
1348         }
1349         return 0;
1350 }
1351
1352
1353
1354 /* _tcpconn_find with locks and timeout
1355  * local_addr contains the desired local ip:port. If null any local address 
1356  * will be used.  IN*ADDR_ANY or 0 port are wild cards.
1357  */
1358 struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port,
1359                                                                         union sockaddr_union* local_addr,
1360                                                                         ticks_t timeout)
1361 {
1362         struct tcp_connection* c;
1363         struct ip_addr local_ip;
1364         int local_port;
1365         
1366         local_port=0;
1367         if (likely(ip)){
1368                 if (unlikely(local_addr)){
1369                         su2ip_addr(&local_ip, local_addr);
1370                         local_port=su_getport(local_addr);
1371                 }else{
1372                         ip_addr_mk_any(ip->af, &local_ip);
1373                         local_port=0;
1374                 }
1375         }
1376         TCPCONN_LOCK;
1377         c=_tcpconn_find(id, ip, port, &local_ip, local_port);
1378         if (likely(c)){ 
1379                         atomic_inc(&c->refcnt);
1380                         /* update the timeout only if the connection is not handled
1381                          * by a tcp reader (the tcp reader process uses c->timeout for 
1382                          * its own internal timeout and c->timeout will be overwritten
1383                          * anyway on return to tcp_main) */
1384                         if (likely(c->reader_pid==0))
1385                                 c->timeout=get_ticks_raw()+timeout;
1386         }
1387         TCPCONN_UNLOCK;
1388         return c;
1389 }
1390
1391
1392
1393 /* add c->dst:port, local_addr as an alias for the "id" connection, 
1394  * flags: TCP_ALIAS_FORCE_ADD  - add an alias even if a previous one exists
1395  *        TCP_ALIAS_REPLACE    - if a prev. alias exists, replace it with the
1396  *                                new one
1397  * returns 0 on success, <0 on failure ( -1  - null c, -2 too many aliases,
1398  *  -3 alias already present and pointing to another connection)
1399  * WARNING: must be called with TCPCONN_LOCK held */
1400 inline static int _tcpconn_add_alias_unsafe(struct tcp_connection* c, int port,
1401                                                                                 struct ip_addr* l_ip, int l_port,
1402                                                                                 int flags)
1403 {
1404         unsigned hash;
1405         struct tcp_conn_alias* a;
1406         struct tcp_conn_alias* nxt;
1407         struct tcp_connection* p;
1408         int is_local_ip_any;
1409         int i;
1410         int r;
1411         
1412         a=0;
1413         is_local_ip_any=ip_addr_any(l_ip);
1414         if (likely(c)){
1415                 hash=tcp_addr_hash(&c->rcv.src_ip, port, l_ip, l_port);
1416                 /* search the aliases for an already existing one */
1417                 for (a=tcpconn_aliases_hash[hash], nxt=0; a; a=nxt){
1418                         nxt=a->next;
1419                         if ( (a->parent->state!=S_CONN_BAD) && (port==a->port) &&
1420                                         ( (l_port==0) || (l_port==a->parent->rcv.dst_port)) &&
1421                                         (ip_addr_cmp(&c->rcv.src_ip, &a->parent->rcv.src_ip)) &&
1422                                         ( is_local_ip_any || 
1423                                           ip_addr_cmp(&a->parent->rcv.dst_ip, l_ip))
1424                                         ){
1425                                 /* found */
1426                                 if (unlikely(a->parent!=c)){
1427                                         if (flags & TCP_ALIAS_FORCE_ADD)
1428                                                 /* still have to walk the whole list to check if
1429                                                  * the alias was not already added */
1430                                                 continue;
1431                                         else if (flags & TCP_ALIAS_REPLACE){
1432                                                 /* remove the alias =>
1433                                                  * remove the current alias and all the following
1434                                                  *  ones from the corresponding connection, shift the 
1435                                                  *  connection aliases array and re-add the other 
1436                                                  *  aliases (!= current one) */
1437                                                 p=a->parent;
1438                                                 for (i=0; (i<p->aliases) && (&(p->con_aliases[i])!=a);
1439                                                                 i++);
1440                                                 if (unlikely(i==p->aliases)){
1441                                                         LOG(L_CRIT, "BUG: _tcpconn_add_alias_unsafe: "
1442                                                                         " alias %p not found in con %p (id %d)\n",
1443                                                                         a, p, p->id);
1444                                                         goto error_not_found;
1445                                                 }
1446                                                 for (r=i; r<p->aliases; r++){
1447                                                         tcpconn_listrm(
1448                                                                 tcpconn_aliases_hash[p->con_aliases[r].hash],
1449                                                                 &p->con_aliases[r], next, prev);
1450                                                 }
1451                                                 if (likely((i+1)<p->aliases)){
1452                                                         memmove(&p->con_aliases[i], &p->con_aliases[i+1],
1453                                                                                         (p->aliases-i-1)*
1454                                                                                                 sizeof(p->con_aliases[0]));
1455                                                 }
1456                                                 p->aliases--;
1457                                                 /* re-add the remaining aliases */
1458                                                 for (r=i; r<p->aliases; r++){
1459                                                         tcpconn_listadd(
1460                                                                 tcpconn_aliases_hash[p->con_aliases[r].hash], 
1461                                                                 &p->con_aliases[r], next, prev);
1462                                                 }
1463                                         }else
1464                                                 goto error_sec;
1465                                 }else goto ok;
1466                         }
1467                 }
1468                 if (unlikely(c->aliases>=TCP_CON_MAX_ALIASES)) goto error_aliases;
1469                 c->con_aliases[c->aliases].parent=c;
1470                 c->con_aliases[c->aliases].port=port;
1471                 c->con_aliases[c->aliases].hash=hash;
1472                 tcpconn_listadd(tcpconn_aliases_hash[hash], 
1473                                                                 &c->con_aliases[c->aliases], next, prev);
1474                 c->aliases++;
1475         }else goto error_not_found;
1476 ok:
1477 #ifdef EXTRA_DEBUG
1478         if (a) DBG("_tcpconn_add_alias_unsafe: alias already present\n");
1479         else   DBG("_tcpconn_add_alias_unsafe: alias port %d for hash %d, id %d\n",
1480                         port, hash, c->id);
1481 #endif
1482         return 0;
1483 error_aliases:
1484         /* too many aliases */
1485         return -2;
1486 error_not_found:
1487         /* null connection */
1488         return -1;
1489 error_sec:
1490         /* alias already present and pointing to a different connection
1491          * (hijack attempt?) */
1492         return -3;
1493 }
1494
1495
1496
1497 /* add port as an alias for the "id" connection, 
1498  * returns 0 on success,-1 on failure */
1499 int tcpconn_add_alias(int id, int port, int proto)
1500 {
1501         struct tcp_connection* c;
1502         int ret;
1503         struct ip_addr zero_ip;
1504         int r;
1505         int alias_flags;
1506         
1507         /* fix the port */
1508         port=port?port:((proto==PROTO_TLS)?SIPS_PORT:SIP_PORT);
1509         TCPCONN_LOCK;
1510         /* check if alias already exists */
1511         c=_tcpconn_find(id, 0, 0, 0, 0);
1512         if (likely(c)){
1513                 ip_addr_mk_any(c->rcv.src_ip.af, &zero_ip);
1514                 alias_flags=cfg_get(tcp, tcp_cfg, alias_flags);
1515                 /* alias src_ip:port, 0, 0 */
1516                 ret=_tcpconn_add_alias_unsafe(c, port,  &zero_ip, 0, 
1517                                                                                 alias_flags);
1518                 if (ret<0 && ret!=-3) goto error;
1519                 /* alias src_ip:port, local_ip, 0 */
1520                 ret=_tcpconn_add_alias_unsafe(c, port,  &c->rcv.dst_ip, 0, 
1521                                                                                 alias_flags);
1522                 if (ret<0 && ret!=-3) goto error;
1523                 /* alias src_ip:port, local_ip, local_port */
1524                 ret=_tcpconn_add_alias_unsafe(c, port, &c->rcv.dst_ip, c->rcv.dst_port,
1525                                                                                 alias_flags);
1526                 if (unlikely(ret<0)) goto error;
1527         }else goto error_not_found;
1528         TCPCONN_UNLOCK;
1529         return 0;
1530 error_not_found:
1531         TCPCONN_UNLOCK;
1532         LOG(L_ERR, "ERROR: tcpconn_add_alias: no connection found for id %d\n",id);
1533         return -1;
1534 error:
1535         TCPCONN_UNLOCK;
1536         switch(ret){
1537                 case -2:
1538                         LOG(L_ERR, "ERROR: tcpconn_add_alias: too many aliases (%d)"
1539                                         " for connection %p (id %d) %s:%d <- %d\n",
1540                                         c->aliases, c, c->id, ip_addr2a(&c->rcv.src_ip),
1541                                         c->rcv.src_port, port);
1542                         for (r=0; r<c->aliases; r++){
1543                                 LOG(L_ERR, "ERROR: tcpconn_add_alias: alias %d: for %p (%d)"
1544                                                 " %s:%d <-%d hash %x\n",  r, c, c->id, 
1545                                                  ip_addr2a(&c->rcv.src_ip), c->rcv.src_port, 
1546                                                 c->con_aliases[r].port, c->con_aliases[r].hash);
1547                         }
1548                         break;
1549                 case -3:
1550                         LOG(L_ERR, "ERROR: tcpconn_add_alias: possible port"
1551                                         " hijack attempt\n");
1552                         LOG(L_ERR, "ERROR: tcpconn_add_alias: alias for %d port %d already"
1553                                                 " present and points to another connection \n",
1554                                                 c->id, port);
1555                         break;
1556                 default:
1557                         LOG(L_ERR, "ERROR: tcpconn_add_alias: unkown error %d\n", ret);
1558         }
1559         return -1;
1560 }
1561
1562
1563
1564 #ifdef TCP_FD_CACHE
1565
1566 static void tcp_fd_cache_init()
1567 {
1568         int r;
1569         for (r=0; r<TCP_FD_CACHE_SIZE; r++)
1570                 fd_cache[r].fd=-1;
1571 }
1572
1573
1574 inline static struct fd_cache_entry* tcp_fd_cache_get(struct tcp_connection *c)
1575 {
1576         int h;
1577         
1578         h=c->id%TCP_FD_CACHE_SIZE;
1579         if ((fd_cache[h].fd>0) && (fd_cache[h].id==c->id) && (fd_cache[h].con==c))
1580                 return &fd_cache[h];
1581         return 0;
1582 }
1583
1584
1585 inline static void tcp_fd_cache_rm(struct fd_cache_entry* e)
1586 {
1587         e->fd=-1;
1588 }
1589
1590
1591 inline static void tcp_fd_cache_add(struct tcp_connection *c, int fd)
1592 {
1593         int h;
1594         
1595         h=c->id%TCP_FD_CACHE_SIZE;
1596         if (likely(fd_cache[h].fd>0))
1597                 close(fd_cache[h].fd);
1598         fd_cache[h].fd=fd;
1599         fd_cache[h].id=c->id;
1600         fd_cache[h].con=c;
1601 }
1602
1603 #endif /* TCP_FD_CACHE */
1604
1605
1606
/* forward declaration: the static inline definition appears further down
 * in this file */
inline static int tcpconn_chld_put(struct tcp_connection* tcpconn);
1608
1609
1610
1611 /* finds a tcpconn & sends on it
1612  * uses the dst members to, proto (TCP|TLS) and id and tries to send
1613  *  from the "from" address (if non null and id==0)
1614  * returns: number of bytes written (>=0) on success
1615  *          <0 on error */
1616 int tcp_send(struct dest_info* dst, union sockaddr_union* from,
1617                                         char* buf, unsigned len)
1618 {
1619         struct tcp_connection *c;
1620         struct tcp_connection *tmp;
1621         struct ip_addr ip;
1622         int port;
1623         int fd;
1624         long response[2];
1625         int n;
1626         int do_close_fd;
1627         ticks_t con_lifetime;
1628 #ifdef TCP_ASYNC
1629         int enable_write_watch;
1630 #endif /* TCP_ASYNC */
1631 #ifdef TCP_FD_CACHE
1632         struct fd_cache_entry* fd_cache_e;
1633         int use_fd_cache;
1634         
1635         use_fd_cache=cfg_get(tcp, tcp_cfg, fd_cache);
1636         fd_cache_e=0;
1637 #endif /* TCP_FD_CACHE */
1638         do_close_fd=1; /* close the fd on exit */
1639         port=su_getport(&dst->to);
1640         con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
1641         if (likely(port)){
1642                 su2ip_addr(&ip, &dst->to);
1643                 c=tcpconn_get(dst->id, &ip, port, from, con_lifetime); 
1644         }else if (likely(dst->id)){
1645                 c=tcpconn_get(dst->id, 0, 0, 0, con_lifetime);
1646         }else{
1647                 LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
1648                 return -1;
1649         }
1650         
1651         if (likely(dst->id)){
1652                 if (unlikely(c==0)) {
1653                         if (likely(port)){
1654                                 /* try again w/o id */
1655                                 c=tcpconn_get(0, &ip, port, from, con_lifetime);
1656                                 goto no_id;
1657                         }else{
1658                                 LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
1659                                                 dst->id);
1660                                 return -1;
1661                         }
1662                 }else goto get_fd;
1663         }
1664 no_id:
1665                 if (unlikely(c==0)){
1666                         /* check if connect() is disabled */
1667                         if (cfg_get(tcp, tcp_cfg, no_connect))
1668                                 return -1;
1669                         DBG("tcp_send: no open tcp connection found, opening new one\n");
1670                         /* create tcp connection */
1671                         if (likely(from==0)){
1672                                 /* check to see if we have to use a specific source addr. */
1673                                 switch (dst->to.s.sa_family) {
1674                                         case AF_INET:
1675                                                         from = tcp_source_ipv4;
1676                                                 break;
1677 #ifdef USE_IPV6
1678                                         case AF_INET6:
1679                                                         from = tcp_source_ipv6;
1680                                                 break;
1681 #endif
1682                                         default:
1683                                                 /* error, bad af, ignore ... */
1684                                                 break;
1685                                 }
1686                         }
1687 #if defined(TCP_CONNECT_WAIT) && defined(TCP_ASYNC)
1688                         if (likely(cfg_get(tcp, tcp_cfg, tcp_connect_wait) && 
1689                                                 cfg_get(tcp, tcp_cfg, async) )){
1690                                 if (unlikely(*tcp_connections_no >=
1691                                                                 cfg_get(tcp, tcp_cfg, max_connections))){
1692                                         LOG(L_ERR, "ERROR: tcp_send %s: maximum number of"
1693                                                                 " connections exceeded (%d/%d)\n",
1694                                                                 su2a(&dst->to, sizeof(dst->to)),
1695                                                                 *tcp_connections_no,
1696                                                                 cfg_get(tcp, tcp_cfg, max_connections));
1697                                         return -1;
1698                                 }
1699                                 c=tcpconn_new(-1, &dst->to, from, 0, dst->proto,
1700                                                                 S_CONN_CONNECT);
1701                                 if (unlikely(c==0)){
1702                                         LOG(L_ERR, "ERROR: tcp_send %s: could not create new"
1703                                                         " connection\n",
1704                                                         su2a(&dst->to, sizeof(dst->to)));
1705                                         return -1;
1706                                 }
1707                                 c->flags|=F_CONN_PENDING|F_CONN_FD_CLOSED;
1708                                 atomic_set(&c->refcnt, 2); /* ref from here and from main hash
1709                                                                                          table */
1710                                 /* add it to id hash and aliases */
1711                                 if (unlikely(tcpconn_add(c)==0)){
1712                                         LOG(L_ERR, "ERROR: tcp_send %s: could not add "
1713                                                                 "connection %p\n",
1714                                                                 su2a(&dst->to, sizeof(dst->to)),
1715                                                                         c);
1716                                         _tcpconn_free(c);
1717                                         n=-1;
1718                                         goto end_no_conn;
1719                                 }
1720                                 /* do connect and if src ip or port changed, update the 
1721                                  * aliases */
1722                                 if (unlikely((fd=tcpconn_finish_connect(c, from))<0)){
1723                                         /* tcpconn_finish_connect will automatically blacklist
1724                                            on error => no need to do it here */
1725                                         LOG(L_ERR, "ERROR: tcp_send %s: tcpconn_finish_connect(%p)"
1726                                                         " failed\n", su2a(&dst->to, sizeof(dst->to)),
1727                                                                 c);
1728                                         goto conn_wait_error;
1729                                 }
1730                                 /* ? TODO: it might be faster just to queue the write directly
1731                                  *  and send to main CONN_NEW_PENDING_WRITE */
1732                                 /* delay sending the fd to main after the send */
1733                                 
1734                                 /* NOTE: no lock here, because the connection is marked as
1735                                  * pending and nobody else will try to write on it. However
1736                                  * this might produce out-of-order writes. If this is not
1737                                  * desired either lock before the write or use 
1738                                  * _wbufq_insert(...) */
1739                                 n=_tcpconn_write_nb(fd, c, buf, len);
1740                                 if (unlikely(n<(int)len)){
1741                                         if ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK){
1742                                                 DBG("tcp_send: pending write on new connection %p "
1743                                                                 " (%d/%d bytes written)\n", c, n, len);
1744                                                 if (n<0) n=0;
1745                                                 else 
1746                                                         c->state=S_CONN_OK; /* partial write => connect()
1747                                                                                                         ended */
1748                                                 /* add to the write queue */
1749                                                 lock_get(&c->write_lock);
1750                                                         if (unlikely(_wbufq_insert(c, buf+n, len-n)<0)){
1751                                                                 lock_release(&c->write_lock);
1752                                                                 n=-1;
1753                                                                 LOG(L_ERR, "ERROR: tcp_send %s: EAGAIN and"
1754                                                                                 " write queue full or failed for %p\n",
1755                                                                                 su2a(&dst->to, sizeof(dst->to)),
1756                                                                                 c);
1757                                                                 goto conn_wait_error;
1758                                                         }
1759                                                 lock_release(&c->write_lock);
1760                                                 /* send to tcp_main */
1761                                                 response[0]=(long)c;
1762                                                 response[1]=CONN_NEW_PENDING_WRITE;
1763                                                 if (unlikely(send_fd(unix_tcp_sock, response, 
1764                                                                                                 sizeof(response), fd) <= 0)){
1765                                                         LOG(L_ERR, "BUG: tcp_send %s: "
1766                                                                                 "CONN_NEW_PENDING_WRITE  for %p"
1767                                                                                 " failed:" " %s (%d)\n",
1768                                                                                 su2a(&dst->to, sizeof(dst->to)),
1769                                                                                 c, strerror(errno), errno);
1770                                                         goto conn_wait_error;
1771                                                 }
1772                                                 n=len;
1773                                                 goto end;
1774                                         }
1775 #ifdef USE_DST_BLACKLIST
1776                                         if (cfg_get(core, core_cfg, use_dst_blacklist))
1777                                                 switch(errno){
1778                                                         case ENETUNREACH:
1779                                                         case ECONNRESET:
1780                                                         /*case EHOSTUNREACH: -- not posix */
1781                                                                 /* if first write failed it's most likely a
1782                                                                    connect error */
1783                                                                 dst_blacklist_add( BLST_ERR_CONNECT, dst, 0);
1784                                                                 break;
1785                                                 }
1786 #endif /* USE_DST_BLACKLIST */
1787                                         /* error: destroy it directly */
1788                                         LOG(L_ERR, "ERROR: tcp_send %s: connect & send "
1789                                                                                 " for %p failed:" " %s (%d)\n",
1790                                                                                 su2a(&dst->to, sizeof(dst->to)),
1791                                                                                 c, strerror(errno), errno);
1792                                         goto conn_wait_error;
1793                                 }
1794                                 LOG(L_INFO, "tcp_send: quick connect for %p\n", c);
1795                                 c->state=S_CONN_OK;
1796                                 /* send to tcp_main */
1797                                 response[0]=(long)c;
1798                                 response[1]=CONN_NEW_COMPLETE;
1799                                 if (unlikely(send_fd(unix_tcp_sock, response, 
1800                                                                                 sizeof(response), fd) <= 0)){
1801                                         LOG(L_ERR, "BUG: tcp_send %s: CONN_NEW_COMPLETE  for %p"
1802                                                                 " failed:" " %s (%d)\n",
1803                                                                 su2a(&dst->to, sizeof(dst->to)),
1804                                                                 c, strerror(errno), errno);
1805                                         goto conn_wait_error;
1806                                 }
1807                                 goto end;
1808                         }
1809 #endif /* TCP_CONNECT_WAIT  && TCP_ASYNC */
1810                         if (unlikely((c=tcpconn_connect(&dst->to, from, dst->proto))==0)){
1811                                 LOG(L_ERR, "ERROR: tcp_send %s: connect failed\n",
1812                                                                 su2a(&dst->to, sizeof(dst->to)));
1813                                 return -1;
1814                         }
1815                         atomic_set(&c->refcnt, 2); /* ref. from here and it will also
1816                                                       be added in the tcp_main hash */
1817                         fd=c->s;
1818                         c->flags|=F_CONN_FD_CLOSED; /* not yet opened in main */
1819                         /* ? TODO: it might be faster just to queue the write and
1820                          * send to main a CONN_NEW_PENDING_WRITE */
1821                         
1822                         /* send the new tcpconn to "tcp main" */
1823                         response[0]=(long)c;
1824                         response[1]=CONN_NEW;
1825                         n=send_fd(unix_tcp_sock, response, sizeof(response), c->s);
1826                         if (unlikely(n<=0)){
1827                                 LOG(L_ERR, "BUG: tcp_send %s: failed send_fd: %s (%d)\n",
1828                                                 su2a(&dst->to, sizeof(dst->to)),
1829                                                 strerror(errno), errno);
1830                                 /* we can safely delete it, it's not referenced by anybody */
1831                                 _tcpconn_free(c);
1832                                 n=-1;
1833                                 goto end_no_conn;
1834                         }
1835                         goto send_it;
1836                 }
1837 get_fd:
1838 #ifdef TCP_ASYNC
1839                 /* if data is already queued, we don't need the fd any more */
1840                 if (unlikely(cfg_get(tcp, tcp_cfg, async) &&
1841                                                 (_wbufq_non_empty(c)
1842 #ifdef TCP_CONNECT_WAIT
1843                                                                                         || (c->flags&F_CONN_PENDING)
1844 #endif /* TCP_CONNECT_WAIT */
1845                                                 ) )){
1846                         lock_get(&c->write_lock);
1847                                 if (likely(_wbufq_non_empty(c)
1848 #ifdef TCP_CONNECT_WAIT
1849                                                         || (c->flags&F_CONN_PENDING)
1850 #endif /* TCP_CONNECT_WAIT */
1851
1852                                                         )){
1853                                         do_close_fd=0;
1854                                         if (unlikely(_wbufq_add(c, buf, len)<0)){
1855                                                 lock_release(&c->write_lock);
1856                                                 n=-1;
1857                                                 goto error;
1858                                         }
1859                                         n=len;
1860                                         lock_release(&c->write_lock);
1861                                         goto release_c;
1862                                 }
1863                         lock_release(&c->write_lock);
1864                 }
1865 #endif /* TCP_ASYNC */
1866                 /* check if this is not the same reader process holding
1867                  *  c  and if so send directly on c->fd */
1868                 if (c->reader_pid==my_pid()){
1869                         DBG("tcp_send: send from reader (%d (%d)), reusing fd\n",
1870                                         my_pid(), process_no);
1871                         fd=c->fd;
1872                         do_close_fd=0; /* don't close the fd on exit, it's in use */
1873 #ifdef TCP_FD_CACHE
1874                         use_fd_cache=0; /* don't cache: problems would arise due to the
1875                                                            close() on cache eviction (if the fd is still 
1876                                                            used). If it has to be cached then dup() _must_ 
1877                                                            be used */
1878                 }else if (likely(use_fd_cache && 
1879                                                         ((fd_cache_e=tcp_fd_cache_get(c))!=0))){
1880                         fd=fd_cache_e->fd;
1881                         do_close_fd=0;
1882                         DBG("tcp_send: found fd in cache ( %d, %p, %d)\n",
1883                                         fd, c, fd_cache_e->id);
1884 #endif /* TCP_FD_CACHE */
1885                 }else{
1886                         DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
1887                         /* get the fd */
1888                         response[0]=(long)c;
1889                         response[1]=CONN_GET_FD;
1890                         n=send_all(unix_tcp_sock, response, sizeof(response));
1891                         if (unlikely(n<=0)){
1892                                 LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
1893                                                 strerror(errno), errno);
1894                                 n=-1;
1895                                 goto release_c;
1896                         }
1897                         DBG("tcp_send, c= %p, n=%d\n", c, n);
1898                         n=receive_fd(unix_tcp_sock, &tmp, sizeof(tmp), &fd, MSG_WAITALL);
1899                         if (unlikely(n<=0)){
1900                                 LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
1901                                                         " %s (%d)\n", strerror(errno), errno);
1902                                 n=-1;
1903                                 do_close_fd=0;
1904                                 goto release_c;
1905                         }
1906                         if (unlikely(c!=tmp)){
1907                                 LOG(L_CRIT, "BUG: tcp_send: get_fd: got different connection:"
1908                                                 "  %p (id= %d, refcnt=%d state=%d) != "
1909                                                 "  %p (n=%d)\n",
1910                                                   c,   c->id,   atomic_get(&c->refcnt),   c->state,
1911                                                   tmp, n
1912                                    );
1913                                 n=-1; /* fail */
1914                                 goto end;
1915                         }
1916                         DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
1917                 }
1918         
1919         
1920 send_it:
1921         DBG("tcp_send: sending...\n");
1922         lock_get(&c->write_lock);
1923 #ifdef TCP_ASYNC
1924         if (likely(cfg_get(tcp, tcp_cfg, async))){
1925                 if (_wbufq_non_empty(c)
1926 #ifdef TCP_CONNECT_WAIT
1927                         || (c->flags&F_CONN_PENDING) 
1928 #endif /* TCP_CONNECT_WAIT */
1929                         ){
1930                         if (unlikely(_wbufq_add(c, buf, len)<0)){
1931                                 lock_release(&c->write_lock);
1932                                 n=-1;
1933                                 goto error;
1934                         }
1935                         lock_release(&c->write_lock);
1936                         n=len;
1937                         goto end;
1938                 }
1939                 n=_tcpconn_write_nb(fd, c, buf, len);
1940         }else{
1941 #endif /* TCP_ASYNC */
1942 #ifdef USE_TLS
1943         if (c->type==PROTO_TLS)
1944                 n=tls_blocking_write(c, fd, buf, len);
1945         else
1946 #endif
1947                 /* n=tcp_blocking_write(c, fd, buf, len); */
1948                 n=tsend_stream(fd, buf, len,
1949                                                         cfg_get(tcp, tcp_cfg, send_timeout_s)*1000);
1950 #ifdef TCP_ASYNC
1951         }
1952 #else /* ! TCP_ASYNC */
1953         lock_release(&c->write_lock);
1954 #endif /* TCP_ASYNC */
1955         
1956         DBG("tcp_send: after real write: c= %p n=%d fd=%d\n",c, n, fd);
1957         DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
1958         if (unlikely(n<(int)len)){
1959 #ifdef TCP_ASYNC
1960                 if (cfg_get(tcp, tcp_cfg, async) && 
1961                                 ((n>=0) || errno==EAGAIN || errno==EWOULDBLOCK)){
1962                         enable_write_watch=_wbufq_empty(c);
1963                         if (n<0) n=0;
1964                         else if (unlikely(c->state==S_CONN_CONNECT ||
1965                                                 c->state==S_CONN_ACCEPT))
1966                                 c->state=S_CONN_OK; /* something was written */
1967                         if (unlikely(_wbufq_add(c, buf+n, len-n)<0)){
1968                                 lock_release(&c->write_lock);
1969                                 n=-1;
1970                                 goto error;
1971                         }
1972                         lock_release(&c->write_lock);
1973                         n=len;
1974                         if (likely(enable_write_watch)){
1975                                 response[0]=(long)c;
1976                                 response[1]=CONN_QUEUED_WRITE;
1977                                 if (send_all(unix_tcp_sock, response, sizeof(response)) <= 0){
1978                                         LOG(L_ERR, "BUG: tcp_send: error return failed "
1979                                                                 "(write):%s (%d)\n", strerror(errno), errno);
1980                                         n=-1;
1981                                         goto error;
1982                                 }
1983                         }
1984                         goto end;
1985                 }else{
1986                         lock_release(&c->write_lock);
1987                 }
1988 #endif /* TCP_ASYNC */
1989 #ifdef USE_DST_BLACKLIST
1990                 if (cfg_get(core, core_cfg, use_dst_blacklist))
1991                         switch(errno){
1992                                 case ENETUNREACH:
1993                                 case ECONNRESET:
1994                                 /*case EHOSTUNREACH: -- not posix */
1995                                         dst_blacklist_su((c->state==S_CONN_CONNECT)?
1996                                                                                         BLST_ERR_CONNECT:
1997                                                                                         BLST_ERR_SEND,
1998                                                                                 c->rcv.proto,
1999                                                                                 &c->rcv.src_su, 0);
2000                                         break;
2001                         }
2002 #endif /* USE_DST_BLACKLIST */
2003                 LOG(L_ERR, "ERROR: tcp_send: failed to send on %p (%s:%d->%s): %s (%d)"
2004                                         "\n", c, ip_addr2a(&c->rcv.dst_ip), c->rcv.dst_port,
2005                                         su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
2006                                         strerror(errno), errno);
2007 #ifdef TCP_ASYNC
2008 error:
2009 #endif /* TCP_ASYNC */
2010                 /* error on the connection , mark it as bad and set 0 timeout */
2011                 c->state=S_CONN_BAD;
2012                 c->timeout=get_ticks_raw();
2013                 /* tell "main" it should drop this (optional it will t/o anyway?)*/
2014                 response[0]=(long)c;
2015                 response[1]=CONN_ERROR;
2016                 if (send_all(unix_tcp_sock, response, sizeof(response))<=0){
2017                         LOG(L_CRIT, "BUG: tcp_send: error return failed (write):%s (%d)\n",
2018                                         strerror(errno), errno);
2019                         tcpconn_chld_put(c); /* deref. it manually */
2020                         n=-1;
2021                 }
2022                 /* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put 
2023                  * if it succeeds */
2024 #ifdef TCP_FD_CACHE
2025                 if (unlikely(fd_cache_e)){
2026                         LOG(L_ERR, "ERROR: tcp_send %s: error on cached fd, removing from"
2027                                         " the cache (%d, %p, %d)\n", 
2028                                         su2a(&c->rcv.src_su, sizeof(c->rcv.src_su)),
2029                                         fd, fd_cache_e->con, fd_cache_e->id);
2030                         tcp_fd_cache_rm(fd_cache_e);
2031                         close(fd);
2032                 }else
2033 #endif /* TCP_FD_CACHE */
2034                 if (do_close_fd) close(fd);
2035                 return n; /* error return, no tcpconn_put */
2036         }
2037         
2038 #ifdef TCP_ASYNC
2039         lock_release(&c->write_lock);
2040 #endif /* TCP_ASYNC */
2041         /* in non-async mode here we're either in S_CONN_OK or S_CONN_ACCEPT*/
2042         if (unlikely(c->state==S_CONN_CONNECT || c->state==S_CONN_ACCEPT))
2043                         c->state=S_CONN_OK;
2044 end:
2045 #ifdef TCP_FD_CACHE
2046         if (unlikely((fd_cache_e==0) && use_fd_cache)){
2047                 tcp_fd_cache_add(c, fd);
2048         }else
2049 #endif /* TCP_FD_CACHE */
2050         if (do_close_fd) close(fd);
2051 release_c:
2052         tcpconn_chld_put(c); /* release c (dec refcnt & free on 0) */
2053 end_no_conn:
2054         return n;
2055 #ifdef TCP_CONNECT_WAIT
2056 conn_wait_error:
2057         /* connect or send failed on newly created connection which was not
2058          * yet sent to tcp_main (but was already hashed) => don't send to main,
2059          * unhash and destroy directly (if refcnt>2 it will be destroyed when the 
2060          * last sender releases the connection (tcpconn_chld_put(c))) or when
2061          * tcp_main receives a CONN_ERROR it*/
2062         c->state=S_CONN_BAD;
2063         TCPCONN_LOCK;
2064                 if (c->flags & F_CONN_HASHED){
2065                         /* if some other parallel tcp_send did send CONN_ERROR to
2066                          * tcp_main, the connection might be already detached */
2067                         _tcpconn_detach(c);
2068                         c->flags&=~F_CONN_HASHED;
2069                         TCPCONN_UNLOCK;
2070                         tcpconn_put(c);
2071                 }else
2072                         TCPCONN_UNLOCK;
2073         /* dec refcnt -> mark it for destruction */
2074         tcpconn_chld_put(c);
2075         return -1;
2076 #endif /* TCP_CONNET_WAIT */
2077 }
2078
2079
2080
2081 int tcp_init(struct socket_info* sock_info)
2082 {
2083         union sockaddr_union* addr;
2084         int optval;
2085 #ifdef HAVE_TCP_ACCEPT_FILTER
2086         struct accept_filter_arg afa;
2087 #endif /* HAVE_TCP_ACCEPT_FILTER */
2088 #ifdef DISABLE_NAGLE
2089         int flag;
2090         struct protoent* pe;
2091
2092         if (tcp_proto_no==-1){ /* if not already set */
2093                 pe=getprotobyname("tcp");
2094                 if (pe==0){
2095                         LOG(L_ERR, "ERROR: tcp_init: could not get TCP protocol number\n");
2096                         tcp_proto_no=-1;
2097                 }else{
2098                         tcp_proto_no=pe->p_proto;
2099                 }
2100         }
2101 #endif
2102         
2103         addr=&sock_info->su;
2104         /* sock_info->proto=PROTO_TCP; */
2105         if (init_su(addr, &sock_info->address, sock_info->port_no)<0){
2106                 LOG(L_ERR, "ERROR: tcp_init: could no init sockaddr_union\n");
2107                 goto error;
2108         }
2109         DBG("tcp_init: added %s\n", su2a(addr, sizeof(*addr)));
2110         sock_info->socket=socket(AF2PF(addr->s.sa_family), SOCK_STREAM, 0);
2111         if (sock_info->socket==-1){
2112                 LOG(L_ERR, "ERROR: tcp_init: socket: %s\n", strerror(errno));
2113                 goto error;
2114         }
2115 #ifdef DISABLE_NAGLE
2116         flag=1;
2117         if ( (tcp_proto_no!=-1) &&
2118                  (setsockopt(sock_info->socket, tcp_proto_no , TCP_NODELAY,
2119                                          &flag, sizeof(flag))<0) ){
2120                 LOG(L_ERR, "ERROR: tcp_init: could not disable Nagle: %s\n",
2121                                 strerror(errno));
2122         }
2123 #endif
2124
2125
2126 #if  !defined(TCP_DONT_REUSEADDR) 
2127         /* Stevens, "Network Programming", Section 7.5, "Generic Socket
2128      * Options": "...server started,..a child continues..on existing
2129          * connection..listening server is restarted...call to bind fails
2130          * ... ALL TCP servers should specify the SO_REUSEADDRE option 
2131          * to allow the server to be restarted in this situation
2132          *
2133          * Indeed, without this option, the server can't restart.
2134          *   -jiri
2135          */
2136         optval=1;
2137         if (setsockopt(sock_info->socket, SOL_SOCKET, SO_REUSEADDR,
2138                                 (void*)&optval, sizeof(optval))==-1) {
2139                 LOG(L_ERR, "ERROR: tcp_init: setsockopt %s\n",
2140                         strerror(errno));
2141                 goto error;
2142         }
2143 #endif
2144         /* tos */
2145         optval = tos;
2146         if (setsockopt(sock_info->socket, IPPROTO_IP, IP_TOS, (void*)&optval, 
2147                                 sizeof(optval)) ==-1){
2148                 LOG(L_WARN, "WARNING: tcp_init: setsockopt tos: %s\n", strerror(errno));
2149                 /* continue since this is not critical */
2150         }
2151 #ifdef HAVE_TCP_DEFER_ACCEPT
2152         /* linux only */
2153         if ((optval=cfg_get(tcp, tcp_cfg, defer_accept))){
2154                 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_DEFER_ACCEPT,
2155                                         (void*)&optval, sizeof(optval)) ==-1){
2156                         LOG(L_WARN, "WARNING: tcp_init: setsockopt TCP_DEFER_ACCEPT %s\n",
2157                                                 strerror(errno));
2158                 /* continue since this is not critical */
2159                 }
2160         }
2161 #endif /* HAVE_TCP_DEFFER_ACCEPT */
2162 #ifdef HAVE_TCP_SYNCNT
2163         if ((optval=cfg_get(tcp, tcp_cfg, syncnt))){
2164                 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_SYNCNT, &optval,
2165                                                 sizeof(optval))<0){
2166                         LOG(L_WARN, "WARNING: tcp_init: failed to set"
2167                                                 " maximum SYN retr. count: %s\n", strerror(errno));
2168                 }
2169         }
2170 #endif
2171 #ifdef HAVE_TCP_LINGER2
2172         if ((optval=cfg_get(tcp, tcp_cfg, linger2))){
2173                 if (setsockopt(sock_info->socket, IPPROTO_TCP, TCP_LINGER2, &optval,
2174                                                 sizeof(optval))<0){
2175                         LOG(L_WARN, "WARNING: tcp_init: failed to set"
2176                                                 " maximum LINGER2 timeout: %s\n", strerror(errno));
2177                 }
2178         }
2179 #endif
2180         init_sock_keepalive(sock_info->socket);
2181         if (bind(sock_info->socket, &addr->s, sockaddru_len(*addr))==-1){
2182                 LOG(L_ERR, "ERROR: tcp_init: bind(%x, %p, %d) on %s:%d : %s\n",
2183                                 sock_info->socket,  &addr->s, 
2184                                 (unsigned)sockaddru_len(*addr),
2185                                 sock_info->address_str.s,
2186                                 sock_info->port_no,
2187                                 strerror(errno));
2188                 goto error;
2189         }
2190         if (listen(sock_info->socket, TCP_LISTEN_BACKLOG)==-1){
2191                 LOG(L_ERR, "ERROR: tcp_init: listen(%x, %p, %d) on %s: %s\n",
2192                                 sock_info->socket, &addr->s, 
2193                                 (unsigned)sockaddru_len(*addr),
2194                                 sock_info->address_str.s,
2195                                 strerror(errno));
2196                 goto error;
2197         }
2198 #ifdef HAVE_TCP_ACCEPT_FILTER
2199         /* freebsd */
2200         if (cfg_get(tcp, tcp_cfg, defer_accept)){
2201                 memset(&afa, 0, sizeof(afa));
2202                 strcpy(afa.af_name, "dataready");
2203                 if (setsockopt(sock_info->socket, SOL_SOCKET, SO_ACCEPTFILTER,
2204                                         (void*)&afa, sizeof(afa)) ==-1){
2205                         LOG(L_WARN, "WARNING: tcp_init: setsockopt SO_ACCEPTFILTER %s\n",
2206                                                 strerror(errno));
2207                 /* continue since this is not critical */
2208                 }
2209         }
2210 #endif /* HAVE_TCP_ACCEPT_FILTER */
2211         
2212         return 0;
2213 error:
2214         if (sock_info->socket!=-1){
2215                 close(sock_info->socket);
2216                 sock_info->socket=-1;
2217         }
2218         return -1;
2219 }
2220
2221
2222
2223 /* close tcp_main's fd from a tcpconn
2224  * WARNING: call only in tcp_main context */
2225 inline static void tcpconn_close_main_fd(struct tcp_connection* tcpconn)
2226 {
2227         int fd;
2228         
2229         
2230         fd=tcpconn->s;
2231 #ifdef USE_TLS
2232         /*FIXME: lock ->writelock ? */
2233         if (tcpconn->type==PROTO_TLS)
2234                 tls_close(tcpconn, fd);
2235 #endif
2236 #ifdef TCP_FD_CACHE
2237         if (likely(cfg_get(tcp, tcp_cfg, fd_cache))) shutdown(fd, SHUT_RDWR);
2238 #endif /* TCP_FD_CACHE */
2239 close_again:
2240         if (unlikely(close(fd)<0)){
2241                 if (errno==EINTR)
2242                         goto close_again;
2243                 LOG(L_ERR, "ERROR: tcpconn_put_destroy; close() failed: %s (%d)\n",
2244                                 strerror(errno), errno);
2245         }
2246 }
2247
2248
2249
2250 /* dec refcnt & frees the connection if refcnt==0
2251  * returns 1 if the connection is freed, 0 otherwise
2252  *
2253  * WARNING: use only from child processes */
2254 inline static int tcpconn_chld_put(struct tcp_connection* tcpconn)
2255 {
2256         if (unlikely(atomic_dec_and_test(&tcpconn->refcnt))){
2257                 DBG("tcpconn_chld_put: destroying connection %p (%d, %d) "
2258                                 "flags %04x\n", tcpconn, tcpconn->id,
2259                                 tcpconn->s, tcpconn->flags);
2260                 /* sanity checks */
2261                 membar_read_atomic_op(); /* make sure we see the current flags */
2262                 if (unlikely(!(tcpconn->flags & F_CONN_FD_CLOSED) ||
2263                         (tcpconn->flags &
2264                                 (F_CONN_HASHED|F_CONN_MAIN_TIMER|
2265                                  F_CONN_READ_W|F_CONN_WRITE_W)) )){
2266                         LOG(L_CRIT, "BUG: tcpconn_chld_put: %p bad flags = %0x\n",
2267                                         tcpconn, tcpconn->flags);
2268                         abort();
2269                 }
2270                 _tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
2271                 return 1;
2272         }
2273         return 0;
2274 }
2275
2276
2277
2278 /* simple destroy function (the connection should be already removed
2279  * from the hashes and the fds should not be watched anymore for IO)
2280  */
2281 inline static void tcpconn_destroy(struct tcp_connection* tcpconn)
2282 {
2283                 DBG("tcpconn_destroy: destroying connection %p (%d, %d) "
2284                                 "flags %04x\n", tcpconn, tcpconn->id,
2285                                 tcpconn->s, tcpconn->flags);
2286                 if (unlikely(tcpconn->flags & F_CONN_HASHED)){
2287                         LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed"
2288                                                 " connection (%p)\n", tcpconn);
2289                         /* try to continue */
2290                         if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
2291                                 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
2292                         TCPCONN_LOCK;
2293                                 _tcpconn_detach(tcpconn);
2294                         TCPCONN_UNLOCK;
2295                 }
2296                 if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
2297                         tcpconn_close_main_fd(tcpconn);
2298                         (*tcp_connections_no)--;
2299                 }
2300                 _tcpconn_free(tcpconn); /* destroys also the wbuf_q if still present*/
2301 }
2302
2303
2304
2305 /* tries to destroy the connection: dec. refcnt and if 0 destroys the
2306  *  connection, else it will mark it as BAD and close the main fds
2307  *
2308  * returns 1 if the connection was destroyed, 0 otherwise
2309  *
2310  * WARNING: - the connection _has_ to be removed from the hash and timer
2311  *  first (use tcpconn_try_unhash() for this )
2312  *         - the fd should not be watched anymore (io_watch_del()...)
2313  *         - must be called _only_ from the tcp_main process context
2314  *          (or else the fd will remain open)
2315  */
2316 inline static int tcpconn_put_destroy(struct tcp_connection* tcpconn)
2317 {
2318         if (unlikely((tcpconn->flags & 
2319                         (F_CONN_WRITE_W|F_CONN_HASHED|F_CONN_MAIN_TIMER|F_CONN_READ_W)) )){
2320                 /* sanity check */
2321                 if (unlikely(tcpconn->flags & F_CONN_HASHED)){
2322                         LOG(L_CRIT, "BUG: tcpconn_destroy: called with hashed and/or"
2323                                                 "on timer connection (%p), flags = %0x\n",
2324                                                 tcpconn, tcpconn->flags);
2325                         /* try to continue */
2326                         if (likely(tcpconn->flags & F_CONN_MAIN_TIMER))
2327                                 local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
2328                         TCPCONN_LOCK;
2329                                 _tcpconn_detach(tcpconn);
2330                         TCPCONN_UNLOCK;
2331                 }else{
2332                         LOG(L_CRIT, "BUG: tcpconn_put_destroy: %p flags = %0x\n",
2333                                         tcpconn, tcpconn->flags);
2334                 }
2335         }
2336         tcpconn->state=S_CONN_BAD;
2337         /* in case it's still in a reader timer */
2338         tcpconn->timeout=get_ticks_raw();
2339         /* fast close: close fds now */
2340         if (likely(!(tcpconn->flags & F_CONN_FD_CLOSED))){
2341                 tcpconn_close_main_fd(tcpconn);
2342                 tcpconn->flags|=F_CONN_FD_CLOSED;
2343                 (*tcp_connections_no)--;
2344         }
2345         /* all the flags / ops on the tcpconn must be done prior to decrementing
2346          * the refcnt. and at least a membar_write_atomic_op() mem. barrier or
2347          *  a mb_atomic_* op must * be used to make sure all the changed flags are
2348          *  written into memory prior to the new refcnt value */
2349         if (unlikely(mb_atomic_dec_and_test(&tcpconn->refcnt))){
2350                 _tcpconn_free(tcpconn);
2351                 return 1;
2352         }
2353         return 0;
2354 }
2355
2356
2357
2358 /* try to remove a connection from the hashes and timer.
2359  * returns 1 if the connection was removed, 0 if not (connection not in
2360  *  hash)
2361  *
2362  * WARNING: call it only in the  tcp_main process context or else the
2363  *  timer removal won't work.
2364  */
2365 inline static int tcpconn_try_unhash(struct tcp_connection* tcpconn)
2366 {
2367         if (likely(tcpconn->flags & F_CONN_HASHED)){
2368                 tcpconn->state=S_CONN_BAD;
2369                 if (likely(tcpconn->flags & F_CONN_MAIN_TIMER)){
2370                         local_timer_del(&tcp_main_ltimer, &tcpconn->timer);
2371                         tcpconn->flags&=~F_CONN_MAIN_TIMER;
2372                 }else
2373                         /* in case it's still in a reader timer */
2374                         tcpconn->timeout=get_ticks_raw();
2375                 TCPCONN_LOCK;
2376                         if (tcpconn->flags & F_CONN_HASHED){
2377                                 tcpconn->flags&=~F_CONN_HASHED;
2378                                 _tcpconn_detach(tcpconn);
2379                                 TCPCONN_UNLOCK;
2380                         }else{
2381                                 /* tcp_send was faster and did unhash it itself */
2382                                 TCPCONN_UNLOCK;
2383                                 return 0;
2384                         }
2385 #ifdef TCP_ASYNC
2386                 /* empty possible write buffers (optional) */
2387                 if (unlikely(_wbufq_non_empty(tcpconn))){
2388                         lock_get(&tcpconn->write_lock);
2389                                 /* check again, while holding the lock */
2390                                 if (likely(_wbufq_non_empty(tcpconn)))
2391                                         _wbufq_destroy(&tcpconn->wbuf_q);
2392                         lock_release(&tcpconn->write_lock);
2393                 }
2394 #endif /* TCP_ASYNC */
2395                 return 1;
2396         }
2397         return 0;
2398 }
2399
2400
2401
2402 #ifdef SEND_FD_QUEUE
2403 struct send_fd_info{
2404         struct tcp_connection* tcp_conn;
2405         ticks_t expire;
2406         int unix_sock;
2407         unsigned int retries; /* debugging */
2408 };
2409
2410 struct tcp_send_fd_q{
2411         struct send_fd_info* data; /* buffer */
2412         struct send_fd_info* crt;  /* pointer inside the buffer */
2413         struct send_fd_info* end;  /* points after the last valid position */
2414 };
2415
2416
2417 static struct tcp_send_fd_q send2child_q;
2418
2419
2420
2421 static int send_fd_queue_init(struct tcp_send_fd_q *q, unsigned int size)
2422 {
2423         q->data=pkg_malloc(size*sizeof(struct send_fd_info));
2424         if (q->data==0){
2425                 LOG(L_ERR, "ERROR: send_fd_queue_init: out of memory\n");
2426                 return -1;
2427         }
2428         q->crt=&q->data[0];
2429         q->end=&q->data[size];
2430         return 0;
2431 }
2432
2433 static void send_fd_queue_destroy(struct tcp_send_fd_q *q)
2434 {
2435         if (q->data){
2436                 pkg_free(q->data);
2437                 q->data=0;
2438                 q->crt=q->end=0;
2439         }
2440 }
2441
2442
2443
2444 static int init_send_fd_queues()
2445 {
2446         if (send_fd_queue_init(&send2child_q, SEND_FD_QUEUE_SIZE)!=0)
2447                 goto error;
2448         return 0;
2449 error:
2450         LOG(L_ERR, "ERROR: init_send_fd_queues: init failed\n");
2451         return -1;
2452 }
2453
2454
2455
2456 static void destroy_send_fd_queues()
2457 {
2458         send_fd_queue_destroy(&send2child_q);
2459 }
2460
2461
2462
2463
2464 inline static int send_fd_queue_add(    struct tcp_send_fd_q* q, 
2465                                                                                 int unix_sock,
2466                                                                                 struct tcp_connection *t)
2467 {
2468         struct send_fd_info* tmp;
2469         unsigned long new_size;
2470         
2471         if (q->crt>=q->end){
2472                 new_size=q->end-&q->data[0];
2473                 if (new_size< MAX_SEND_FD_QUEUE_SIZE/2){
2474                         new_size*=2;
2475                 }else new_size=MAX_SEND_FD_QUEUE_SIZE;
2476                 if (unlikely(q->crt>=&q->data[new_size])){
2477                         LOG(L_ERR, "ERROR: send_fd_queue_add: queue full: %ld/%ld\n",
2478                                         (long)(q->crt-&q->data[0]-1), new_size);
2479                         goto error;
2480                 }
2481                 LOG(L_CRIT, "INFO: send_fd_queue: queue full: %ld, extending to %ld\n",
2482                                 (long)(q->end-&q->data[0]), new_size);
2483                 tmp=pkg_realloc(q->data, new_size*sizeof(struct send_fd_info));
2484                 if (unlikely(tmp==0)){
2485                         LOG(L_ERR, "ERROR: send_fd_queue_add: out of memory\n");
2486                         goto error;
2487                 }
2488                 q->crt=(q->crt-&q->data[0])+tmp;
2489                 q->data=tmp;
2490                 q->end=&q->data[new_size];
2491         }
2492         q->crt->tcp_conn=t;
2493         q->crt->unix_sock=unix_sock;
2494         q->crt->expire=get_ticks_raw()+SEND_FD_QUEUE_TIMEOUT;
2495         q->crt->retries=0;
2496         q->crt++;
2497         return 0;
2498 error:
2499         return -1;
2500 }
2501
2502
2503
2504 inline static void send_fd_queue_run(struct tcp_send_fd_q* q)
2505 {
2506         struct send_fd_info* p;
2507         struct send_fd_info* t;
2508         
2509         for (p=t=&q->data[0]; p<q->crt; p++){
2510                 if (unlikely(send_fd(p->unix_sock, &(p->tcp_conn),
2511                                         sizeof(struct tcp_connection*), p->tcp_conn->s)<=0)){
2512                         if ( ((errno==EAGAIN)||(errno==EWOULDBLOCK)) && 
2513                                                         ((s_ticks_t)(p->expire-get_ticks_raw())>0)){
2514                                 /* leave in queue for a future try */
2515                                 *t=*p;
2516                                 t->retries++;
2517                                 t++;
2518                         }else{
2519                                 LOG(L_ERR, "ERROR: run_send_fd_queue: send_fd failed"
2520                                                    " on socket %d , queue entry %ld, retries %d,"
2521                                                    " connection %p, tcp socket %d, errno=%d (%s) \n",
2522                                                    p->unix_sock, (long)(p-&q->data[0]), p->retries,
2523                                                    p->tcp_conn, p->tcp_conn->s, errno,
2524                                                    strerror(errno));
2525 #ifdef TCP_ASYNC
2526                                 if (p->tcp_conn->flags & F_CONN_WRITE_W){
2527                                         io_watch_del(&io_h, p->tcp_conn->s, -1, IO_FD_CLOSING);
2528                                         p->tcp_conn->flags &=~F_CONN_WRITE_W;
2529                                 }
2530 #endif
2531                                 p->tcp_conn->flags &= ~F_CONN_READER;
2532                                 if (likely(tcpconn_try_unhash(p->tcp_conn)))
2533                                         tcpconn_put(p->tcp_conn);
2534                                 tcpconn_put_destroy(p->tcp_conn); /* dec refcnt & destroy */
2535                         }
2536                 }
2537         }
2538         q->crt=t;
2539 }
2540 #else
2541 #define send_fd_queue_run(q)
2542 #endif
2543
2544
/* Non-blocking write of up to len bytes from buf on the connection's fd.
 * Unsafe version: the caller must hold c->write_lock. The fd should be
 * non-blocking. Writes interrupted by a signal (EINTR) are restarted.
 * NOTE: TLS connections still go through tls_blocking_write() (see FIXME).
 * returns the number of bytes written on success, -1 on error (errno set)
 */
inline static int _tcpconn_write_nb(int fd, struct tcp_connection* c,
									char* buf, int len)
{
	int written;

	do{
#ifdef USE_TLS
		if (unlikely(c->type==PROTO_TLS))
			/* FIXME: tls_nonblocking_write !! */
			written=tls_blocking_write(c, fd, buf, len);
		else
#endif /* USE_TLS */
			written=send(fd, buf, len,
#ifdef HAVE_MSG_NOSIGNAL
							MSG_NOSIGNAL
#else
							0
#endif /* HAVE_MSG_NOSIGNAL */
						);
	}while(unlikely(written<0) && errno==EINTR);
	return written;
}
2574
2575
2576 /* handles io from a tcp child process
2577  * params: tcp_c - pointer in the tcp_children array, to the entry for
2578  *                 which an io event was detected 
2579  *         fd_i  - fd index in the fd_array (usefull for optimizing
2580  *                 io_watch_deletes)
2581  * returns:  handle_* return convention: -1 on error, 0 on EAGAIN (no more
2582  *           io events queued), >0 on success. success/error refer only to
2583  *           the reads from the fd.
2584  */
2585 inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
2586 {
2587         struct tcp_connection* tcpconn;
2588         long response[2];
2589         int cmd;
2590         int bytes;
2591         int n;
2592         ticks_t t;
2593         ticks_t crt_timeout;
2594         ticks_t con_lifetime;
2595         
2596         if (unlikely(tcp_c->unix_sock<=0)){
2597                 /* (we can't have a fd==0, 0 is never closed )*/
2598                 LOG(L_CRIT, "BUG: handle_tcp_child: fd %d for %d "
2599                                 "(pid %d, ser no %d)\n", tcp_c->unix_sock,
2600                                 (int)(tcp_c-&tcp_children[0]), tcp_c->pid, tcp_c->proc_no);
2601                 goto error;
2602         }
2603         /* read until sizeof(response)
2604          * (this is a SOCK_STREAM so read is not atomic) */
2605         bytes=recv_all(tcp_c->unix_sock, response, sizeof(response), MSG_DONTWAIT);
2606         if (unlikely(bytes<(int)sizeof(response))){
2607                 if (bytes==0){
2608                         /* EOF -> bad, child has died */
2609                         DBG("DBG: handle_tcp_child: dead tcp child %d (pid %d, no %d)"
2610                                         " (shutting down?)\n", (int)(tcp_c-&tcp_children[0]), 
2611                                         tcp_c->pid, tcp_c->proc_no );
2612                         /* don't listen on it any more */
2613                         io_watch_del(&io_h, tcp_c->unix_sock, fd_i, 0); 
2614                         goto error; /* eof. so no more io here, it's ok to return error */
2615                 }else if (bytes<0){
2616                         /* EAGAIN is ok if we try to empty the buffer
2617                          * e.g.: SIGIO_RT overflow mode or EPOLL ET */
2618                         if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
2619                                 LOG(L_CRIT, "ERROR: handle_tcp_child: read from tcp child %ld "
2620                                                 " (pid %d, no %d) %s [%d]\n",
2621                                                 (long)(tcp_c-&tcp_children[0]), tcp_c->pid,
2622                                                 tcp_c->proc_no, strerror(errno), errno );
2623                         }else{
2624                                 bytes=0;
2625                         }
2626                         /* try to ignore ? */
2627                         goto end;
2628                 }else{
2629                         /* should never happen */
2630                         LOG(L_CRIT, "BUG: handle_tcp_child: too few bytes received (%d)\n",
2631                                         bytes );
2632                         bytes=0; /* something was read so there is no error; otoh if
2633                                           receive_fd returned less then requested => the receive
2634                                           buffer is empty => no more io queued on this fd */
2635                         goto end;
2636                 }
2637         }
2638         
2639         DBG("handle_tcp_child: reader response= %lx, %ld from %d \n",
2640                                         response[0], response[1], (int)(tcp_c-&tcp_children[0]));
2641         cmd=response[1];
2642         tcpconn=(struct tcp_connection*)response[0];
2643         if (unlikely(tcpconn==0)){
2644                 /* should never happen */
2645                 LOG(L_CRIT, "BUG: handle_tcp_child: null tcpconn pointer received"
2646                                  " from tcp child %d (pid %d): %lx, %lx\n",
2647                                         (int)(tcp_c-&tcp_children[0]), tcp_c->pid,
2648                                         response[0], response[1]) ;
2649                 goto end;
2650         }
2651         switch(cmd){
2652                 case CONN_RELEASE:
2653                         tcp_c->busy--;
2654                         if (unlikely(tcpconn_put(tcpconn))){
2655                                 tcpconn_destroy(tcpconn);
2656                                 break;
2657                         }
2658                         if (unlikely(tcpconn->state==S_CONN_BAD)){ 
2659 #ifdef TCP_ASYNC
2660                                 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
2661                                         io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2662                                         tcpconn->flags &= ~F_CONN_WRITE_W;
2663                                 }
2664 #endif /* TCP_ASYNC */
2665                                 if (tcpconn_try_unhash(tcpconn))
2666                                         tcpconn_put_destroy(tcpconn);
2667                                 break;
2668                         }
2669                         /* update the timeout*/
2670                         t=get_ticks_raw();
2671                         con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
2672                         tcpconn->timeout=t+con_lifetime;
2673                         crt_timeout=con_lifetime;
2674 #ifdef TCP_ASYNC
2675                         if (unlikely(cfg_get(tcp, tcp_cfg, async) && 
2676                                                         _wbufq_non_empty(tcpconn) )){
2677                                 if (unlikely(TICKS_GE(t, tcpconn->wbuf_q.wr_timeout))){
2678                                         DBG("handle_tcp_child: wr. timeout on CONN_RELEASE for %p "
2679                                                         "refcnt= %d\n", tcpconn,
2680                                                         atomic_get(&tcpconn->refcnt));
2681                                         /* timeout */
2682 #ifdef USE_DST_BLACKLIST
2683                                         if (cfg_get(core, core_cfg, use_dst_blacklist))
2684                                                 dst_blacklist_su((tcpconn->state==S_CONN_CONNECT)?
2685                                                                                                         BLST_ERR_CONNECT:
2686                                                                                                         BLST_ERR_SEND,
2687                                                                                                         tcpconn->rcv.proto,
2688                                                                                                         &tcpconn->rcv.src_su, 0);
2689 #endif /* USE_DST_BLACKLIST */
2690                                         if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
2691                                                 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2692                                                 tcpconn->flags&=~F_CONN_WRITE_W;
2693                                         }
2694                                         if (tcpconn_try_unhash(tcpconn))
2695                                                 tcpconn_put_destroy(tcpconn);
2696                                         break;
2697                                 }else{
2698                                         crt_timeout=MIN_unsigned(con_lifetime,
2699                                                                                         tcpconn->wbuf_q.wr_timeout-t);
2700                                 }
2701                         }
2702 #endif /* TCP_ASYNC */
2703                         /* re-activate the timer */
2704                         tcpconn->timer.f=tcpconn_main_timeout;
2705                         local_timer_reinit(&tcpconn->timer);
2706                         local_timer_add(&tcp_main_ltimer, &tcpconn->timer, crt_timeout, t);
2707                         /* must be after the de-ref*/
2708                         tcpconn->flags|=(F_CONN_MAIN_TIMER|F_CONN_READ_W|F_CONN_WANTS_RD);
2709                         tcpconn->flags&=~(F_CONN_READER|F_CONN_OOB_DATA);
2710 #ifdef TCP_ASYNC
2711                         if (unlikely(tcpconn->flags & F_CONN_WRITE_W))
2712                                 n=io_watch_chg(&io_h, tcpconn->s, POLLIN| POLLOUT, -1);
2713                         else
2714 #endif /* TCP_ASYNC */
2715                                 n=io_watch_add(&io_h, tcpconn->s, POLLIN, F_TCPCONN, tcpconn);
2716                         if (unlikely(n<0)){
2717                                 LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
2718                                                 " new socket to the fd list\n");
2719                                 tcpconn->flags&=~F_CONN_READ_W;
2720 #ifdef TCP_ASYNC
2721                                 if (unlikely(tcpconn->flags & F_CONN_WRITE_W)){
2722                                         io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2723                                         tcpconn->flags&=~F_CONN_WRITE_W;
2724                                 }
2725 #endif /* TCP_ASYNC */
2726                                 if (tcpconn_try_unhash(tcpconn));
2727                                         tcpconn_put_destroy(tcpconn);
2728                                 break;
2729                         }
2730                         DBG("handle_tcp_child: CONN_RELEASE  %p refcnt= %d\n", 
2731                                                         tcpconn, atomic_get(&tcpconn->refcnt));
2732                         break;
2733                 case CONN_ERROR:
2734                 case CONN_DESTROY:
2735                 case CONN_EOF:
2736                         /* WARNING: this will auto-dec. refcnt! */
2737                                 tcp_c->busy--;
2738                                 /* main doesn't listen on it => we don't have to delete it
2739                                  if (tcpconn->s!=-1)
2740                                         io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2741                                 */
2742 #ifdef TCP_ASYNC
2743                                 if ((tcpconn->flags & F_CONN_WRITE_W) && (tcpconn->s!=-1)){
2744                                         io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2745                                         tcpconn->flags&=~F_CONN_WRITE_W;
2746                                 }
2747 #endif /* TCP_ASYNC */
2748                                 if (tcpconn_try_unhash(tcpconn))
2749                                         tcpconn_put(tcpconn);
2750                                 tcpconn_put_destroy(tcpconn); /* deref & delete if refcnt==0 */
2751                                 break;
2752                 default:
2753                                 LOG(L_CRIT, "BUG: handle_tcp_child:  unknown cmd %d"
2754                                                                         " from tcp reader %d\n",
2755                                                                         cmd, (int)(tcp_c-&tcp_children[0]));
2756         }
2757 end:
2758         return bytes;
2759 error:
2760         return -1;
2761 }
2762
2763
2764
/* handles io from a "generic" ser process (e.g. a get-fd or new-fd request
 * sent by tcp_send)
 *
 * params: p     - pointer in the ser processes array (pt[]), to the entry for
 *                 which an io event was detected
 *         fd_i  - fd index in the fd_array (useful for optimizing
 *                 io_watch_del() calls)
 * returns:  handle_* return convention:
 *          -1 on error reading from the fd,
 *           0 on EAGAIN or when no more io events are queued
 *             (receive buffer empty),
 *           >0 on successful reads from the fd (the receive buffer might
 *             be non-empty).
 */
2778 inline static int handle_ser_child(struct process_table* p, int fd_i)
2779 {
2780         struct tcp_connection* tcpconn;
2781         long response[2];
2782         int cmd;
2783         int bytes;
2784         int ret;
2785         int fd;
2786         int flags;
2787         ticks_t t;
2788         ticks_t con_lifetime;
2789 #ifdef TCP_ASYNC
2790         ticks_t nxt_timeout;
2791 #endif /* TCP_ASYNC */
2792         
2793         ret=-1;
2794         if (unlikely(p->unix_sock<=0)){
2795                 /* (we can't have a fd==0, 0 is never closed )*/
2796                 LOG(L_CRIT, "BUG: handle_ser_child: fd %d for %d "
2797                                 "(pid %d)\n", p->unix_sock, (int)(p-&pt[0]), p->pid);
2798                 goto error;
2799         }
2800                         
2801         /* get all bytes and the fd (if transmitted)
2802          * (this is a SOCK_STREAM so read is not atomic) */
2803         bytes=receive_fd(p->unix_sock, response, sizeof(response), &fd,
2804                                                 MSG_DONTWAIT);
2805         if (unlikely(bytes<(int)sizeof(response))){
2806                 /* too few bytes read */
2807                 if (bytes==0){
2808                         /* EOF -> bad, child has died */
2809                         DBG("DBG: handle_ser_child: dead child %d, pid %d"
2810                                         " (shutting down?)\n", (int)(p-&pt[0]), p->pid);
2811                         /* don't listen on it any more */
2812                         io_watch_del(&io_h, p->unix_sock, fd_i, 0);
2813                         goto error; /* child dead => no further io events from it */
2814                 }else if (bytes<0){
2815                         /* EAGAIN is ok if we try to empty the buffer
2816                          * e.g: SIGIO_RT overflow mode or EPOLL ET */
2817                         if ((errno!=EAGAIN) && (errno!=EWOULDBLOCK)){
2818                                 LOG(L_CRIT, "ERROR: handle_ser_child: read from child %d  "
2819                                                 "(pid %d):  %s [%d]\n", (int)(p-&pt[0]), p->pid,
2820                                                 strerror(errno), errno);
2821                                 ret=-1;
2822                         }else{
2823                                 ret=0;
2824                         }
2825                         /* try to ignore ? */
2826                         goto end;
2827                 }else{
2828                         /* should never happen */
2829                         LOG(L_CRIT, "BUG: handle_ser_child: too few bytes received (%d)\n",
2830                                         bytes );
2831                         ret=0; /* something was read so there is no error; otoh if
2832                                           receive_fd returned less then requested => the receive
2833                                           buffer is empty => no more io queued on this fd */
2834                         goto end;
2835                 }
2836         }
2837         ret=1; /* something was received, there might be more queued */
2838         DBG("handle_ser_child: read response= %lx, %ld, fd %d from %d (%d)\n",
2839                                         response[0], response[1], fd, (int)(p-&pt[0]), p->pid);
2840         cmd=response[1];
2841         tcpconn=(struct tcp_connection*)response[0];
2842         if (unlikely(tcpconn==0)){
2843                 LOG(L_CRIT, "BUG: handle_ser_child: null tcpconn pointer received"
2844                                  " from child %d (pid %d): %lx, %lx\n",
2845                                         (int)(p-&pt[0]), p->pid, response[0], response[1]) ;
2846                 goto end;
2847         }
2848         switch(cmd){
2849                 case CONN_ERROR:
2850                         LOG(L_ERR, "handle_ser_child: ERROR: received CON_ERROR for %p"
2851                                         " (id %d), refcnt %d\n", 
2852                                         tcpconn, tcpconn->id, atomic_get(&tcpconn->refcnt));
2853 #ifdef TCP_CONNECT_WAIT
2854                         /* if the connection is pending => it might be on the way of
2855                          * reaching tcp_main (e.g. CONN_NEW_COMPLETE or 
2856                          *  CONN_NEW_PENDING_WRITE) =>  it cannot be destroyed here */
2857                         if ( !(tcpconn->flags & F_CONN_PENDING) && 
2858                                         tcpconn_try_unhash(tcpconn) )
2859                                 tcpconn_put(tcpconn);
2860 #else /* ! TCP_CONNECT_WAIT */
2861                         if ( tcpconn_try_unhash(tcpconn) )
2862                                 tcpconn_put(tcpconn);
2863 #endif /* TCP_CONNECT_WAIT */
2864                         if ( ((tcpconn->flags & (F_CONN_WRITE_W|F_CONN_READ_W)) ) &&
2865                                         (tcpconn->s!=-1)){
2866                                 io_watch_del(&io_h, tcpconn->s, -1, IO_FD_CLOSING);
2867                                 tcpconn->flags&=~(F_CONN_WRITE_W|F_CONN_READ_W);
2868                         }
2869                         tcpconn_put_destroy(tcpconn); /* dec refcnt & destroy on 0 */
2870                         break;
2871                 case CONN_GET_FD:
2872                         /* send the requested FD  */
2873                         /* WARNING: take care of setting refcnt properly to
2874                          * avoid race conditions */
2875                         if (unlikely(send_fd(p->unix_sock, &tcpconn, sizeof(tcpconn),
2876                                                                 tcpconn->s)<=0)){
2877                                 LOG(L_ERR, "ERROR: handle_ser_child: send_fd failed\n");
2878                         }
2879                         break;
2880                 case CONN_NEW:
2881                         /* update the fd in the requested tcpconn*/
2882                         /* WARNING: take care of setting refcnt properly to
2883                          * avoid race conditions */
2884                         if (unlikely(fd==-1)){
2885                                 LOG(L_CRIT, "BUG: handle_ser_child: CONN_NEW:"
2886                                                         " no fd received\n");
2887                                 tcpconn->flags|=F_CONN_FD_CLOSED;
2888                                 tcpconn_put_destroy(tcpconn);
2889                                 break;
2890                         }
2891                         (*tcp_connections_no)++;
2892                         tcpconn->s=fd;
2893                         /* add tcpconn to the list*/
2894                         tcpconn_add(tcpconn);
2895                         /* update the timeout*/
2896                         t=get_ticks_raw();
2897                         con_lifetime=cfg_get(tcp, tcp_cfg, con_lifetime);
2898                         tcpconn->timeout=t+con_lifetime;
2899                         /* activate the timer (already properly init. in tcpconn_new())
2900                          * no need for reinit */
2901          &nb