- io_wait support for write
authorAndrei Pelinescu-Onciul <andrei@iptel.org>
Thu, 29 Nov 2007 21:01:45 +0000 (21:01 +0000)
committerAndrei Pelinescu-Onciul <andrei@iptel.org>
Thu, 29 Nov 2007 21:01:45 +0000 (21:01 +0000)
- io_wait: added io_watch_chg(..)
- updated tcp code to the io_wait api changes

io_wait.c
io_wait.h
tcp_main.c
tcp_options.c
tcp_read.c

index 8accb56..7b53913 100644 (file)
--- a/io_wait.c
+++ b/io_wait.c
@@ -3,26 +3,17 @@
  * 
  * Copyright (C) 2005 iptelorg GmbH
  *
- * This file is part of ser, a free SIP server.
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
  *
- * ser is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * For a license to use the ser software under conditions
- * other than those described here, or to purchase support for this
- * software, please contact iptel.org by e-mail at the following addresses:
- *    info@iptel.org
- *
- * ser is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 /* 
  * tcp io wait common stuff used by tcp_main.c & tcp_read.c
@@ -289,7 +280,8 @@ static void destroy_devpoll(io_wait_h* h)
 #ifdef HAVE_SELECT
 static int init_select(io_wait_h* h)
 {
-       FD_ZERO(&h->master_set);
+       FD_ZERO(&h->master_rset);
+       FD_ZERO(&h->master_wset);
        return 0;
 }
 #endif
index 3f13e04..3831f5d 100644 (file)
--- a/io_wait.h
+++ b/io_wait.h
@@ -3,33 +3,24 @@
  * 
  * Copyright (C) 2005 iptelorg GmbH
  *
- * This file is part of ser, a free SIP server.
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
  *
- * ser is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version
- *
- * For a license to use the ser software under conditions
- * other than those described here, or to purchase support for this
- * software, please contact iptel.org by e-mail at the following addresses:
- *    info@iptel.org
- *
- * ser is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 /*
  * tcp io wait common stuff used by tcp_main.c & tcp_read.c
  * All the functions are inline because of speed reasons and because they are
  * used only from 2 places.
  * You also have to define:
- *     int handle_io(struct fd_map* fm, int idx) (see below)
+ *     int handle_io(struct fd_map* fm, short events, int idx) (see below)
  *     (this could be trivially replaced by a callback pointer entry attached
  *      to the io_wait handler if more flexibility rather then performance
  *      is needed)
@@ -51,6 +42,7 @@
  *  2006-05-30  sigio 64 bit workarround enabled for kernels < 2.6.5 (andrei)
  *  2007-11-22  when handle_io() is called in a loop check & stop if the fd was
  *               removed inside handle_io() (andrei)
+ *  2007-11-29  support for write (POLLOUT); added io_watch_chg() (andrei)
  */
 
 
@@ -116,12 +108,13 @@ struct fd_map{
        int fd;               /* fd no */
        fd_type type;         /* "data" type */
        void* data;           /* pointer to the corresponding structure */
+       short events;         /* events we are interested int */
 };
 
 
 #ifdef HAVE_KQUEUE
 #ifndef KQ_CHANGES_ARRAY_SIZE
-#define KQ_CHANGES_ARRAY_SIZE 128
+#define KQ_CHANGES_ARRAY_SIZE 256
 
 #ifdef __OS_netbsd
 #define KEV_UDATA_CAST (intptr_t)
