- more tcp stuff and a lot of merging w/ latest cvs
authorAndrei Pelinescu-Onciul <andrei@iptel.org>
Fri, 7 Feb 2003 17:02:15 +0000 (17:02 +0000)
committerAndrei Pelinescu-Onciul <andrei@iptel.org>
Fri, 7 Feb 2003 17:02:15 +0000 (17:02 +0000)
- content-length is appended automatically to messages that cross from udp to
 tcp
- tcp2udp and udp2tcp now work under heavy stress
 (e.g.: throttle 200, 10 ser processes on dual cpu)
- tcp performance still sucks, some things like disabling Nagle are still not in yet (for better debugging)

14 files changed:
Makefile.defs
NEWS
TODO
data_lump.c
forward.c
hash_func.c
hash_func.h
main.c
msg_translator.c
receive.c
tcp_conn.h
tcp_main.c
tcp_read.c
tcp_server.h

index 00c628d..1358d36 100644 (file)
@@ -8,7 +8,7 @@
 VERSION = 0
 PATCHLEVEL = 8
 SUBLEVEL =   11
-EXTRAVERSION = pre5-tcp1-locking
+EXTRAVERSION = pre6-tcp2
 
 RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
 OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
diff --git a/NEWS b/NEWS
index 8d701db..137fb63 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -38,6 +38,7 @@ New features
 - powerpc fast locking support
 - netbsd support
 - 64 bits arch. support (e.g. netbsd/sparc64).
+- tcp2udp and udp2tcp stateless forwarding (see forward_udp & forward_tcp)
 
 Changes to use of ser scripts
 =============================
diff --git a/TODO b/TODO
index af0364f..b493755 100644 (file)
--- a/TODO
+++ b/TODO
@@ -10,7 +10,7 @@ x fix 0 parameter module f. call
 x better Via parsing (handle ' ' in uri, eg: foo.bar : 1234 ; received=) and
  ipv6 addresses ([fec0:aa::01]).
 - fix format string vulnerability in log()
-- fix alignement access problems (warning on Sun)
+- fix alignment access problems (warning on Sun)
 x (different way) add request header bitmap field for the modules
 - introduce variables & function in the script language (cfg. file)
 
@@ -69,4 +69,5 @@ x freopen stdin, stdout, stderr to /dev/null
 x generic locking lib
 - convert tm to use new locking lib
 - tcp disable nagle & other socket stuff (close()?)
+- force add rport (setflag(rport)???)
 
index 520f653..5364774 100644 (file)
@@ -43,7 +43,7 @@
 #include <dmalloc.h>
 #endif
 
-/* WARNING: all lump add/insert operations excpect a pkg_malloc'ed char* 
+/* WARNING: all lump add/insert operations expect a pkg_malloc'ed char* 
  * pointer the will be DEALLOCATED when the sip_msg is destroyed! */
 
 enum lump_dir { LD_NEXT, LD_BEFORE, LD_AFTER };
index 2a9d58c..8be1307 100644 (file)
--- a/forward.c
+++ b/forward.c
@@ -312,7 +312,7 @@ int forward_request( struct sip_msg* msg, struct proxy_l * p, int proto)
                goto error1;
        }
         /* send it! */
-       DBG("Sending:\n%s.\n", buf);
+       DBG("Sending:\n%.*s.\n", (int)len, buf);
        DBG("orig. len=%d, new_len=%d, proto=%d\n", msg->len, len, proto );
        
        
@@ -510,8 +510,9 @@ int forward_reply(struct sip_msg* msg)
                STATS_TX_RESPONSE(  (msg->first_line.u.reply.statuscode/100) );
 #endif
 
-       DBG(" reply forwarded to %s:%d\n", msg->via2->host.s,
-               (unsigned short) msg->via2->port);
+       DBG(" reply forwarded to %.*s:%d\n", 
+                       msg->via2->host.len, msg->via2->host.s,
+                       (unsigned short) msg->via2->port);
 
        pkg_free(new_buf);
        free(to);
index 1d91ab3..cdd7d8f 100644 (file)
@@ -154,25 +154,11 @@ void hashtest_cycle( int hits[TABLE_ENTRIES+5], char *ip )
                                }
 }
 
