- even more tcp receive bs
authorAndrei Pelinescu-Onciul <andrei@iptel.org>
Tue, 10 Dec 2002 19:41:44 +0000 (19:41 +0000)
committerAndrei Pelinescu-Onciul <andrei@iptel.org>
Tue, 10 Dec 2002 19:41:44 +0000 (19:41 +0000)
- sip msg.buff & orig are no longer 0 terminated (to avoid an extra copy in the  tcp case)

TODO
forward.c
main.c
parser/msg_parser.c
pt.h
receive.c
tcp_conn.h
tcp_main.c
tcp_read.c
test/stateless.cfg

diff --git a/TODO b/TODO
index 0709327..c881e17 100644 (file)
--- a/TODO
+++ b/TODO
@@ -16,6 +16,8 @@ x (different way) add request header bitmap field for the modules
 
 
 High priority:
+- parse_uri should not copy anymore the uri members (and it should not 0
+ terminate them anylonger).
 x fix/replace T_REF/T_UNREF
 x review all the tm locking
 x if () {} else {}
@@ -60,5 +62,5 @@ x jku: branch hash computation over canonical values
 - jku: try CRC as opposed to MD5
 
 
-- freopen stdin, stdou to /dev/null
+x freopen stdin, stdout, stderr to /dev/null
 
index 8981d4f..689a86d 100644 (file)
--- a/forward.c
+++ b/forward.c
@@ -224,9 +224,10 @@ int forward_request( struct sip_msg* msg, struct proxy_l * p)
        }
        /* sent requests stats */
        else STATS_TX_REQUEST(  msg->first_line.u.request.method_value );
+       
        pkg_free(buf);
        free(to);
-       /* received_buf & line_buf will be freed in receiv_msg by free_lump_list*/
+       /* received_buf & line_buf will be freed in receive_msg by free_lump_list*/
        return 0;
 
 error1:
diff --git a/main.c b/main.c
index 2bd4a69..a3a74cd 100644 (file)
--- a/main.c
+++ b/main.c
@@ -633,6 +633,8 @@ int main_loop()
                         * so we open all first*/
                }
 #ifdef USE_TCP
+                       /* start tcp receivers */
+               if (tcp_init_children()<0) goto error;
                        /* start tcp master proc */
                process_no++;
                if ((pid=fork())<0){
@@ -641,7 +643,7 @@ int main_loop()
                }else if (pid==0){
                        /* child */
                        /* is_main=0; */
-                       tcp_main();
+                       tcp_main_loop();
                }else{
                        pt[process_no].pid=pid;
                        strncpy(pt[process_no].desc, "tcp main process", MAX_PT_DESC );
@@ -686,7 +688,11 @@ int main_loop()
                goto error;
        }
 
-       if (timer_list){
+#ifndef USE_TCP
+       /* if we are using tcp we always need the timer */
+       if (timer_list)
+#endif
+       {
                /* fork again for the attendant process*/
                process_no++;
                if ((pid=fork())<0){
index 5cc45df..9e8d015 100644 (file)
@@ -485,7 +485,7 @@ int parse_msg(char* buf, unsigned int len, struct sip_msg* msg)
        
 error:
        /* more debugging, msg->orig is/should be null terminated*/
-       LOG(L_ERR, "ERROR: parse_msg: message=<%s>\n", msg->orig);
+       LOG(L_ERR, "ERROR: parse_msg: message=<%.*s>\n", (int)msg->len, msg->orig);
        return -1;
 }
 
diff --git a/pt.h b/pt.h
index e76fb3d..d56ebb3 100644 (file)
--- a/pt.h
+++ b/pt.h
@@ -60,7 +60,14 @@ inline static int process_count()
                /* timer process */
                + (timer_list ? 1 : 0 )
                /* fifo server */
-               +((fifo==NULL || strlen(fifo)==0) ? 0 : 1 );
+               +((fifo==NULL || strlen(fifo)==0) ? 0 : 1 )
+#ifdef USE_TCP
+               + 1/* tcp main */ + tcp_children_no + 
+               (timer_list ? 0: 1) /* add the timer proc. if not already taken
+                                                          into account */
+#endif
+               
+               ;
 }
 
 
index 9cf3a0f..01ed8fa 100644 (file)
--- a/receive.c
+++ b/receive.c
@@ -75,7 +75,7 @@ int receive_msg(char* buf, unsigned int len, union sockaddr_union* src_su)
        msg->len=len;
        /* zero termination (termination of orig message bellow not that
           useful as most of the work is done with scrath-pad; -jiri  */
-       buf[len]=0;
+       /* buf[len]=0; */ /* WARNING: zero term removed! */
        su2ip_addr(&msg->src_ip, src_su);
        msg->dst_ip=bind_address->address; /* won't work if listening on 0.0.0.0 */
        msg->id=msg_no;
@@ -86,7 +86,8 @@ int receive_msg(char* buf, unsigned int len, union sockaddr_union* src_su)
                goto error01;
        }
        memcpy(msg->orig, buf, len);