@@ -154,7 +147,8 @@ struct io_wait_handler{
        int dpoll_fd;
 #endif
 #ifdef HAVE_SELECT
-       fd_set master_set;
+       fd_set master_rset; /* read set */
+       fd_set master_wset; /* write set */
        int max_fd_select; /* maximum select used fd */
 #endif
        /* common stuff for POLL, SIGIO_RT and SELECT
@@ -184,10 +178,12 @@ typedef struct io_wait_handler io_wait_h;
 /* add a fd_map structure to the fd hash */
 static inline struct fd_map* hash_fd_map(      io_wait_h* h,
                                                                                        int fd,
+                                                                                       short events,
                                                                                        fd_type type,
                                                                                        void* data)
 {
        h->fd_hash[fd].fd=fd;
+       h->fd_hash[fd].events=events;
        h->fd_hash[fd].type=type;
        h->fd_hash[fd].data=data;
        return &h->fd_hash[fd];
@@ -199,8 +195,9 @@ static inline struct fd_map* hash_fd_map(   io_wait_h* h,
 /* generic handle io routine, this must be defined in the including file
  * (faster then registering a callback pointer)
  *
- * params:  fm  - pointer to a fd hash entry
- *          idx - index in the fd_array (or -1 if not known)
+ * params:  fm     - pointer to a fd hash entry
+ *          events - combinations of POLLIN, POLLOUT, POLLERR & POLLHUP
+ *          idx    - index in the fd_array (or -1 if not known)
  * return: -1 on error
  *          0 on EAGAIN or when by some other way it is known that no more 
  *            io events are queued on the fd (the receive buffer is empty).
@@ -209,9 +206,9 @@ static inline struct fd_map* hash_fd_map(   io_wait_h* h,
  *         >0 on successfull read from the fd (when there might be more io
  *            queued -- the receive buffer might still be non-empty)
  */
-inline static int handle_io(struct fd_map* fm, int idx);
+inline static int handle_io(struct fd_map* fm, short events, int idx);
 #else
-int handle_io(struct fd_map* fm, int idx);
+int handle_io(struct fd_map* fm, short events, int idx);
 #endif
 
 
@@ -220,6 +217,10 @@ int handle_io(struct fd_map* fm, int idx);
 /*
  * kqueue specific function: register a change
  * (adds a change to the kevent change array, and if full flushes it first)
+ *
+ * TODO: check if the event already exists in the change list or if it's
+ *       complementary to an event in the list (e.g. EVFILT_WRITE, EV_DELETE
+ *       and EVFILT_WRITE, EV_ADD for the same fd).
  * returns: -1 on error, 0 on success
  */
 static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag, 
@@ -254,7 +255,15 @@ again:
 
 
 /* generic io_watch_add function
+ * Params:
+ *     h      - pointer to initialized io_wait handle
+ *     fd     - fd to watch
+ *     events - bitmap with the fd events for which the fd should be watched
+ *              (combination of POLLIN and POLLOUT)
+ *     type   - fd type (non 0 value, returned in the call to handle_io)
+ *     data   - pointer/private data returned in the handle_io call
  * returns 0 on success, -1 on error
+ *
  * WARNING: handle_io() can be called immediately (from io_watch_add()) so
  *  make sure that any dependent init. (e.g. data stuff) is made before
  *  calling io_watch_add
@@ -264,15 +273,16 @@ again:
  *  switch())*/
 inline static int io_watch_add(        io_wait_h* h,
                                                                int fd,
+                                                               short events,
                                                                fd_type type,
                                                                void* data)
 {
 
        /* helper macros */
-#define fd_array_setup \
+#define fd_array_setup(ev) \
        do{ \
                h->fd_array[h->fd_no].fd=fd; \
-               h->fd_array[h->fd_no].events=POLLIN; /* useless for select */ \
+               h->fd_array[h->fd_no].events=(ev); /* useless for select */ \
                h->fd_array[h->fd_no].revents=0;     /* useless for select */ \
        }while(0)
        
@@ -311,12 +321,17 @@ inline static int io_watch_add(   io_wait_h* h,
        idx=-1;
 #endif
        e=0;
-       if (fd==-1){
+       /* sanity checks */
+       if (unlikely(fd==-1)){
                LOG(L_CRIT, "BUG: io_watch_add: fd is -1!\n");
                goto error;
        }
+       if (unlikely((events&(POLLIN|POLLOUT))==0)){
+               LOG(L_CRIT, "BUG: io_watch_add: invalid events: 0x%0x\n", events);
+               goto error;
+       }
        /* check if not too big */
-       if (h->fd_no>=h->max_fd_no){
+       if (unlikely(h->fd_no>=h->max_fd_no)){
                LOG(L_CRIT, "ERROR: io_watch_add: maximum fd number exceeded:"
                                " %d/%d\n", h->fd_no, h->max_fd_no);
                goto error;
@@ -325,35 +340,38 @@ inline static int io_watch_add(   io_wait_h* h,
                        h, fd, type, data, h->fd_no);
        /*  hash sanity check */
        e=get_fd_map(h, fd);
-       if (e && (e->type!=0 /*F_NONE*/)){
+       if (unlikely(e && (e->type!=0 /*F_NONE*/))){
                LOG(L_ERR, "ERROR: io_watch_add: trying to overwrite entry %d"
                                " in the hash(%d, %d, %p) with (%d, %d, %p)\n",
                                fd, e->fd, e->type, e->data, fd, type, data);
                goto error;
        }
        
-       if ((e=hash_fd_map(h, fd, type, data))==0){
+       if (unlikely((e=hash_fd_map(h, fd, events, type, data))==0)){
                LOG(L_ERR, "ERROR: io_watch_add: failed to hash the fd %d\n", fd);
                goto error;
        }
        switch(h->poll_method){ /* faster then pointer to functions */
                case POLL_POLL:
-                       fd_array_setup;
+                       fd_array_setup(events);
                        set_fd_flags(O_NONBLOCK);
                        break;
 #ifdef HAVE_SELECT
                case POLL_SELECT:
-                       fd_array_setup;
-                       FD_SET(fd, &h->master_set);
+                       fd_array_setup(events);
+                       if (likely(events & POLLIN))
+                               FD_SET(fd, &h->master_rset);
+                       if (unlikely(events & POLLOUT))
+                               FD_SET(fd, &h->master_wset);
                        if (h->max_fd_select<fd) h->max_fd_select=fd;
                        break;
 #endif
 #ifdef HAVE_SIGIO_RT
                case POLL_SIGIO_RT:
-                       fd_array_setup;
+                       fd_array_setup(events);
                        /* re-set O_ASYNC might be needed, if not done from 
                         * io_watch_del (or if somebody wants to add a fd which has
-                        * already O_ASYNC/F_SETSIG set on a dupplicate)
+                        * already O_ASYNC/F_SETSIG set on a duplicate)
                         */
                        /* set async & signal */
                        if (fcntl(fd, F_SETOWN, my_pid())==-1){
@@ -384,11 +402,12 @@ inline static int io_watch_add(   io_wait_h* h,
 #endif
 #ifdef HAVE_EPOLL
                case POLL_EPOLL_LT:
-                       ep_event.events=EPOLLIN;
+                       ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+                                                        (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
                        ep_event.data.ptr=e;
 again1:
                        n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
-                       if (n==-1){
+                       if (unlikely(n==-1)){
                                if (errno==EAGAIN) goto again1;
                                LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
                                        strerror(errno), errno);
@@ -397,11 +416,13 @@ again1:
                        break;
                case POLL_EPOLL_ET:
                        set_fd_flags(O_NONBLOCK);
-                       ep_event.events=EPOLLIN|EPOLLET;
+                       ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) )  |
+                                                        (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
+                                                         EPOLLET;
                        ep_event.data.ptr=e;
 again2:
                        n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
-                       if (n==-1){
+                       if (unlikely(n==-1)){
                                if (errno==EAGAIN) goto again2;
                                LOG(L_ERR, "ERROR: io_watch_add: epoll_ctl failed: %s [%d]\n",
                                        strerror(errno), errno);
@@ -413,14 +434,20 @@ again2:
 #endif
 #ifdef HAVE_KQUEUE
                case POLL_KQUEUE:
-                       if (kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e)==-1)
+                       if (likely( events & POLLINT)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e)==-1))
+                               goto error;
+                       }
+                       if (unlikely( events & POLLOUT)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
                                goto error;
+                       }
                        break;
 #endif
 #ifdef HAVE_DEVPOLL
                case POLL_DEVPOLL:
                        pfd.fd=fd;
-                       pfd.events=POLLIN;
+                       pfd.events=events;
                        pfd.revents=0;
 again_devpoll:
                        if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
@@ -445,10 +472,12 @@ again_devpoll:
        if (check_io){
                /* handle possible pre-existing events */
                pf.fd=fd;
-               pf.events=POLLIN;
+               pf.events=events;
 check_io_again:
-               while(e->type && ((n=poll(&pf, 1, 0))>0) && (handle_io(e, idx)>0));
-               if (e->type && (n==-1)){
+               while(e->type && ((n=poll(&pf, 1, 0))>0) && 
+                               (handle_io(e, pf.revents, idx)>0) &&
+                               (pf.revents & e->events));
+               if (unlikely(e->type && (n==-1))){
                        if (errno==EINTR) goto check_io_again;
                        LOG(L_ERR, "ERROR: io_watch_add: check_io poll: %s [%d]\n",
                                                strerror(errno), errno);
@@ -482,18 +511,19 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
        
 #define fix_fd_array \
        do{\
-                       if (idx==-1){ \
+                       if (unlikely(idx==-1)){ \
                                /* fix idx if -1 and needed */ \
                                for (idx=0; (idx<h->fd_no) && \
                                                        (h->fd_array[idx].fd!=fd); idx++); \
                        } \
-                       if (idx<h->fd_no){ \
+                       if (likely(idx<h->fd_no)){ \
                                memmove(&h->fd_array[idx], &h->fd_array[idx+1], \
                                        (h->fd_no-(idx+1))*sizeof(*(h->fd_array))); \
                        } \
        }while(0)
        
        struct fd_map* e;
+       int events;
 #ifdef HAVE_EPOLL
        int n;
        struct epoll_event ep_event;
@@ -505,7 +535,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
        int fd_flags;
 #endif
        
-       if ((fd<0) || (fd>=h->max_fd_no)){
+       if (unlikely((fd<0) || (fd>=h->max_fd_no))){
                LOG(L_CRIT, "BUG: io_watch_del: invalid fd %d, not in [0, %d) \n",
                                                fd, h->fd_no);
                goto error;
@@ -514,18 +544,18 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                        h, fd, idx, flags, h->fd_no);
        e=get_fd_map(h, fd);
        /* more sanity checks */
-       if (e==0){
+       if (unlikely(e==0)){
                LOG(L_CRIT, "BUG: io_watch_del: no corresponding hash entry for %d\n",
                                        fd);
                goto error;
        }
-       if (e->type==0 /*F_NONE*/){
+       if (unlikely(e->type==0 /*F_NONE*/)){
                LOG(L_ERR, "ERROR: io_watch_del: trying to delete already erased"
                                " entry %d in the hash(%d, %d, %p) )\n",
                                fd, e->fd, e->type, e->data);
                goto error;
        }
-       
+       events=e->events;
        unhash_fd_map(e);
        
        switch(h->poll_method){
@@ -534,11 +564,14 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                        break;
 #ifdef HAVE_SELECT
                case POLL_SELECT:
-                       fix_fd_array;
-                       FD_CLR(fd, &h->master_set);
-                       if (h->max_fd_select && (h->max_fd_select==fd))
+                       if (likely(events & POLLIN))
+                               FD_CLR(fd, &h->master_rset);
+                       if (unlikely(events & POLLOUT))
+                               FD_CLR(fd, &h->master_wset);
+                       if (unlikely(h->max_fd_select && (h->max_fd_select==fd)))
                                /* we don't know the prev. max, so we just decrement it */
                                h->max_fd_select--; 
+                       fix_fd_array;
                        break;
 #endif
 #ifdef HAVE_SIGIO_RT
@@ -553,12 +586,12 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                        /*if (!(flags & IO_FD_CLOSING)){*/
                                /* reset ASYNC */
                                fd_flags=fcntl(fd, F_GETFL); 
-                               if (fd_flags==-1){ 
+                               if (unlikely(fd_flags==-1)){ 
                                        LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:" 
                                                        " %s [%d]\n", strerror(errno), errno); 
                                        goto error; 
                                } 
-                               if (fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1){ 
+                               if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){ 
                                        LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL" 
                                                                " failed: %s [%d]\n", strerror(errno), errno); 
                                        goto error; 
@@ -569,15 +602,17 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                case POLL_EPOLL_LT:
                case POLL_EPOLL_ET:
                        /* epoll doesn't seem to automatically remove sockets,
-                        * if the socket is a dupplicate/moved and the original
+                        * if the socket is a duplicate/moved and the original
                         * is still open. The fd is removed from the epoll set
                         * only when the original (and all the  copies?) is/are 
                         * closed. This is probably a bug in epoll. --andrei */
 #ifdef EPOLL_NO_CLOSE_BUG
                        if (!(flags & IO_FD_CLOSING)){
 #endif
+again_epoll:
                                n=epoll_ctl(h->epfd, EPOLL_CTL_DEL, fd, &ep_event);
-                               if (n==-1){
+                               if (unlikely(n==-1)){
+                                       if (errno==EAGAIN) goto again_epoll;
                                        LOG(L_ERR, "ERROR: io_watch_del: removing fd from epoll "
                                                        "list failed: %s [%d]\n", strerror(errno), errno);
                                        goto error;
@@ -590,8 +625,21 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
 #ifdef HAVE_KQUEUE
                case POLL_KQUEUE:
                        if (!(flags & IO_FD_CLOSING)){
-                               if (kq_ev_change(h, fd, EVFILT_READ, EV_DELETE, 0)==-1)
-                                       goto error;
+                               if (likely(events & POLLIN)){
+                                       if (unlikely(kq_ev_change(h, fd, EVFILT_READ,
+                                                                                                       EV_DELETE, 0) ==-1)){
+                                               /* try to delete the write filter anyway */
+                                               if (events & POLLOUT){
+                                                       kq_ev_change(h, fd, EVFILT_WRITE, EV_DELETE, 0);
+                                               }
+                                               goto error;
+                                       }
+                               }
+                               if (unlikely(events & POLLOUT)){
+                                       if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE,
+                                                                                                       EV_DELETE, 0) ==-1))
+                                               goto error;
+                               }
                        }
                        break;
 #endif
@@ -627,6 +675,180 @@ error:
 
 
 
+/* parameters:    h - handler 
+ *               fd - file descriptor
+ *           events - new events to watch for
+ *              idx - index in the fd_array if known, -1 if not
+ *                    (if index==-1 fd_array will be searched for the
+ *                     corresponding fd* entry -- slower but unavoidable in 
+ *                     some cases). index is not used (no fd_array) for epoll,
+ *                     /dev/poll and kqueue
+ * returns 0 if ok, -1 on error */
+inline static int io_watch_chg(io_wait_h* h, int fd, short events, int idx )
+{
+       
+#define fd_array_chg(ev) \
+       do{\
+                       if (unlikely(idx==-1)){ \
+                               /* fix idx if -1 and needed */ \
+                               for (idx=0; (idx<h->fd_no) && \
+                                                       (h->fd_array[idx].fd!=fd); idx++); \
+                       } \
+                       if (likely(idx<h->fd_no)){ \
+                               h->fd_array[idx].events=(ev); \
+                       } \
+       }while(0)
+       
+       struct fd_map* e;
+       int add_events;
+       int del_events;
+#ifdef HAVE_EPOLL
+       int n;
+       struct epoll_event ep_event;
+#endif
+#ifdef HAVE_DEVPOLL
+       struct pollfd pfd;
+#endif
+       
+       if (unlikely((fd<0) || (fd>=h->max_fd_no))){
+               LOG(L_CRIT, "BUG: io_watch_chg: invalid fd %d, not in [0, %d) \n",
+                                               fd, h->fd_no);
+               goto error;
+       }
+       if (unlikely((events&(POLLIN|POLLOUT))==0)){
+               LOG(L_CRIT, "BUG: io_watch_chg: invalid events: 0x%0x\n", events);
+               goto error;
+       }
+       DBG("DBG: io_watch_chg (%p, %d, 0x%x, 0x%x) fd_no=%d called\n",
+                       h, fd, events, idx, h->fd_no);
+       e=get_fd_map(h, fd);
+       /* more sanity checks */
+       if (unlikely(e==0)){
+               LOG(L_CRIT, "BUG: io_watch_chg: no corresponding hash entry for %d\n",
+                                       fd);
+               goto error;
+       }
+       if (unlikely(e->type==0 /*F_NONE*/)){
+               LOG(L_ERR, "ERROR: io_watch_chg: trying to change an already erased"
+                               " entry %d in the hash(%d, %d, %p) )\n",
+                               fd, e->fd, e->type, e->data);
+               goto error;
+       }
+       
+       add_events=events & ~e->events;
+       del_events=e->events & ~events;
+       e->events=events;
+       switch(h->poll_method){
+               case POLL_POLL:
+                       fd_array_chg(events);
+                       break;
+#ifdef HAVE_SELECT
+               case POLL_SELECT:
+                       fd_array_chg(events);
+                       if (unlikely(del_events & POLLIN))
+                               FD_CLR(fd, &h->master_rset);
+                       else if (unlikely(add_events & POLLIN))
+                               FD_SET(fd, &h->master_rset);
+                       if (likely(del_events & POLLOUT))
+                               FD_CLR(fd, &h->master_wset);
+                       else if (likely(add_events & POLLOUT))
+                               FD_SET(fd, &h->master_wset);
+                       break;
+#endif
+#ifdef HAVE_SIGIO_RT
+               case POLL_SIGIO_RT:
+                       fd_array_chg(events);
+                       break;
+#endif
+#ifdef HAVE_EPOLL
+               case POLL_EPOLL_LT:
+                               ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+                                                                (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
+                               ep_event.data.ptr=e;
+again_epoll_lt:
+                               n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
+                               if (unlikely(n==-1)){
+                                       if (errno==EAGAIN) goto again_epoll_lt;
+                                       LOG(L_ERR, "ERROR: io_watch_chg: modifying epoll events"
+                                                       " failed: %s [%d]\n", strerror(errno), errno);
+                                       goto error;
+                               }
+                       break;
+               case POLL_EPOLL_ET:
+                               ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+                                                                (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
+                                                                EPOLLET;
+                               ep_event.data.ptr=e;
+again_epoll_et:
+                               n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
+                               if (unlikely(n==-1)){
+                                       if (errno==EAGAIN) goto again_epoll_et;
+                                       LOG(L_ERR, "ERROR: io_watch_chg: modifying epoll events"
+                                                       " failed: %s [%d]\n", strerror(errno), errno);
+                                       goto error;
+                               }
+                       break;
+#endif
+#ifdef HAVE_KQUEUE
+               case POLL_KQUEUE:
+                       if (unlikely(del_events & POLLIN)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_READ,
+                                                                                                               EV_DELETE, 0) ==-1))
+                                               goto error;
+                       }else if (unlikely(add_events & POLLIN)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e) ==-1))
+                                       goto error;
+                       }
+                       if (likely(del_events & POLLOUT)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE,
+                                                                                                               EV_DELETE, 0) ==-1))
+                                               goto error;
+                       }else if (likely(add_events & POLLOUT)){
+                               if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
+                                       goto error;
+                       }
+                       break;
+#endif
+#ifdef HAVE_DEVPOLL
+               case POLL_DEVPOLL:
+                               /* for /dev/poll the closed fds _must_ be removed
+                                  (they are not removed automatically on close()) */
+                               pfd.fd=fd;
+                               pfd.events=POLLREMOVE;
+                               pfd.revents=0;
+again_devpoll1:
+                               if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
+                                       if (errno==EINTR) goto again_devpoll1;
+                                       LOG(L_ERR, "ERROR: io_watch_chg: removing fd from "
+                                                               "/dev/poll failed: %s [%d]\n", 
+                                                               strerror(errno), errno);
+                                       goto error;
+                               }
+again_devpoll2:
+                               pfd.events=events;
+                               pfd.revents=0;
+                               if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
+                                       if (errno==EINTR) goto again_devpoll2;
+                                       LOG(L_ERR, "ERROR: io_watch_chg: re-adding fd to "
+                                                               "/dev/poll failed: %s [%d]\n", 
+                                                               strerror(errno), errno);
+                                       goto error;
+                               }
+                               break;
+#endif
+               default:
+                       LOG(L_CRIT, "BUG: io_watch_chg: no support for poll method "
+                                       " %s (%d)\n", poll_method_str[h->poll_method], 
+                                       h->poll_method);
+                       goto error;
+       }
+       h->fd_no--;
+       return 0;
+error:
+       return -1;
+#undef fix_fd_array
+}
+
 /* io_wait_loop_x style function 
  * wait for io using poll()
  * params: h      - io_wait handle
@@ -650,11 +872,12 @@ again:
                        }
                }
                for (r=0; (r<h->fd_no) && n; r++){
-                       if (h->fd_array[r].revents & (POLLIN|POLLERR|POLLHUP)){
+                       fm=get_fd_map(h, h->fd_array[r].fd);
+                       if (h->fd_array[r].revents & (fm->events|POLLERR|POLLHUP)){
                                n--;
                                /* sanity checks */
-                               if ((h->fd_array[r].fd >= h->max_fd_no)||
-                                               (h->fd_array[r].fd < 0)){
+                               if (unlikely((h->fd_array[r].fd >= h->max_fd_no)||
+                                                               (h->fd_array[r].fd < 0))){
                                        LOG(L_CRIT, "BUG: io_wait_loop_poll: bad fd %d "
                                                        "(no in the 0 - %d range)\n",
                                                        h->fd_array[r].fd, h->max_fd_no);
@@ -662,8 +885,13 @@ again:
                                        h->fd_array[r].events=0; /* clear the events */
                                        continue;
                                }
-                               fm=get_fd_map(h, h->fd_array[r].fd);
-                               while(fm->type && (handle_io(fm, r) > 0) && repeat);
+                               /* repeat handle_io if repeat, fd still watched (not deleted
+                                *  inside handle_io), handle_io returns that there's still
+                                *  IO and the fd is still watched for the triggering event */
+                               while(fm->type && 
+                                               (handle_io(fm, h->fd_array[r].revents, r) > 0) &&
+                                               repeat &&
+                                               (fm->events & h->fd_array[r].revents) );
                        }
                }
 error:
@@ -676,17 +904,20 @@ error:
 /* wait for io using select */
 inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
 {
-       fd_set sel_set;
+       fd_set sel_rset;
+       fd_set sel_wset;
        int n, ret;
        struct timeval timeout;
        int r;
        struct fd_map* fm;
+       int revents;
        
 again:
-               sel_set=h->master_set;
+               sel_rset=h->master_rset;
+               sel_wset=h->master_wset;
                timeout.tv_sec=t;
                timeout.tv_usec=0;
-               ret=n=select(h->max_fd_select+1, &sel_set, 0, 0, &timeout);
+               ret=n=select(h->max_fd_select+1, &sel_rset, &sel_wset, 0, &timeout);
                if (n<0){
                        if (errno==EINTR) goto again; /* just a signal */
                        LOG(L_ERR, "ERROR: io_wait_loop_select: select: %s [%d]\n",
@@ -696,9 +927,15 @@ again:
                }
                /* use poll fd array */
                for(r=0; (r<h->max_fd_no) && n; r++){
-                       if (FD_ISSET(h->fd_array[r].fd, &sel_set)){
+                       revents=0;
+                       if (likely(FD_ISSET(h->fd_array[r].fd, &sel_rset)))
+                               revents|=POLLIN;
+                       if (unlikely(FD_ISSET(h->fd_array[r].fd, &sel_wset)))
+                               revents|=POLLOUT;
+                       if (likely(revents)){
                                fm=get_fd_map(h, h->fd_array[r].fd);
-                               while(fm->type && (handle_io(fm, r)>0) && repeat);
+                               while(fm->type && (fm->events & revents) && 
+                                               (handle_io(fm, revents, r)>0) && repeat);
                                n--;
                        }
                };
@@ -713,10 +950,11 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
 {
        int n, r;
        struct fd_map* fm;
+       int revents;
        
 again:
                n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
-               if (n==-1){
+               if (unlikely(n==-1)){
                        if (errno==EINTR) goto again; /* signal, ignore it */
                        else{
                                LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
@@ -735,9 +973,14 @@ again:
                }
 #endif
                for (r=0; r<n; r++){
-                       if (h->ep_array[r].events & (EPOLLIN|EPOLLERR|EPOLLHUP)){
+                       revents= (POLLIN & (!(h->ep_array[r].events & EPOLLIN)-1)) |
+                                        (POLLOUT & (!(h->ep_array[r].events & EPOLLOUT)-1)) |
+                                        (POLLERR & (!(h->ep_array[r].events & EPOLLERR)-1)) |
+                                        (POLLHUP & (!(h->ep_array[r].events & EPOLLHUP)-1));
+                       if (likely(revents)){
                                fm=(struct fd_map*)h->ep_array[r].data.ptr;
-                               while(fm->type && (handle_io(fm,-1)>0) && repeat);
+                               while(fm->type && (fm->events & revents) && 
+                                               (handle_io(fm, revents, -1)>0) && repeat);
                        }else{
                                LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
                                                        " on %d/%d, data=%p\n", h->ep_array[r].events,
@@ -763,7 +1006,7 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
 again:
                n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges,  h->kq_array,
                                        h->fd_no, &tspec);
-               if (n==-1){
+               if (unlikely(n==-1)){
                        if (errno==EINTR) goto again; /* signal, ignore it */
                        else{
                                LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
@@ -778,18 +1021,30 @@ again:
                                        r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
                                        h->kq_array[r].flags);
 #endif
-                       if (h->kq_array[r].flags & EV_ERROR){
+#if 0
+                       if (unlikely(h->kq_array[r].flags & EV_ERROR)){
                                /* error in changes: we ignore it, it can be caused by
                                   trying to remove an already closed fd: race between
-                                  adding smething to the changes array, close() and
+                                  adding something to the changes array, close() and
                                   applying the changes */
                                LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
                                                        "fd %d: %s [%ld]\n", h->kq_array[r].ident,
                                                        strerror(h->kq_array[r].data),
                                                        (long)h->kq_array[r].data);
-                       }else{ /* READ/EOF */
+                       }else{ 
+#endif
                                fm=(struct fd_map*)h->kq_array[r].udata;
-                               while(fm->type && (handle_io(fm, -1)>0) && repeat);
+                               if (likely(h->kq_array[r].filter==EVFILT_READ)){
+                                       revents=POLLIN | 
+                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
+                                       while(fm->type && (fm->events & revents) && 
+                                                       (handle_io(fm, revents, -1)>0) && repeat);
+                               }else if (h->kq_array[r].filter==EVFILT_WRITE){
+                                       revents=POLLOUT | 
+                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
+                                       while(fm->type && (fm->events & revents) && 
+                                                       (handle_io(fm, revents, -1)>0) && repeat);
+                               }
                        }
                }
 error:
@@ -810,12 +1065,14 @@ inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
        int sigio_band;
        int sigio_fd;
        struct fd_map* fm;
+       int revents;
        
        
        ret=1; /* 1 event per call normally */
        ts.tv_sec=t;
        ts.tv_nsec=0;
-       if (!sigismember(&h->sset, h->signo) || !sigismember(&h->sset, SIGIO)){
+       if (unlikely(!sigismember(&h->sset, h->signo) ||
+                                       !sigismember(&h->sset, SIGIO))) {
                LOG(L_CRIT, "BUG: io_wait_loop_sigio_rt: the signal mask"
                                " is not properly set!\n");
                goto error;
@@ -823,7 +1080,7 @@ inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
 
 again:
        n=sigtimedwait(&h->sset, &siginfo, &ts);
-       if (n==-1){
+       if (unlikely(n==-1)){
                if (errno==EINTR) goto again; /* some other signal, ignore it */
                else if (errno==EAGAIN){ /* timeout */
                        ret=0;
@@ -834,7 +1091,7 @@ again:
                        goto error;
                }
        }
-       if (n!=SIGIO){
+       if (likely(n!=SIGIO)){
 #ifdef SIGINFO64_WORKARROUND
                /* on linux siginfo.si_band is defined as long in userspace
                 * and as int in kernel (< 2.6.5) => on 64 bits things will break!
@@ -853,7 +1110,7 @@ again:
                        sigio_band=siginfo.si_band;
                        sigio_fd=siginfo.si_fd;
                }
-               if (siginfo.si_code==SI_SIGIO){
+               if (unlikely(siginfo.si_code==SI_SIGIO)){
                        /* old style, we don't know the event (linux 2.2.?) */
                        LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: old style sigio"
                                        " interface\n");
@@ -861,7 +1118,7 @@ again:
                        /* we can have queued signals generated by fds not watched
                         * any more, or by fds in transition, to a child => ignore them*/
                        if (fm->type)
-                               handle_io(fm, -1);
+                               handle_io(fm, POLLIN|POLLOUT, -1);
                }else{
 #ifdef EXTRA_DEBUG
                        DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
@@ -874,13 +1131,17 @@ again:
                        /* on some errors (e.g. when receving TCP RST), sigio_band will
                         * be set to 0x08 (undocumented, no corresp. POLL_xx), so better
                         * catch all events --andrei */
-                       if (sigio_band/*&(POLL_IN|POLL_ERR|POLL_HUP)*/){
+                       if (likely(sigio_band)/*&(POLL_IN|POLL_ERR|POLL_HUP)*/){
                                fm=get_fd_map(h, sigio_fd);
+                               revents=(POLLIN & (!(sigio_band & POLL_IN)-1)) |
+                                               (POLLOUT & (!(sigio_band & POLL_OUT)-1)) |
+                                               (POLLERR & (!(sigio_band & POLL_ERR)-1)) |
+                                               (POLLHUP & (!(sigio_band & POLL_HUP)-1));
                                /* we can have queued signals generated by fds not watched
                                 * any more, or by fds in transition, to a child 
                                 * => ignore them */
-                               if (fm->type)
-                                       handle_io(fm, -1);
+                               if (fm->type && (fm->events & revents))
+                                       handle_io(fm, revents, -1);
                                else
                                        LOG(L_ERR, "WARNING: io_wait_loop_sigio_rt: ignoring event"
                                                        " %x on fd %d (fm->fd=%d, fm->data=%p)\n",
@@ -930,7 +1191,7 @@ inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
                dpoll.dp_fds=h->fd_array;
 again:
                ret=n=ioctl(h->dpoll_fd, DP_POLL, &dpoll);
-               if (n==-1){
+               if (unlikely(n==-1)){
                        if (errno==EINTR) goto again; /* signal, ignore it */
                        else{
                                LOG(L_ERR, "ERROR:io_wait_loop_devpoll: ioctl: %s [%d]\n",
@@ -946,7 +1207,8 @@ again:
                        }
                        /* POLLIN|POLLHUP just go through */
                        fm=get_fd_map(h, h->fd_array[r].fd);
-                       while(fm->type && (handle_io(fm, r) > 0) && repeat);
+                       while(fm->type && (fm->events & h->fd_array[r].revents) &&
+                                       (handle_io(fm, h->fd_array[r].revents, r) > 0) && repeat);
                }
 error:
        return ret;
index 426fc28..6c40f95 100644 (file)
@@ -1730,7 +1730,8 @@ inline static int handle_tcp_child(struct tcp_child* tcp_c, int fd_i)
                        /* must be after the de-ref*/
                        tcpconn->flags&=~(F_CONN_REMOVED|F_CONN_READER);
                        if (unlikely(
-                                       io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+                                       io_watch_add(&io_h, tcpconn->s, POLLIN,
+                                                                                               F_TCPCONN, tcpconn)<0)){
                                LOG(L_CRIT, "ERROR: tcp_main: handle_tcp_child: failed to add"
                                                " new socket to the fd list\n");
                                tcpconn->flags|=F_CONN_REMOVED;
@@ -1879,7 +1880,8 @@ inline static int handle_ser_child(struct process_table* p, int fd_i)
                                                                tcp_con_lifetime, t);
                        tcpconn->flags&=~F_CONN_REMOVED;
                        if (unlikely(
-                                       io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+                                       io_watch_add(&io_h, tcpconn->s, POLLIN,
+                                                                                               F_TCPCONN, tcpconn)<0)){
                                LOG(L_CRIT, "ERROR: tcp_main: handle_ser_child: failed to add"
                                                " new socket to the fd list\n");
                                tcpconn->flags|=F_CONN_REMOVED;
@@ -2036,7 +2038,8 @@ static inline int handle_new_connect(struct socket_info* si)
                local_timer_add(&tcp_main_ltimer, &tcpconn->timer, 
                                                                tcp_con_lifetime, get_ticks_raw());
                tcpconn->flags&=~F_CONN_REMOVED;
-               if (unlikely(io_watch_add(&io_h, tcpconn->s, F_TCPCONN, tcpconn)<0)){
+               if (unlikely(io_watch_add(&io_h, tcpconn->s, POLLIN, 
+                                                                                                       F_TCPCONN, tcpconn)<0)){
                        LOG(L_CRIT, "ERROR: tcp_main: handle_new_connect: failed to add"
                                                " new socket to the fd list\n");
                        tcpconn->flags|=F_CONN_REMOVED;
@@ -2128,7 +2131,7 @@ error:
  *         >0 on successfull read from the fd (when there might be more io
  *            queued -- the receive buffer might still be non-empty)
  */
-inline static int handle_io(struct fd_map* fm, int idx)
+inline static int handle_io(struct fd_map* fm, short events, int idx)
 {      
        int ret;
        
@@ -2310,7 +2313,7 @@ void tcp_main_loop()
        /* add all the sockets we listen on for connections */
        for (si=tcp_listen; si; si=si->next){
                if ((si->proto==PROTO_TCP) &&(si->socket!=-1)){
-                       if (io_watch_add(&io_h, si->socket, F_SOCKINFO, si)<0){
+                       if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
                                LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
                                                        "listen socket to the fd list\n");
                                goto error;
@@ -2323,7 +2326,7 @@ void tcp_main_loop()
        if (!tls_disable && tls_loaded()){
                for (si=tls_listen; si; si=si->next){
                        if ((si->proto==PROTO_TLS) && (si->socket!=-1)){
-                               if (io_watch_add(&io_h, si->socket, F_SOCKINFO, si)<0){
+                               if (io_watch_add(&io_h, si->socket, POLLIN, F_SOCKINFO, si)<0){
                                        LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
                                                        "tls listen socket to the fd list\n");
                                        goto error;
@@ -2339,7 +2342,7 @@ void tcp_main_loop()
         *  (get fd, new connection a.s.o) */
        for (r=1; r<process_no; r++){
                if (pt[r].unix_sock>0) /* we can't have 0, we never close it!*/
-                       if (io_watch_add(&io_h, pt[r].unix_sock, F_PROC, &pt[r])<0){
+                       if (io_watch_add(&io_h, pt[r].unix_sock, POLLIN,F_PROC, &pt[r])<0){
                                        LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
                                                        "process %d unix socket to the fd list\n", r);
                                        goto error;
@@ -2348,8 +2351,8 @@ void tcp_main_loop()
        /* add all the unix sokets used for communication with the tcp childs */
        for (r=0; r<tcp_children_no; r++){
                if (tcp_children[r].unix_sock>0)/*we can't have 0, we never close it!*/
-                       if (io_watch_add(&io_h, tcp_children[r].unix_sock, F_TCPCHILD,
-                                                       &tcp_children[r]) <0){
+                       if (io_watch_add(&io_h, tcp_children[r].unix_sock, POLLIN,
+                                                                       F_TCPCHILD, &tcp_children[r]) <0){
                                LOG(L_CRIT, "ERROR: tcp_main_loop: init: failed to add "
                                                "tcp child %d unix socket to the fd list\n", r);
                                goto error;
index 103709e..6653414 100644 (file)
@@ -56,6 +56,7 @@ void init_tcp_options()
        if (tcp_options.option){\
                WARN("tcp_options: tcp_" ##option \
                                "cannot be enabled (recompile needed)\n"); \
+               tcp_options.option=0; \
        }
 
 
@@ -64,6 +65,7 @@ void init_tcp_options()
        if (tcp_options.option){\
                WARN("tcp_options: tcp_" ##option \
                                "cannot be enabled (no OS support)\n"); \
+               tcp_options.option=0; \
        }
 
 
index 04cc483..7374afb 100644 (file)
@@ -718,7 +718,7 @@ static ticks_t tcpconn_read_timeout(ticks_t t, struct timer_ln* tl, void* data)
  *         >0 on successfull read from the fd (when there might be more io
  *            queued -- the receive buffer might still be non-empty)
  */
-inline static int handle_io(struct fd_map* fm, int idx)
+inline static int handle_io(struct fd_map* fm, short events, int idx)
 {      
        int ret;
        int n;
@@ -778,7 +778,7 @@ again:
                        timer_reinit(&con->timer);
                        local_timer_add(&tcp_reader_ltimer, &con->timer,
                                                                S_TO_TICKS(TCP_CHILD_TIMEOUT), t);
-                       if (unlikely(io_watch_add(&io_w, s, F_TCPCONN, con))<0){
+                       if (unlikely(io_watch_add(&io_w, s, POLLIN, F_TCPCONN, con))<0){
                                LOG(L_CRIT, "ERROR: tcp_receive: handle_io: failed to add"
                                                " new socket to the fd list\n");
                                tcpconn_listrm(tcp_conn_lst, con, c_next, c_prev);
@@ -845,7 +845,7 @@ void tcp_receive_loop(int unix_sock)
        if (init_local_timer(&tcp_reader_ltimer, get_ticks_raw())!=0)
                goto error;
        /* add the unix socket */
-       if (io_watch_add(&io_w, tcpmain_sock, F_TCPMAIN, 0)<0){
+       if (io_watch_add(&io_w, tcpmain_sock, POLLIN,  F_TCPMAIN, 0)<0){
                LOG(L_CRIT, "ERROR: tcp_receive_loop: init: failed to add socket "
                                                        " to the fd list\n");
                goto error;