-int init_hash()
-{
-       if (TABLE_ENTRIES != (1<<10)) {
-               LOG(L_WARN, "WARNING: hash function optimized for %d entries\n",
-                       1<<10);
-               LOG(L_WARN, "WARNING: use of %d entries may lead "
-                       "to unflat distribution\n", TABLE_ENTRIES );
-       } else {
-               DBG("DEBUG: hash function initialized with optimum table size\n");
-       }
-       return 1;
-}
-
 void hashtest()
 {
        int hits[TABLE_ENTRIES+5];
        int i;
 
-       init_hash();    
        memset( hits, 0, sizeof hits );
        hashtest_cycle( hits, "192.168.99.100" );
        hashtest_cycle( hits, "172.168.99.100" );
index 2cd16d9..002c75a 100644 (file)
@@ -39,7 +39,6 @@
 int new_hash( str  call_id, str cseq_nr );
 int new_hash2( str  call_id, str cseq_nr );
 
-int init_hash();
 
 #define hash( cid, cseq) new_hash2( cid, cseq )
 
diff --git a/main.c b/main.c
index 75158c0..95eeb90 100644 (file)
--- a/main.c
+++ b/main.c
@@ -880,8 +880,10 @@ static void sig_usr(int signo)
        }else{
                /* process the important signals */
                switch(signo){
-                       case SIGINT:
                        case SIGPIPE:
+                                       LOG(L_INFO, "INFO: signal %d received\n", signo);
+                               break;
+                       case SIGINT:
                        case SIGTERM:
                                        LOG(L_INFO, "INFO: signal %d received\n", signo);
                                        /* print memory stats for non-main too */
@@ -1280,11 +1282,6 @@ try_again:
        DBG("test random number %u\n", rand());
        
        
-       /* init hash fucntion */
-       if (init_hash()<0) {
-               LOG(L_ERR, "ERROR: init_hash failed\n");
-               goto error;
-       }
 
        /*init mallocs (before parsing cfg !)*/
        if (init_mallocs()==-1)
index addfe3b..0ab2955 100644 (file)
@@ -342,6 +342,44 @@ char* id_builder(struct sip_msg* msg, unsigned int *id_len)
 
 
 
+char* clen_builder(struct sip_msg* msg, unsigned int *clen_len)
+{
+       char* buf;
+       int len;
+       int value;
+       char* value_s;
+       int value_len;
+       char* body;
+       
+       
+       body=get_body(msg);
+       if (body==0){
+               ser_error=E_BAD_REQ;
+               LOG(L_ERR, "ERROR: clen_builder: no message body found"
+                                       " (missing crlf?)");
+               return 0;
+       }
+       value=msg->len-(int)(body-msg->buf);
+       value_s=int2str(value, &value_len);
+       DBG("clen_builder: content-length: %d (%s)\n", value, value_s);
+               
+       len=CONTENT_LENGTH_LEN+value_len+CRLF_LEN;
+       buf=pkg_malloc(sizeof(char)*(len+1));
+       if (buf==0){
+               ser_error=E_OUT_OF_MEM;
+               LOG(L_ERR, "ERROR: clen_builder: out of memory\n");
+               return 0;
+       }
+       memcpy(buf, CONTENT_LENGTH, CONTENT_LENGTH_LEN);
+       memcpy(buf+CONTENT_LENGTH_LEN, value_s, value_len);
+       memcpy(buf+CONTENT_LENGTH_LEN+value_len, CRLF, CRLF_LEN);
+       buf[len+1]=0; /* null terminate it */
+       *clen_len=len;
+       return buf;
+}
+
+
+
 /* computes the "unpacked" len of a lump list,
    code moved from build_req_from_req */
 static inline int lumps_len(struct lump* l)
@@ -548,10 +586,14 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
 #ifdef USE_TCP
        char* id_buf;
        int id_len;
+       char* clen_buf;
+       int clen_len;
        
        
        id_buf=0;
        id_len=0;
+       clen_buf=0;
+       clen_len=0;
 #endif
        extra_params.len=0;
        extra_params.s=0;
@@ -580,6 +622,24 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
                extra_params.s=id_buf;
                extra_params.len=id_len;
        }
+       /* if sending proto == tcp, check if Content-Length needs to be added*/
+       if (proto==PROTO_TCP){
+               /* first of all parse content-length */
+               if (parse_headers(msg, HDR_CONTENTLENGTH, 0)==-1){
+                       LOG(L_ERR, "build_req_buf_from_sip_req:"
+                                                       " error parsing content-length\n");
+                       goto skip_clen;
+               }
+               if (msg->content_length==0){
+                       /* we need to add it */
+                       if ((clen_buf=clen_builder(msg, &clen_len))==0){
+                               LOG(L_ERR, "build_req_buf_from_sip_req:" 
+                                                               " clen_builder failed\n");
+                               goto skip_clen;
+                       }
+               }
+       }
+skip_clen:
 #endif
        branch.s=msg->add_to_branch_s;
        branch.len=msg->add_to_branch_len;
@@ -652,6 +712,19 @@ char * build_req_buf_from_sip_req( struct sip_msg* msg,
                if (insert_new_lump_after(anchor, rport_buf, rport_len, HDR_VIA)==0)
                        goto error03; /* free rport_buf*/
        }
+#ifdef USE_TCP
+       /* if clen needs to be added, add it */
+       if (clen_len){
+               /* msg->unparsed should point just before the final crlf,
+                * parse_headers is called from clen_builder */
+               anchor=anchor_lump(&(msg->add_rm), msg->unparsed-buf, 0,
+                                                        HDR_CONTENTLENGTH);
+               if (anchor==0) goto error04; /* free clen_buf*/
+               if (insert_new_lump_after(anchor, clen_buf, clen_len,
+                                       HDR_CONTENTLENGTH)==0)
+                       goto error04; /* free clen_buf*/
+       }
+#endif
 
        /* compute new msg len and fix overlapping zones*/
        new_len=len+lumps_len(msg->add_rm);
@@ -716,12 +789,17 @@ error02:
        if (received_buf) pkg_free(received_buf);
 error03:
        if (rport_buf) pkg_free(rport_buf);
+#ifdef USE_TCP
+error04:
+       if (clen_buf) pkg_free(clen_buf);
+#endif
 error00:
        *returned_len=0;
        return 0;
 }
 
 