-       msg->orig[len]=0; /* null terminate it,good for using str* functions
+       /* WARNING: zero term removed! */
+       /* msg->orig[len]=0; */ /* null terminate it,good for using str* functions
                                                 on it*/
        
        if (parse_msg(buf,len, msg)!=0){
index fabe4b8..73ec5b4 100644 (file)
@@ -30,6 +30,7 @@
 #ifndef _tcp_conn_h
 #define _tcp_conn_h
 
+#include "ip_addr.h"
 
 
 #define TCP_BUF_SIZE 65535
@@ -76,8 +77,10 @@ struct tcp_connection{
        struct tcp_req req; /* request data */
        int refcnt;
        int timeout; /* connection timeout, after this it will be removed*/
-       struct tcp_connection* next;
+       struct tcp_connection* next; /* next, prev in hash table, used by "main" */
        struct tcp_connection* prev;
+       struct tcp_connection* c_next; /* child next prev (use locally) */
+       struct tcp_connection* c_prev;
 };
 
 
@@ -93,7 +96,8 @@ struct tcp_connection{
 
 
 /* add a tcpconn to a list*/
-#define tcpconn_listadd(head, c) \
+/* list head, new element, next member, prev member */
+#define tcpconn_listadd(head, c, next, prev) \
        do{ \
                /* add it at the begining of the list*/ \
                (c)->next=(head); \
@@ -104,7 +108,7 @@ struct tcp_connection{
 
 
 /* remove a tcpconn from a list*/
-#define tcpconn_listrm(head, c) \
+#define tcpconn_listrm(head, c, next, prev) \
        do{ \
                if ((head)==(c)) (head)=(c)->next; \
                if ((c)->next) (c)->next->prev=(c)->prev; \
index 8f56a44..506d1be 100644 (file)
 #include "pass_fd.h"
 #include "tcp_conn.h"
 #include "globals.h"
+#include "pt.h"
 #include "mem/mem.h"
+#include "mem/shm_mem.h"
+#include "timer.h"
 
 
 
@@ -77,7 +80,7 @@ struct tcp_child tcp_children[MAX_TCP_CHILDREN];
 
 
 
-struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
+struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su, int i)
 {
        struct tcp_connection *c;
        
@@ -89,13 +92,14 @@ struct tcp_connection*  tcpconn_add(int sock, union sockaddr_union* su)
        }
        c->s=sock;
        c->su=*su;
+       c->sock_idx=i;
        c->refcnt=0;
        su2ip_addr(&c->ip, su);
        init_tcp_req(&c->req);
        c->timeout=get_ticks()+TCP_CON_TIMEOUT;
 
        /* add it at the begining of the list*/
-       tcpconn_listadd(conn_list, c);
+       tcpconn_listadd(conn_list, c, next, prev);
        return c;
        
 error:
@@ -106,7 +110,7 @@ error:
 
 void tcpconn_rm(struct tcp_connection* c)
 {
-       tcpconn_listrm(conn_list, c);
+       tcpconn_listrm(conn_list, c, next, prev);
        shm_free(c);
 }
 
@@ -115,16 +119,16 @@ void tcpconn_rm(struct tcp_connection* c)
 void tcpconn_timeout()
 {
        struct tcp_connection *c, *next;
-       int jiffies;;
+       int ticks;;
        
        
-       jiffies=get_ticks();
+       ticks=get_ticks();
        c=conn_list;
        while(c){
                next=c->next;
-               if ((c->refcnt==0) && (jiffies<c->timeout)) {
-                       DBG("tcpconn_timeout: timeout for %p (%d < %d)\n",
-                                       c, jiffies, c->timeout);
+               if ((c->refcnt==0) && (ticks>c->timeout)) {
+                       DBG("tcpconn_timeout: timeout for %p (%d > %d)\n",
+                                       c, ticks, c->timeout);
                        tcpconn_rm(c);
                }
                c=next;
@@ -179,20 +183,34 @@ error:
 static int send2child(struct tcp_connection* tcpconn)
 {
        int i;
+       int min_busy;
+       int idx;
        
+       min_busy=tcp_children[0].busy;
+       idx=0;
        for (i=0; i<tcp_children_no; i++){
                if (!tcp_children[i].busy){
-                       tcp_children[i].busy=1;
-                       tcp_children[i].n_reqs++;
-                       tcpconn->refcnt++;
-                       DBG("send2child: to child %d, %ld\n", i, (long)tcpconn);
-                       send_fd(tcp_children[i].s, &tcpconn, sizeof(tcpconn), tcpconn->s);
+                       idx=i;
+                       min_busy=0;
+                       break;
                        return 0;
+               }else if (min_busy>tcp_children[i].busy){
+                       min_busy=tcp_children[i].busy;
+                       idx=i;
                }
        }
-       if (i==tcp_children_no){
-               return -1;
+       
+       tcp_children[idx].busy++;
+       tcp_children[idx].n_reqs++;
+       tcpconn->refcnt++;
+       if (min_busy){
+               LOG(L_WARN, "WARNING: send2child:no free tcp receiver, "
+                               " connection passed to the least busy one (%d)\n",
+                               min_busy);
        }
+       DBG("send2child: to child %d, %ld\n", idx, (long)tcpconn);
+       send_fd(tcp_children[idx].s, &tcpconn, sizeof(tcpconn), tcpconn->s);
+       
        return 0; /* just to fix a warning*/
 }
 
@@ -236,8 +254,8 @@ void tcp_main_loop()
        
        while(1){
                sel_set=master_set;
-               timeout->tv_sec=TCP_MAIN_SELECT_TIMEOUT;
-               timeout->tv_usec=0;
+               timeout.tv_sec=TCP_MAIN_SELECT_TIMEOUT;
+               timeout.tv_usec=0;
                n=select(maxfd+1, &sel_set, 0 ,0 , &timeout);
                if (n<0){
                        if (errno==EINTR) continue; /* just a signal */
@@ -260,7 +278,7 @@ void tcp_main_loop()
                                }
                                
                                /* add socket to list */
-                               tcpconn=tcpconn_add(new_sock, &su);
+                               tcpconn=tcpconn_add(new_sock, &su, r);
                                DBG("tcp_main_loop: new connection: %p %d\n",
                                                tcpconn, tcpconn->s);
                                /* pass it to a child */
@@ -298,10 +316,11 @@ void tcp_main_loop()
 read_again:
                                bytes=read(tcp_children[r].s, response, sizeof(response));
                                if (bytes==0){
-                                       /* EOF -> bad, chidl has died */
+                                       /* EOF -> bad, child has died */
                                        LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
                                        /* terminating everybody */
-                                       exit(-1);
+                                       FD_CLR(tcp_children[r].s, &master_set);
+                                       /*exit(-1)*/;
                                }else if (bytes<0){
                                        if (errno==EINTR) goto read_again;
                                        else{
@@ -351,7 +370,7 @@ read_again:
 
 
 /* starts the tcp processes */
-int tcp_main()
+int tcp_init_children()
 {
        int r;
        int sockfd[2];
@@ -373,6 +392,7 @@ int tcp_main()
                        goto error;
                }
                
+               process_no++;
                pid=fork();
                if (pid<0){
                        LOG(L_ERR, "ERROR: tcp_main: fork failed: %s\n",
@@ -385,14 +405,15 @@ int tcp_main()
                        tcp_children[r].s=sockfd[0];
                        tcp_children[r].busy=0;
                        tcp_children[r].n_reqs=0;
+                       pt[process_no].pid=pid;
+                       strncpy(pt[process_no].desc, "tcp receiver", MAX_PT_DESC);
                }else{
                        /* child */
                        close(sockfd[0]);
                        tcp_receive_loop(sockfd[1]);
                }
        }
-       
-       tcp_main_loop();
+       return 0;
 error:
        return -1;
 }
index 05ad086..d88bc27 100644 (file)
@@ -45,6 +45,7 @@
 #include "pass_fd.h"
 #include "globals.h"
 #include "receive.h"
+#include "timer.h"
 
 
 #define q_memchr memchr
@@ -59,7 +60,7 @@ int tcp_read(struct tcp_req *r, int fd)
        bytes_free=TCP_BUF_SIZE- (int)(r->pos - r->buf);
        
        if (bytes_free==0){
-               fprintf(stderr, "buffer overrun, dropping\n");
+               LOG(L_ERR, "ERROR: tcp_read: buffer overrun, dropping\n");
                r->error=TCP_REQ_OVERRUN;
                return -1;
        }
@@ -71,7 +72,7 @@ again:
                        return 0; /* nothing has been read */
                }else if (errno == EINTR) goto again;
                else{
-                       fprintf(stderr, "error reading: %s\n", strerror(errno));
+                       LOG(L_ERR, "ERROR: tcp_read: error reading: %s\n",strerror(errno));
                        r->error=TCP_READ_ERROR;
                        return -1;
                }
@@ -299,7 +300,8 @@ int tcp_read_headers(struct tcp_req *r, int fd)
                                break;
                        
                        default:
-                               fprintf(stderr, "BUG: unexpected state %d\n", r->state);
+                               LOG(L_CRIT, "BUG: tcp_read_headers: unexpected state %d\n",
+                                               r->state);
                                abort();
                }
        }
@@ -310,23 +312,113 @@ skip:
 
 
 
+int tcp_read_req(struct tcp_connection* con)
+{
+       int bytes;
+       int state;
+       long size;
+       struct tcp_req* req;
+       int s;
+               
+               state=0;
+               s=con->fd;
+               req=&con->req;
+               if(req->complete==0 && req->error==TCP_REQ_OK){
+                       bytes=tcp_read_headers(req, s);
+                                               /* if timeout state=0; goto end__req; */
+                       DBG("read= %d bytes, parsed=%d, state=%d, error=%d\n",
+                                       bytes, req->parsed-req->buf, req->state, req->error );
+                       if (bytes==-1){
+                               LOG(L_ERR, "ERROR: tcp_read_req: error reading \n");
+                               state=-1;
+                               goto end_req;
+                       }
+                       if (bytes==0){
+                               DBG( "tcp_read_req: EOF\n");
+                               state=-1;
+                               goto end_req;
+                       }
+               
+               }
+               if (req->error!=TCP_REQ_OK){
+                       LOG(L_ERR,"ERROR: tcp_read_req: bad request, state=%d, error=%d\n",
+                                       req->state, req->error);
+                       state=-1;
+                       goto end_req;
+               }
+               if (req->complete){
+                       DBG("tcp_read_req: end of header part\n");
+                       DBG("tcp_read_req: headers:\n%.*s.\n",
+                                       req->body-req->buf, req->buf);
+                       if (req->has_content_len){
+                               DBG("tcp_read_req: content-length= %d\n", req->content_len);
+                               DBG("tcp_read_req: body:\n%.*s\n", req->content_len,req->body);
+                       }else{
+                               req->error=TCP_REQ_BAD_LEN;
+                               LOG(L_ERR, "ERROR: tcp_read_req: content length not present or"
+                                               " unparsable\n");
+                               state=-1;
+                               goto end_req;
+                       }
+                       /* if we are here everything is nice and ok*/
+                       state=0;
+                       /* just for debugging use sendipv4 as receiving socket */
+                       DBG("calling receive_msg(%p, %d, )\n",
+                                       req->buf, (int)(req->parsed-req->buf));
+                       bind_address=sendipv4; /*&tcp_info[con->sock_idx];*/
+                       receive_msg(req->buf, req->parsed-req->buf, &con->su);
+                       
+                       /* prepare for next request */
+                       size=req->pos-req->body;
+                       if (size) memmove(req->buf, req->body, size);
+                       DBG("tcp_read_req: preparing for new request, kept %ld bytes\n",
+                                       size);
+                       req->pos=req->buf+size;
+                       req->parsed=req->buf;
+                       req->body=0;
+                       req->error=TCP_REQ_OK;
+                       req->state=H_STARTWS;
+                       req->complete=req->content_len=req->has_content_len=0;
+                       req->bytes_to_go=0;
+                       
+               }
+               
+               
+       end_req:
+               return state;
+}
+
+
+
+void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
+{
+       long response[2];
+       
+               DBG( "releasing con %p, state %ld\n", c, state );
+               /* release req & signal the parent */
+               if (c->fd!=-1) close(c->fd);
+               /* errno==EINTR, EWOULDBLOCK a.s.o todo */
+               response[0]=(long)c;
+               response[1]=state;
+               write(unix_sock, response, sizeof(response));
+}
+
+
 
 void tcp_receive_loop(int unix_sock)
 {
-       struct tcp_req* req;
        struct tcp_connection* list; /* list with connections in use */
        struct tcp_connection* con;
-       int bytes;
-       long size;
+       struct tcp_connection* c_next;
        int n;
        int nfds;
        int s;
        long state;
-       long response[2];
        fd_set master_set;
        fd_set sel_set;
        int maxfd;
        struct timeval timeout;
+       int ticks;
        
        
        /* init */
@@ -337,8 +429,8 @@ void tcp_receive_loop(int unix_sock)
        
        /* listen on the unix socket for the fd */
        for(;;){
-                       timeout->tv_sec=TCP_CHILD_SELECT_TIMEOUT;
-                       timeout->tv_usec=0;
+                       timeout.tv_sec=TCP_CHILD_SELECT_TIMEOUT;
+                       timeout.tv_usec=0;
                        sel_set=master_set;
                        nfds=select(maxfd+1, &sel_set, 0 , 0 , &timeout);
                        if (nfds<0){
@@ -366,97 +458,52 @@ void tcp_receive_loop(int unix_sock)
                                        LOG(L_ERR, "WARNING: tcp_receive_loop: 0 bytes read\n");
                                        continue;
                                }
-                               DBG("received n=%d con=%ld, fd=%d\n", n, con, s);
+                               con->fd=s;
+                               DBG("received n=%d con=%p, fd=%d\n", n, con, s);
                                if (s==-1) {
                                        LOG(L_ERR, "ERROR: tcp_receive_loop: read_fd:"
                                                                        "no fd read\n");
                                        state=-1;
-                                       goto end_req; /* ?*/
+                                       release_tcpconn(con, state, unix_sock);
                                }
                                if (con==0){
                                        LOG(L_ERR, "ERROR: tcp_receive_loop: null pointer\n");
                                        state=-1;
-                                       goto end_req;
+                                       release_tcpconn(con, state, unix_sock);
                                }
-                               con->fd=s;
+                               con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
                                FD_SET(s, &master_set);
                                if (maxfd<s) maxfd=s;
-                               tcpconn_listadd(list, con);
+                               tcpconn_listadd(list, con, c_next, c_prev);
                        }
-                       for (con=list; con && nfds ; con=con->next){
-                               if (FD_ISSET(con->fd, &sel_set)){
+                       ticks=get_ticks();
+                       for (con=list; con ; con=c_next){
+                               c_next=con->c_next; /* safe for removing*/
+                               if (nfds && FD_ISSET(con->fd, &sel_set)){
                                        nfds--;
-                                       req=&con->req;
-again:
-               while(req->complete==0 && req->error==TCP_REQ_OK){
-                       bytes=tcp_read_headers(req, s);
-                                               /* if timeout state=0; goto end__req; */
-                       fprintf(stderr, "read= %d bytes, parsed=%d, state=%d, error=%d\n",
-                                       bytes, req->parsed-req->buf, req->state, req->error );
-                       if (bytes==-1){
-                               fprintf(stderr, "ERROR!\n");
-                               state=-1;
-                               goto end_req;
-                       }
-                       if (bytes==0){
-                               fprintf(stderr, "EOF!\n");
-                               state=-1;
-                               goto end_req;
+                                       state=tcp_read_req(con);
+                                       if (state==-1){
+                                               FD_CLR(con->fd, &master_set);
+                                               tcpconn_listrm(list, con, c_next, c_prev);
+                                               release_tcpconn(con, state, unix_sock);
+                                       }else{
+                                               /* update timeout */
+                                               con->timeout=ticks+TCP_CHILD_TIMEOUT;
+                                       }
+                               }else{
+                                       /* timeout */
+                                       if (con->timeout<=ticks){
+                                               /* expired, return to "tcp main" */
+                                               DBG("tcp_receive_loop: %p expired (%d, %d)\n",
+                                                               con, con->timeout, ticks);
+                                               state=0;
+                                               FD_CLR(con->fd, &master_set);
+                                               tcpconn_listrm(list, con, c_next, c_prev);
+                                               release_tcpconn(con, state, unix_sock);
+                                       }
+                               }
                        }
-
-               }
-               if (req->error!=TCP_REQ_OK){
-                       fprintf(stderr, "bad request, state=%d, error=%d\n",
-                                       req->state, req->error);
-                       state=-1;
-                       goto end_req;
-               }
-               fprintf(stderr, "end of header part\n");
-               fprintf(stderr, "headers:\n%.*s.\n",req->body-req->buf, req->buf);
-               if (req->has_content_len){
-                       fprintf(stderr, "content-length= %d\n", req->content_len);
-                       fprintf(stderr, "body:\n%.*s\n", req->content_len, req->body);
-               }else{
-                       req->error=TCP_REQ_BAD_LEN;
-                       fprintf(stderr, "content length not present or unparsable\n");
-                       state=-1;
-                       goto end_req;
-               }
-
-               /* if we are here everything is nice and ok*/
-               state=0;
-               /* just for debugging use sendipv4 as receiving socket */
-               DBG("calling receive_msg(%p, %d, %p)\n",
-                               req->buf, (int)(req->parsed-req->buf), &sendipv4->su);
-               bind_address=sendipv4;
-               receive_msg(req->buf, req->parsed-req->buf, &sendipv4->su);
-
-               /* prepare for next request */
-               size=req->pos-req->body;
-               if (size) memmove(req->buf, req->body, size);
-               fprintf(stderr, "\npreparing for new request, kept %ld bytes\n", size);
-               req->pos=req->buf+size;
-               req->parsed=req->buf;
-               req->body=0;
-               req->error=TCP_REQ_OK;
-               req->state=H_STARTWS;
-               req->complete=req->content_len=req->has_content_len=0;
-               req->bytes_to_go=0;
-       
-               /* process last req. */
                
-               goto again;
-               
-       end_req:
-                       fprintf(stderr, "end req\n");
-               /* release req & signal the parent */
-               if (s!=-1) close(s);
-               /* errno==EINTR, EWOULDBLOCK a.s.o todo */
-               response[0]=con;
-               response[1]=state;
-               write(unix_sock, response, sizeof(response));
-               
-       
        }
 }
 
index a9f2aa2..4dbd1cb 100644 (file)
@@ -25,6 +25,7 @@ rev_dns=off      # (cmd. line: -R)
 # for more info: sip_router -h
 alias=iptel.org
 alias="foo.bar"
+fifo="/tmp/ser_fifo"
 
 #modules