+
 char * build_res_buf_from_sip_res( struct sip_msg* msg,
                                unsigned int *returned_len)
 {
@@ -733,7 +811,14 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
 #endif
        char* buf;
        unsigned int len;
-
+#ifdef USE_TCP
+       struct lump* anchor;
+       char* clen_buf;
+       int clen_len;
+       
+       clen_buf=0;
+       clen_len=0;
+#endif
 #ifdef SCRATCH
        orig=msg->orig;
 #endif
@@ -763,12 +848,53 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
                via_offset=msg->h_via1->name.s-buf;
        }
 #endif
+
+#ifdef USE_TCP
+
+       /* if sending proto == tcp, check if Content-Length needs to be added*/
+       if (msg->via2 && (msg->via2->proto==PROTO_TCP)){
+               DBG("build_res_from_sip_res: checking content-length for \n%.*s\n",
+                               (int)msg->len, msg->buf);
+               /* first of all parse content-length */
+               if (parse_headers(msg, HDR_CONTENTLENGTH, 0)==-1){
+                       LOG(L_ERR, "build_res_buf_from_sip_res:"
+                                                       " error parsing content-length\n");
+                       goto skip_clen;
+               }
+               if (msg->content_length==0){
+                       DBG("build_res_from_sip_res: no content_length hdr found\n");
+                       /* we need to add it */
+                       if ((clen_buf=clen_builder(msg, &clen_len))==0){
+                               LOG(L_ERR, "build_res_buf_from_sip_res:" 
+                                                               " clen_builder failed\n");
+                               goto skip_clen;
+                       }
+               }
+       }
+skip_clen:
+#endif
+       
        /* remove the first via*/
        if (del_lump( &(msg->repl_add_rm), via_offset, via_len, HDR_VIA)==0){
                LOG(L_ERR, "build_res_buf_from_sip_res: error trying to remove first"
                                        "via\n");
                goto error;
        }
+#ifdef USE_TCP
+       /* if clen needs to be added, add it */
+       if (clen_len){
+               /* msg->unparsed should point just before the final crlf,
+                * parse_headers is called from clen_builder */
+               anchor=anchor_lump(&(msg->repl_add_rm), msg->unparsed-buf, 0, 
+                                                       HDR_CONTENTLENGTH);
+               DBG("build_res_from_sip_res: adding content-length: %.*s\n",
+                               clen_len, clen_buf);
+               if (anchor==0) goto error_clen; /* free clen_buf*/
+               if (insert_new_lump_after(anchor, clen_buf, clen_len,
+                                       HDR_CONTENTLENGTH)==0)
+                       goto error_clen; /* free clen_buf*/
+       }
+#endif
        new_len=len+lumps_len(msg->repl_add_rm);
 
        DBG(" old size: %d, new size: %d\n", len, new_len);
@@ -796,12 +922,15 @@ char * build_res_buf_from_sip_res( struct sip_msg* msg,
 #endif
                len-s_offset);
         /* send it! */
-       DBG(" copied size: orig:%d, new: %d, rest: %d\n",
-                       s_offset, offset,
-                       len-s_offset );
+       DBG("build_res_from_sip_res: copied size: orig:%d, new: %d, rest: %d"
+                       " msg=\n%s\n", s_offset, offset, len-s_offset, new_buf);
 
        *returned_len=new_len;
        return new_buf;
+#ifdef USE_TCP
+error_clen:
+       if (clen_buf) pkg_free(clen_buf);
+#endif
 error:
        *returned_len=0;
        return 0;
index dc0c142..ac2a4a3 100644 (file)
--- a/receive.c
+++ b/receive.c
@@ -28,6 +28,7 @@
  * ---------
  * 2003-01-29 transport-independent message zero-termination in
  *            receive_msg (jiri)
+ * 2003-02-07 undoed jiri's zero term. changes (they break tcp) (andrei)
  */
 
 
@@ -81,7 +82,6 @@ int receive_msg(char* buf, unsigned int len, struct receive_info* rcv_info)
        /* zero termination (termination of orig message bellow not that
           useful as most of the work is done with scrath-pad; -jiri  */
        /* buf[len]=0; */ /* WARNING: zero term removed! */
-       buf[len]=0; /* transport-independent zero-termination */
        msg->rcv=*rcv_info;
        msg->id=msg_no;
 #ifdef SCRATCH
index a334f0c..b9d8589 100644 (file)
 #define _tcp_conn_h
 
 #include "ip_addr.h"
+#include "locking.h"
 
 
 #define TCP_BUF_SIZE 65535
 #define TCP_CON_TIMEOUT 60 /* in  seconds */
+#define TCP_CON_SEND_TIMEOUT 30 /* timeout after a send */
 #define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns" 
                                                         the connection to the tcp master process */
 #define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/
@@ -82,6 +84,7 @@ struct tcp_req{
 struct tcp_connection{
        int s; /*socket, used by "tcp main" */
        int fd; /* used only by "children", don't modify it! private data! */
+       lock_t write_lock;
        int id; /* id (unique!) used to retrieve a specific connection when
                   reply-ing*/
        struct receive_info rcv; /* src & dst ip, ports, proto a.s.o*/
@@ -92,9 +95,10 @@ struct tcp_connection{
        union sockaddr_union su;
 #endif
        struct tcp_req req; /* request data */
-       int refcnt;
+       volatile int refcnt;
+       int bad; /* if set this is a "bad" connection */
        int timeout; /* connection timeout, after this it will be removed*/
-       unsigned addr_hash; /* hash indexes in thge 2 tables */
+       unsigned addr_hash; /* hash indexes in the 2 tables */
        unsigned id_hash;
        struct tcp_connection* next; /* next, prev in hash table, used by "main" */
        struct tcp_connection* prev;
index 282598c..4ca1bd4 100644 (file)
@@ -38,6 +38,7 @@
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/uio.h>  /* writev*/
 
 #include <unistd.h>
 
@@ -102,9 +103,15 @@ struct tcp_connection* tcpconn_new(int sock, union sockaddr_union* su,
        }
        c->s=sock;
        c->fd=-1; /* not initialized */
+       if (lock_init(&c->write_lock)==0){
+               LOG(L_ERR, "ERROR: tcpconn_add: init lock failed\n");
+               goto error;
+       }
+       
        c->rcv.src_su=*su;
        
        c->refcnt=0;
+       c->bad=0;
        su2ip_addr(&c->rcv.src_ip, su);
        c->rcv.src_port=su_getport(su);
        c->rcv.proto=PROTO_TCP;
@@ -171,6 +178,16 @@ struct tcp_connection*  tcpconn_add(struct tcp_connection *c)
 }
 
 
+/* unsafe tcpconn_rm version (nolocks) */
+void _tcpconn_rm(struct tcp_connection* c)
+{
+       tcpconn_listrm(tcpconn_addr_hash[c->addr_hash], c, next, prev);
+       tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
+       lock_destroy(&c->write_lock);
+       shm_free(c);
+}
+
+
 
 void tcpconn_rm(struct tcp_connection* c)
 {
@@ -178,6 +195,7 @@ void tcpconn_rm(struct tcp_connection* c)
        tcpconn_listrm(tcpconn_addr_hash[c->addr_hash], c, next, prev);
        tcpconn_listrm(tcpconn_id_hash[c->id_hash], c, id_next, id_prev);
        TCPCONN_UNLOCK;
+       lock_destroy(&c->write_lock);
        shm_free(c);
 }
 
@@ -198,7 +216,7 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
                        DBG("c=%p, c->id=%d, ip=",c, c->id);
                        print_ip(&c->rcv.src_ip);
                        DBG(" port=%d\n", ntohs(c->rcv.src_port));
-                       if (id==c->id) return c;
+                       if ((id==c->id)&&(!c->bad)) return c;
                }
        }else if (ip){
                hash=tcp_addr_hash(ip, port);
@@ -206,7 +224,8 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
                        DBG("c=%p, c->id=%d, ip=",c, c->id);
                        print_ip(&c->rcv.src_ip);
                        DBG(" port=%d\n", ntohs(c->rcv.src_port));
-                       if ( (port==c->rcv.src_port) && (ip_addr_cmp(ip, &c->rcv.src_ip)) )
+                       if ( (!c->bad) && (port==c->rcv.src_port) &&
+                                       (ip_addr_cmp(ip, &c->rcv.src_ip)) )
                                return c;
                }
        }
@@ -215,13 +234,17 @@ struct tcp_connection* _tcpconn_find(int id, struct ip_addr* ip, int port)
 
 
 
-/* _tcpconn_find with locks */
-struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port)
+/* _tcpconn_find with locks and timeout */
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port,
+                                                                       int timeout)
 {
        struct tcp_connection* c;
        TCPCONN_LOCK;
        c=_tcpconn_find(id, ip, port);
-       if (c) c->refcnt++;
+       if (c){ 
+                       c->refcnt++;
+                       c->timeout=get_ticks()+timeout;
+       }
        TCPCONN_UNLOCK;
        return c;
 }
@@ -249,9 +272,9 @@ int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
        if (to){
                su2ip_addr(&ip, to);
                port=su_getport(to);
-               c=tcpconn_get(id, &ip, port); /* lock ;inc refcnt; unlock */
+               c=tcpconn_get(id, &ip, port, TCP_CON_SEND_TIMEOUT); 
        }else if (id){
-               c=tcpconn_get(id, 0, 0);
+               c=tcpconn_get(id, 0, 0, TCP_CON_SEND_TIMEOUT);
        }else{
                LOG(L_CRIT, "BUG: tcp_send called with null id & to\n");
                return -1;
@@ -260,7 +283,8 @@ int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id)
        if (id){
                if (c==0) {
                        if (to){
-                               c=tcpconn_get(0, &ip, port); /* try again w/o id */
+                               /* try again w/o id */
+                               c=tcpconn_get(0, &ip, port, TCP_CON_SEND_TIMEOUT);
                                goto no_id;
                        }else{
                                LOG(L_ERR, "ERROR: tcp_send: id %d not found, dropping\n",
@@ -323,8 +347,27 @@ get_fd:
        
 send_it:
        DBG("tcp_send: sending...\n");
-       n=write(fd, buf, len);
+       lock_get(&c->write_lock);
+       n=send(fd, buf, len, MSG_NOSIGNAL);
+       lock_release(&c->write_lock);
        DBG("tcp_send: after write: c= %p n=%d fd=%d\n",c, n, fd);
+       DBG("tcp_send: buf=\n%.*s\n", (int)len, buf);
+       if (n<0){
+               LOG(L_ERR, "ERROR: tcpsend: failed to send, n=%d: %s (%d)\n",
+                               n, strerror(errno), errno);
+               /* error on the connection , mark it as bad and set 0 timeout */
+               c->bad=1;
+               c->timeout=0;
+               /* tell "main" it should drop this (optional it will t/o anyway?)*/
+               response[0]=(long)c;
+               response[1]=CONN_ERROR;
+               n=write(unix_tcp_sock, response, sizeof(response));
+               if (n<0){
+                       LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
+                                       strerror(errno), errno);
+                       goto release_c;
+               }
+       }
 end:
        close(fd);
 release_c:
@@ -343,6 +386,7 @@ void tcpconn_timeout(fd_set* set)
        
        
        ticks=get_ticks();
+       TCPCONN_LOCK; /* fixme: we can lock only on delete IMO */
        for(h=0; h<TCP_ADDR_HASH_SIZE; h++){
                c=tcpconn_addr_hash[h];
                while(c){
@@ -354,11 +398,12 @@ void tcpconn_timeout(fd_set* set)
                                        FD_CLR(c->s, set);
                                        close(c->s);
                                }
-                               tcpconn_rm(c);
+                               _tcpconn_rm(c);
                        }
                        c=next;
                }
        }
+       TCPCONN_UNLOCK;
 }
 
 
@@ -515,8 +560,12 @@ void tcp_main_loop()
                                        if(send2child(tcpconn)<0){
                                                LOG(L_ERR,"ERROR: tcp_main_loop: no children "
                                                                "available\n");
-                                               close(tcpconn->s);
-                                               tcpconn_rm(tcpconn);
+                                               TCPCONN_LOCK;
+                                               if (tcpconn->refcnt==0){
+                                                       close(tcpconn->s);
+                                                       _tcpconn_rm(tcpconn);
+                                               }else tcpconn->timeout=0; /* force expire */
+                                               TCPCONN_UNLOCK;
                                        }
                                }
                        }
@@ -536,8 +585,12 @@ void tcp_main_loop()
                                        if (send2child(tcpconn)<0){
                                                LOG(L_ERR,"ERROR: tcp_main_loop: no "
                                                                        "children available\n");
-                                               close(tcpconn->s);
-                                               tcpconn_rm(tcpconn);
+                                               TCPCONN_LOCK;
+                                               if (tcpconn->refcnt==0){
+                                                       close(tcpconn->s);
+                                                       _tcpconn_rm(tcpconn);
+                                               }else tcpconn->timeout=0; /* force expire*/
+                                               TCPCONN_UNLOCK;
                                        }
                                }
                        }
@@ -578,13 +631,14 @@ read_again:
                                                }
                                                tcpconn=(struct tcp_connection*)response[0];
                                                if (tcpconn){
-                                                       tcpconn->refcnt--;
-                                                       DBG("tcp_main_loop: %p refcnt= %d\n", 
-                                                                       tcpconn, tcpconn->refcnt);
+                                                               if (tcpconn->bad) goto tcpconn_destroy;
                                                                FD_SET(tcpconn->s, &master_set);
                                                                if (maxfd<tcpconn->s) maxfd=tcpconn->s;
                                                                /* update the timeout*/
                                                                tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
+                                                               tcpconn_put(tcpconn);
+                                                               DBG("tcp_main_loop: %p refcnt= %d\n", 
+                                                                       tcpconn, tcpconn->refcnt);
                                                }
                                                break;
                                        case CONN_ERROR:
@@ -597,14 +651,23 @@ read_again:
                                                }
                                                tcpconn=(struct tcp_connection*)response[0];
                                                if (tcpconn){
+                                                       if (tcpconn->s!=-1)
+                                                               FD_CLR(tcpconn->s, &master_set);
+               tcpconn_destroy:
+                                                       TCPCONN_LOCK; /*avoid races w/ tcp_send*/
                                                        tcpconn->refcnt--;
-                                                       if (tcpconn->refcnt==0){
+                                                       if (tcpconn->refcnt==0){ 
                                                                DBG("tcp_main_loop: destroying connection\n");
                                                                close(tcpconn->s);
-                                                               tcpconn_rm(tcpconn);
+                                                               _tcpconn_rm(tcpconn);
                                                        }else{
+                                                               /* force timeout */
+                                                               tcpconn->timeout=0;
+                                                               tcpconn->bad=1;
                                                                DBG("tcp_main_loop: delaying ...\n");
+                                                               
                                                        }
+                                                       TCPCONN_UNLOCK;
                                                }
                                                break;
                                        case CONN_GET_FD:
index 8d0216a..661de71 100644 (file)
@@ -46,9 +46,9 @@
 #include "globals.h"
 #include "receive.h"
 #include "timer.h"
+#include "ut.h"
 
 
-#define q_memchr memchr
 
 /* reads next available bytes
  * return number of bytes read, 0 on EOF or -1 on error,
@@ -77,6 +77,7 @@ again:
                        return -1;
                }
        }
+       DBG("tcp_read: read %d bytes:\n%.*s\n", bytes_read, bytes_read, r->pos);
        
        r->pos+=bytes_read;
        return bytes_read;
@@ -133,9 +134,13 @@ int tcp_read_headers(struct tcp_req *r, int fd)
                                                          break
 
 
-       
-       bytes=tcp_read(r, fd);
-       if (bytes<=0) return bytes;
+       /* if we still have some unparsed part, parse it first, don't do the read*/
+       if (r->parsed<r->pos){
+               bytes=0;
+       }else{
+               bytes=tcp_read(r, fd);
+               if (bytes<=0) return bytes;
+       }
        p=r->parsed;
        
        while(p<r->pos && r->error==TCP_REQ_OK){
@@ -154,7 +159,7 @@ int tcp_read_headers(struct tcp_req *r, int fd)
                        case H_SKIP:
                                /* find lf, we are in this state if we are not interested
                                 * in anything till end of line*/
-                               p=q_memchr(p, '\n', r->pos-r->parsed);
+                               p=q_memchr(p, '\n', r->pos-p);
                                if (p){
                                        p++;
                                        r->state=H_LF;
@@ -172,14 +177,18 @@ int tcp_read_headers(struct tcp_req *r, int fd)
                                        case '\n':
                                                /* found LF LF */
                                                r->state=H_BODY;
+                                               DBG("tcp_read_headers: switching to H_BODY (lflf)\n");
                                                if (r->has_content_len){
                                                        r->body=p+1;
                                                        r->bytes_to_go=r->content_len;
                                                        if (r->bytes_to_go==0){
                                                                r->complete=1;
+                                                               p++;
                                                                goto skip;
                                                        }
                                                }else{
+                                                       DBG("tcp_read_headers: ERROR: no clen, p=%X\n",
+                                                                       *p);
                                                        r->error=TCP_REQ_BAD_LEN;
                                                }
                                                break;
@@ -193,14 +202,18 @@ int tcp_read_headers(struct tcp_req *r, int fd)
                                if (*p=='\n'){
                                        /* found LF CR LF */
                                        r->state=H_BODY;
+                                       DBG("tcp_read_headers: switching to H_BODY (lfcrlf)\n");
                                        if (r->has_content_len){
                                                r->body=p+1;
                                                r->bytes_to_go=r->content_len;
                                                if (r->bytes_to_go==0){
                                                        r->complete=1;
+                                                       p++;
                                                        goto skip;
                                                }
                                        }else{
+                                               DBG("tcp_read_headers: ERROR: no clen, p=%X\n",
+                                                                       *p);
                                                r->error=TCP_REQ_BAD_LEN;
                                        }
                                }else r->state=H_SKIP;
@@ -342,17 +355,21 @@ int tcp_read_req(struct tcp_connection* con)
                resp=CONN_RELEASE;
                s=con->fd;
                req=&con->req;
+               size=0;
+again:
                if(req->complete==0 && req->error==TCP_REQ_OK){
                        bytes=tcp_read_headers(req, s);
                                                /* if timeout state=0; goto end__req; */
                        DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
-                                       bytes, req->parsed-req->buf, req->state, req->error );
+                                       bytes, req->parsed-req->start, req->state, req->error );
+                       DBG("tcp_read_req: last char=%X, parsed msg=\n%.*s\n",
+                                       *(req->parsed-1), req->parsed-req->start, req->start);
                        if (bytes==-1){
                                LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
                                resp=CONN_ERROR;
                                goto end_req;
                        }
-                       if (bytes==0){
+                       if ((size==0) && (bytes==0)){
                                DBG( "tcp_read_req: EOF\n");
                                resp=CONN_EOF;
                                goto end_req;
@@ -360,15 +377,21 @@ int tcp_read_req(struct tcp_connection* con)
                
                }
                if (req->error!=TCP_REQ_OK){
-                       LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d\n",
-                                       req->state, req->error);
+                       LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d "
+                                         "buf:\n%.*s\nparsed:\n%.*s\n", req->state, req->error,
+                                         req->pos-req->buf, req->buf,
+                                         req->parsed-req->start, req->start);
+                       DBG("- received from: port %d, ip -", ntohs(con->rcv.src_port));
+                       print_ip(&con->rcv.src_ip); DBG("-\n");
                        resp=CONN_ERROR;
                        goto end_req;
                }
                if (req->complete){
                        DBG("tcp_read_req: end of header part\n");
+                       DBG("- received from: port %d, ip - ", ntohs(con->rcv.src_port));
+                       print_ip(&con->rcv.src_ip); DBG("-\n");
                        DBG("tcp_read_req: headers:\n%.*s.\n",
-                                       req->body-req->buf, req->buf);
+                                       req->body-req->start, req->start);
                        if (req->has_content_len){
                                DBG("tcp_read_req: content-length= %d\n", req->content_len);
                                DBG("tcp_read_req: body:\n%.*s\n", req->content_len,req->body);
@@ -383,7 +406,7 @@ int tcp_read_req(struct tcp_connection* con)
                        resp=CONN_RELEASE;
                        /* just for debugging use sendipv4 as receiving socket */
                        DBG("calling receive_msg(%p, %d, )\n",
-                                       req->buf, (int)(req->parsed-req->start));
+                                       req->start, (int)(req->parsed-req->start));
                        bind_address=sendipv4; /*&tcp_info[con->sock_idx];*/
                        con->rcv.proto_reserved1=con->id; /* copy the id */
                        if (receive_msg(req->start, req->parsed-req->start, &con->rcv)<0){
@@ -404,6 +427,8 @@ int tcp_read_req(struct tcp_connection* con)
                        req->state=H_SKIP_EMPTY;
                        req->complete=req->content_len=req->has_content_len=0;
                        req->bytes_to_go=0;
+                       /* if we still have some unparsed bytes, try to  parse them too*/
+                       if (size) goto again;
                        
                }
                
@@ -496,11 +521,13 @@ void tcp_receive_loop(int unix_sock)
                                        LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
                                                                        "no fd read\n");
                                        resp=CONN_ERROR;
+                                       con->bad=1;
                                        release_tcpconn(con, resp, unix_sock);
                                }
                                if (con==0){
                                        LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
                                        resp=CONN_ERROR;
+                                       con->bad=1;
                                        release_tcpconn(con, resp, unix_sock);
                                }
                                con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
@@ -524,6 +551,7 @@ void tcp_receive_loop(int unix_sock)
                                        if (resp<0){
                                                FD_CLR(con->fd, &master_set);
                                                tcpconn_listrm(list, con, c_next, c_prev);
+                                               con->bad=1;
                                                release_tcpconn(con, resp, unix_sock);
                                        }else{
                                                /* update timeout */
index 6c089b5..34e2e3c 100644 (file)
@@ -34,7 +34,8 @@
 
 /* "public" functions*/
 
-struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port);
+struct tcp_connection* tcpconn_get(int id, struct ip_addr* ip, int port, 
+                                                                       int timeout);
 void tcpconn_put(struct tcp_connection* c);
 int tcp_send(char* buf, unsigned len, union sockaddr_union* to, int id);