parser/sdp: more suggestive debug message
[sip-router] / io_wait.h
index 0487e59..b7ae925 100644 (file)
--- a/io_wait.h
+++ b/io_wait.h
@@ -1,6 +1,6 @@
-/* 
+/*
  * $Id$
- * 
+ *
  * Copyright (C) 2005 iptelorg GmbH
  *
  * Permission to use, copy, modify, and distribute this software for any
@@ -31,9 +31,9 @@
  *                 this assumption)
  *     local_malloc (defaults to pkg_malloc)
  *     local_free   (defaults to pkg_free)
- *  
+ *
  */
-/* 
+/*
  * History:
  * --------
  *  2005-06-13  created by andrei
@@ -43,6 +43,9 @@
  *  2007-11-22  when handle_io() is called in a loop check & stop if the fd was
  *               removed inside handle_io() (andrei)
  *  2007-11-29  support for write (POLLOUT); added io_watch_chg() (andrei)
+ *  2008-02-04  POLLRDHUP & EPOLLRDHUP support (automatically enabled if POLLIN
+ *               is set) (andrei)
+ *  2010-06-17  re-enabled & enhanced the EV_ERROR for kqueue (andrei)
  */
 
 
 #include <sys/socket.h> /* recv */
 #include <signal.h> /* sigprocmask, sigwait a.s.o */
 #endif
+
+#define _GNU_SOURCE  /* for POLLRDHUP on linux */
+#include <sys/poll.h>
+#include <fcntl.h>
+
 #ifdef HAVE_EPOLL
 #include <sys/epoll.h>
 #endif
 #endif
 #ifdef HAVE_SELECT
 /* needed on openbsd for select*/
-#include <sys/time.h> 
-#include <sys/types.h> 
+#include <sys/time.h>
+#include <sys/types.h>
 #include <unistd.h>
 /* needed according to POSIX for select*/
 #include <sys/select.h>
 #endif
-#include <sys/poll.h>
-#include <fcntl.h>
 
 #include "dprint.h"
 
 #include "pt.h" /* mypid() */
 #endif
 
+#include "compiler_opt.h"
+
+
+#ifdef HAVE_EPOLL
+/* fix defines for EPOLL */
+#if defined POLLRDHUP && ! defined EPOLLRDHUP
+#define EPOLLRDHUP POLLRDHUP  /* should work on all linuxes */
+#endif /* POLLRDHUP && EPOLLRDHUP */
+#endif /* HAVE_EPOLL */
+
 
 extern int _os_ver; /* os version number, needed to select bugs workarrounds */
 
 
 #if 0
 enum fd_types; /* this should be defined from the including file,
-                                 see tcp_main.c for an example, 
+                                 see tcp_main.c for an example,
                                  0 has a special meaning: not used/empty*/
 #endif
 
@@ -128,20 +144,36 @@ struct fd_map{
 
 /* handler structure */
 struct io_wait_handler{
+       enum poll_types poll_method;
+       int flags;
+       struct fd_map* fd_hash;
+       int fd_no; /*  current index used in fd_array and the passed size for
+                                  ep_array (for kq_array at least
+                                   max(twice the size, kq_changes_size) should be
+                                  be passed). */
+       int max_fd_no; /* maximum fd no, is also the size of fd_array,
+                                                      fd_hash  and ep_array*/
+       /* common stuff for POLL, SIGIO_RT and SELECT
+        * since poll support is always compiled => this will always be compiled */
+       struct pollfd* fd_array; /* used also by devpoll as devpoll array */
+       int crt_fd_array_idx; /*  crt idx for which handle_io is called
+                                                        (updated also by del -> internal optimization) */
+       /* end of common stuff */
 #ifdef HAVE_EPOLL
-       struct epoll_event* ep_array;
        int epfd; /* epoll ctrl fd */
+       struct epoll_event* ep_array;
 #endif
 #ifdef HAVE_SIGIO_RT
        sigset_t sset; /* signal mask for sigio & sigrtmin */
        int signo;     /* real time signal used */
 #endif
 #ifdef HAVE_KQUEUE
+       int kq_fd;
        struct kevent* kq_array;   /* used for the eventlist*/
        struct kevent* kq_changes; /* used for the changelist */
        size_t kq_nchanges;
+       size_t kq_array_size;   /* array size */
        size_t kq_changes_size; /* size of the changes array */
-       int kq_fd;
 #endif
 #ifdef HAVE_DEVPOLL
        int dpoll_fd;
@@ -151,15 +183,6 @@ struct io_wait_handler{
        fd_set master_wset; /* write set */
        int max_fd_select; /* maximum select used fd */
 #endif
-       /* common stuff for POLL, SIGIO_RT and SELECT
-        * since poll support is always compiled => this will always be compiled */
-       struct fd_map* fd_hash;
-       struct pollfd* fd_array;
-       int fd_no; /*  current index used in fd_array */
-       int max_fd_no; /* maximum fd no, is also the size of fd_array,
-                                                      fd_hash  and ep_array*/
-       enum poll_types poll_method;
-       int flags;
 };
 
 typedef struct io_wait_handler io_wait_h;
@@ -199,7 +222,7 @@ static inline struct fd_map* hash_fd_map(   io_wait_h* h,
  *          events - combinations of POLLIN, POLLOUT, POLLERR & POLLHUP
  *          idx    - index in the fd_array (or -1 if not known)
  * return: -1 on error
- *          0 on EAGAIN or when by some other way it is known that no more 
+ *          0 on EAGAIN or when by some other way it is known that no more
  *            io events are queued on the fd (the receive buffer is empty).
  *            Usefull to detect when there are no more io events queued for
  *            sigio_rt, epoll_et, kqueue.
@@ -223,10 +246,11 @@ int handle_io(struct fd_map* fm, short events, int idx);
  *       and EVFILT_WRITE, EV_ADD for the same fd).
  * returns: -1 on error, 0 on success
  */
-static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag, 
+static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
                                                                void* data)
 {
        int n;
+       int r;
        struct timespec tspec;
 
        if (h->kq_nchanges>=h->kq_changes_size){
@@ -237,11 +261,35 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
                tspec.tv_nsec=0;
 again:
                n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
-               if (n==-1){
-                       if (errno==EINTR) goto again;
-                       LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes "
-                                               " failed: %s [%d]\n", strerror(errno), errno);
-                       return -1;
+               if (unlikely(n == -1)){
+                       if (unlikely(errno == EINTR)) goto again;
+                       else {
+                               /* for a detailed explanation of what follows see
+                                  io_wait_loop_kqueue EV_ERROR case */
+                               if (unlikely(!(errno == EBADF || errno == ENOENT)))
+                                       BUG("kq_ev_change: kevent flush changes failed"
+                                                       " (unexpected error): %s [%d]\n",
+                                                       strerror(errno), errno);
+                                       /* ignore error even if it's not a EBADF/ENOENT */
+                               /* one of the file descriptors is bad, probably already
+                                  closed => try to apply changes one-by-one */
+                               for (r = 0; r < h->kq_nchanges; r++) {
+retry2:
+                                       n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
+                                       if (n==-1) {
+                                               if (unlikely(errno == EINTR))
+                                                       goto retry2;
+                                       /* for a detailed explanation of what follows see
+                                               io_wait_loop_kqueue EV_ERROR case */
+                                               if (unlikely(!(errno == EBADF || errno == ENOENT)))
+                                                       BUG("kq_ev_change: kevent flush changes failed:"
+                                                                       " (unexpected error) %s [%d] (%d/%lu)\n",
+                                                                               strerror(errno), errno,
+                                                                               r, (unsigned long)h->kq_nchanges);
+                                               continue; /* skip over it */
+                                       }
+                               }
+                       }
                }
                h->kq_nchanges=0; /* changes array is empty */
        }
@@ -312,6 +360,8 @@ inline static int io_watch_add(     io_wait_h* h,
 #endif
 #if defined(HAVE_SIGIO_RT) || defined (HAVE_EPOLL)
        int n;
+#endif
+#if defined(HAVE_SIGIO_RT)
        int idx;
        int check_io;
        struct pollfd pf;
@@ -342,8 +392,9 @@ inline static int io_watch_add(     io_wait_h* h,
        e=get_fd_map(h, fd);
        if (unlikely(e && (e->type!=0 /*F_NONE*/))){
                LOG(L_ERR, "ERROR: io_watch_add: trying to overwrite entry %d"
-                               " in the hash(%d, %d, %p) with (%d, %d, %p)\n",
-                               fd, e->fd, e->type, e->data, fd, type, data);
+                               " watched for %x in the hash(%d, %d, %p) with (%d, %d, %p)\n",
+                               fd, events, e->fd, e->type, e->data, fd, type, data);
+               e=0;
                goto error;
        }
        
@@ -353,6 +404,10 @@ inline static int io_watch_add(    io_wait_h* h,
        }
        switch(h->poll_method){ /* faster then pointer to functions */
                case POLL_POLL:
+#ifdef POLLRDHUP
+                       /* listen to POLLRDHUP by default (if POLLIN) */
+                       events|=((int)!(events & POLLIN) - 1) & POLLRDHUP;
+#endif /* POLLRDHUP */
                        fd_array_setup(events);
                        set_fd_flags(O_NONBLOCK);
                        break;
@@ -369,7 +424,7 @@ inline static int io_watch_add(     io_wait_h* h,
 #ifdef HAVE_SIGIO_RT
                case POLL_SIGIO_RT:
                        fd_array_setup(events);
-                       /* re-set O_ASYNC might be needed, if not done from 
+                       /* re-set O_ASYNC might be needed, if not done from
                         * io_watch_del (or if somebody wants to add a fd which has
                         * already O_ASYNC/F_SETSIG set on a duplicate)
                         */
@@ -402,8 +457,14 @@ inline static int io_watch_add(    io_wait_h* h,
 #endif
 #ifdef HAVE_EPOLL
                case POLL_EPOLL_LT:
-                       ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
-                                                        (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
+                       ep_event.events=
+#ifdef POLLRDHUP
+                                               /* listen for EPOLLRDHUP too */
+                                               ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
+#else /* POLLRDHUP */
+                                               (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+#endif /* POLLRDHUP */
+                                               (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
                        ep_event.data.ptr=e;
 again1:
                        n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
@@ -416,9 +477,15 @@ again1:
                        break;
                case POLL_EPOLL_ET:
                        set_fd_flags(O_NONBLOCK);
-                       ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) )  |
-                                                        (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
-                                                         EPOLLET;
+                       ep_event.events=
+#ifdef POLLRDHUP
+                                               /* listen for EPOLLRDHUP too */
+                                               ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
+#else /* POLLRDHUP */
+                                               (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+#endif /* POLLRDHUP */
+                                               (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
+                                               EPOLLET;
                        ep_event.data.ptr=e;
 again2:
                        n=epoll_ctl(h->epfd, EPOLL_CTL_ADD, fd, &ep_event);
@@ -428,19 +495,22 @@ again2:
                                        strerror(errno), errno);
                                goto error;
                        }
-                       idx=-1;
-                       check_io=1;
                        break;
 #endif
 #ifdef HAVE_KQUEUE
                case POLL_KQUEUE:
-                       if (likely( events & POLLINT)){
+                       if (likely( events & POLLIN)){
                                if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e)==-1))
-                               goto error;
+                                       goto error;
                        }
                        if (unlikely( events & POLLOUT)){
                                if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
-                               goto error;
+                               {
+                                       if (likely(events & POLLIN)){
+                                               kq_ev_change(h, fd, EVFILT_READ, EV_DELETE, 0);
+                                       }
+                                       goto error;
+                               }
                        }
                        break;
 #endif
@@ -468,15 +538,16 @@ again_devpoll:
        
        h->fd_no++; /* "activate" changes, for epoll/kqueue/devpoll it
                                   has only informative value */
-#if defined(HAVE_SIGIO_RT) || defined (HAVE_EPOLL)
+#if defined(HAVE_SIGIO_RT)
        if (check_io){
                /* handle possible pre-existing events */
                pf.fd=fd;
                pf.events=events;
 check_io_again:
-               while(e->type && ((n=poll(&pf, 1, 0))>0) && 
+               n=0;
+               while(e->type && ((n=poll(&pf, 1, 0))>0) &&
                                (handle_io(e, pf.revents, idx)>0) &&
-                               (pf.revents & e->events));
+                               (pf.revents & (e->events|POLLERR|POLLHUP)));
                if (unlikely(e->type && (n==-1))){
                        if (errno==EINTR) goto check_io_again;
                        LOG(L_ERR, "ERROR: io_watch_add: check_io poll: %s [%d]\n",
@@ -489,20 +560,20 @@ error:
        if (e) unhash_fd_map(e);
        return -1;
 #undef fd_array_setup
-#undef set_fd_flags 
+#undef set_fd_flags
 }
 
 
 
 #define IO_FD_CLOSING 16
-/* parameters:    h - handler 
+/* parameters:    h - handler
  *               fd - file descriptor
  *            index - index in the fd_array if known, -1 if not
  *                    (if index==-1 fd_array will be searched for the
- *                     corresponding fd* entry -- slower but unavoidable in 
+ *                     corresponding fd* entry -- slower but unavoidable in
  *                     some cases). index is not used (no fd_array) for epoll,
  *                     /dev/poll and kqueue
- *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was 
+ *            flags - optimization flags, e.g. IO_FD_CLOSING, the fd was
  *                    or will shortly be closed, in some cases we can avoid
  *                    extra remove operations (e.g.: epoll, kqueue, sigio)
  * returns 0 if ok, -1 on error */
@@ -519,6 +590,8 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                        if (likely(idx<h->fd_no)){ \
                                memmove(&h->fd_array[idx], &h->fd_array[idx+1], \
                                        (h->fd_no-(idx+1))*sizeof(*(h->fd_array))); \
+                               if ((idx<=h->crt_fd_array_idx) && (h->crt_fd_array_idx>=0)) \
+                                       h->crt_fd_array_idx--; \
                        } \
        }while(0)
        
@@ -551,12 +624,11 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
        }
        if (unlikely(e->type==0 /*F_NONE*/)){
                LOG(L_ERR, "ERROR: io_watch_del: trying to delete already erased"
-                               " entry %d in the hash(%d, %d, %p) )\n",
-                               fd, e->fd, e->type, e->data);
+                               " entry %d in the hash(%d, %d, %p) flags %x)\n",
+                               fd, e->fd, e->type, e->data, flags);
                goto error;
        }
        events=e->events;
-       unhash_fd_map(e);
        
        switch(h->poll_method){
                case POLL_POLL:
@@ -570,13 +642,12 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                                FD_CLR(fd, &h->master_wset);
                        if (unlikely(h->max_fd_select && (h->max_fd_select==fd)))
                                /* we don't know the prev. max, so we just decrement it */
-                               h->max_fd_select--; 
+                               h->max_fd_select--;
                        fix_fd_array;
                        break;
 #endif
 #ifdef HAVE_SIGIO_RT
                case POLL_SIGIO_RT:
-                       fix_fd_array;
                        /* the O_ASYNC flag must be reset all the time, the fd
                         *  can be changed only if  O_ASYNC is reset (if not and
                         *  the fd is a duplicate, you will get signals from the dup. fd
@@ -585,17 +656,18 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                         */
                        /*if (!(flags & IO_FD_CLOSING)){*/
                                /* reset ASYNC */
-                               fd_flags=fcntl(fd, F_GETFL); 
-                               if (unlikely(fd_flags==-1)){ 
-                                       LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:" 
-                                                       " %s [%d]\n", strerror(errno), errno); 
-                                       goto error; 
-                               } 
-                               if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){ 
-                                       LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL" 
-                                                               " failed: %s [%d]\n", strerror(errno), errno); 
-                                       goto error; 
-                               } 
+                               fd_flags=fcntl(fd, F_GETFL);
+                               if (unlikely(fd_flags==-1)){
+                                       LOG(L_ERR, "ERROR: io_watch_del: fnctl: GETFL failed:"
+                                                       " %s [%d]\n", strerror(errno), errno);
+                                       goto error;
+                               }
+                               if (unlikely(fcntl(fd, F_SETFL, fd_flags&(~O_ASYNC))==-1)){
+                                       LOG(L_ERR, "ERROR: io_watch_del: fnctl: SETFL"
+                                                               " failed: %s [%d]\n", strerror(errno), errno);
+                                       goto error;
+                               }
+                       fix_fd_array; /* only on success */
                        break;
 #endif
 #ifdef HAVE_EPOLL
@@ -604,7 +676,7 @@ inline static int io_watch_del(io_wait_h* h, int fd, int idx, int flags)
                        /* epoll doesn't seem to automatically remove sockets,
                         * if the socket is a duplicate/moved and the original
                         * is still open. The fd is removed from the epoll set
-                        * only when the original (and all the  copies?) is/are 
+                        * only when the original (and all the  copies?) is/are
                         * closed. This is probably a bug in epoll. --andrei */
 #ifdef EPOLL_NO_CLOSE_BUG
                        if (!(flags & IO_FD_CLOSING)){
@@ -654,7 +726,7 @@ again_devpoll:
                                if (write(h->dpoll_fd, &pfd, sizeof(pfd))==-1){
                                        if (errno==EINTR) goto again_devpoll;
                                        LOG(L_ERR, "ERROR: io_watch_del: removing fd from "
-                                                               "/dev/poll failed: %s [%d]\n", 
+                                                               "/dev/poll failed: %s [%d]\n",
                                                                strerror(errno), errno);
                                        goto error;
                                }
@@ -662,10 +734,11 @@ again_devpoll:
 #endif
                default:
                        LOG(L_CRIT, "BUG: io_watch_del: no support for poll method "
-                                       " %s (%d)\n", poll_method_str[h->poll_method], 
+                                       " %s (%d)\n", poll_method_str[h->poll_method],
                                        h->poll_method);
                        goto error;
        }
+       unhash_fd_map(e); /* only on success */
        h->fd_no--;
        return 0;
 error:
@@ -675,12 +748,12 @@ error:
 
 
 
-/* parameters:    h - handler 
+/* parameters:    h - handler
  *               fd - file descriptor
  *           events - new events to watch for
  *              idx - index in the fd_array if known, -1 if not
  *                    (if index==-1 fd_array will be searched for the
- *                     corresponding fd* entry -- slower but unavoidable in 
+ *                     corresponding fd* entry -- slower but unavoidable in
  *                     some cases). index is not used (no fd_array) for epoll,
  *                     /dev/poll and kqueue
  * returns 0 if ok, -1 on error */
@@ -702,13 +775,13 @@ inline static int io_watch_chg(io_wait_h* h, int fd, short events, int idx )
        struct fd_map* e;
        int add_events;
        int del_events;
+#ifdef HAVE_DEVPOLL
+       struct pollfd pfd;
+#endif
 #ifdef HAVE_EPOLL
        int n;
        struct epoll_event ep_event;
 #endif
-#ifdef HAVE_DEVPOLL
-       struct pollfd pfd;
-#endif
        
        if (unlikely((fd<0) || (fd>=h->max_fd_no))){
                LOG(L_CRIT, "BUG: io_watch_chg: invalid fd %d, not in [0, %d) \n",
@@ -737,10 +810,16 @@ inline static int io_watch_chg(io_wait_h* h, int fd, short events, int idx )
        
        add_events=events & ~e->events;
        del_events=e->events & ~events;
-       e->events=events;
        switch(h->poll_method){
                case POLL_POLL:
+#ifdef POLLRDHUP
+                       fd_array_chg(events |
+                                                       /* listen to POLLRDHUP by default (if POLLIN) */
+                                                       (((int)!(events & POLLIN) - 1) & POLLRDHUP)
+                                               );
+#else /* POLLRDHUP */
                        fd_array_chg(events);
+#endif /* POLLRDHUP */
                        break;
 #ifdef HAVE_SELECT
                case POLL_SELECT:
@@ -758,12 +837,20 @@ inline static int io_watch_chg(io_wait_h* h, int fd, short events, int idx )
 #ifdef HAVE_SIGIO_RT
                case POLL_SIGIO_RT:
                        fd_array_chg(events);
+                       /* no need for check_io, since SIGIO_RT listens by default for all
+                        * the events */
                        break;
 #endif
 #ifdef HAVE_EPOLL
                case POLL_EPOLL_LT:
-                               ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
-                                                                (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
+                               ep_event.events=
+#ifdef POLLRDHUP
+                                               /* listen for EPOLLRDHUP too */
+                                               ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
+#else /* POLLRDHUP */
+                                               (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+#endif /* POLLRDHUP */
+                                               (EPOLLOUT & ((int)!(events & POLLOUT)-1) );
                                ep_event.data.ptr=e;
 again_epoll_lt:
                                n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
@@ -775,9 +862,15 @@ again_epoll_lt:
                                }
                        break;
                case POLL_EPOLL_ET:
-                               ep_event.events=(EPOLLIN & ((int)!(events & POLLIN)-1) ) |
-                                                                (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
-                                                                EPOLLET;
+                               ep_event.events=
+#ifdef POLLRDHUP
+                                               /* listen for EPOLLRDHUP too */
+                                               ((EPOLLIN|EPOLLRDHUP) & ((int)!(events & POLLIN)-1) ) |
+#else /* POLLRDHUP */
+                                               (EPOLLIN & ((int)!(events & POLLIN)-1) ) |
+#endif /* POLLRDHUP */
+                                               (EPOLLOUT & ((int)!(events & POLLOUT)-1) ) |
+                                               EPOLLET;
                                ep_event.data.ptr=e;
 again_epoll_et:
                                n=epoll_ctl(h->epfd, EPOLL_CTL_MOD, fd, &ep_event);
@@ -820,7 +913,7 @@ again_devpoll1:
                                if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
                                        if (errno==EINTR) goto again_devpoll1;
                                        LOG(L_ERR, "ERROR: io_watch_chg: removing fd from "
-                                                               "/dev/poll failed: %s [%d]\n", 
+                                                               "/dev/poll failed: %s [%d]\n",
                                                                strerror(errno), errno);
                                        goto error;
                                }
@@ -830,25 +923,30 @@ again_devpoll2:
                                if (unlikely(write(h->dpoll_fd, &pfd, sizeof(pfd))==-1)){
                                        if (errno==EINTR) goto again_devpoll2;
                                        LOG(L_ERR, "ERROR: io_watch_chg: re-adding fd to "
-                                                               "/dev/poll failed: %s [%d]\n", 
+                                                               "/dev/poll failed: %s [%d]\n",
                                                                strerror(errno), errno);
+                                       /* error re-adding the fd => mark it as removed/unhash */
+                                       unhash_fd_map(e);
                                        goto error;
                                }
                                break;
 #endif
                default:
                        LOG(L_CRIT, "BUG: io_watch_chg: no support for poll method "
-                                       " %s (%d)\n", poll_method_str[h->poll_method], 
+                                       " %s (%d)\n", poll_method_str[h->poll_method],
                                        h->poll_method);
                        goto error;
        }
+       e->events=events; /* only on success */
        return 0;
 error:
        return -1;
 #undef fix_fd_array
 }
 
-/* io_wait_loop_x style function 
+
+
+/* io_wait_loop_x style function.
  * wait for io using poll()
  * params: h      - io_wait handle
  *         t      - timeout in s
@@ -860,6 +958,7 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
        int n, r;
        int ret;
        struct fd_map* fm;
+       
 again:
                ret=n=poll(h->fd_array, h->fd_no, t*1000);
                if (n==-1){
@@ -884,13 +983,16 @@ again:
                                        h->fd_array[r].events=0; /* clear the events */
                                        continue;
                                }
+                               h->crt_fd_array_idx=r;
                                /* repeat handle_io if repeat, fd still watched (not deleted
                                 *  inside handle_io), handle_io returns that there's still
                                 *  IO and the fd is still watched for the triggering event */
-                               while(fm->type && 
+                               while(fm->type &&
                                                (handle_io(fm, h->fd_array[r].revents, r) > 0) &&
-                                               repeat &&
-                                               (fm->events & h->fd_array[r].revents) );
+                                               repeat && ((fm->events|POLLERR|POLLHUP) &
+                                                                                                       h->fd_array[r].revents));
+                               r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
+                                                                                 array shifting */
                        }
                }
 error:
@@ -925,16 +1027,19 @@ again:
                        /* continue */
                }
                /* use poll fd array */
-               for(r=0; (r<h->max_fd_no) && n; r++){
+               for(r=0; (r<h->fd_no) && n; r++){
                        revents=0;
                        if (likely(FD_ISSET(h->fd_array[r].fd, &sel_rset)))
                                revents|=POLLIN;
                        if (unlikely(FD_ISSET(h->fd_array[r].fd, &sel_wset)))
                                revents|=POLLOUT;
-                       if (likely(revents)){
+                       if (unlikely(revents)){
+                               h->crt_fd_array_idx=r;
                                fm=get_fd_map(h, h->fd_array[r].fd);
-                               while(fm->type && (fm->events & revents) && 
+                               while(fm->type && (fm->events & revents) &&
                                                (handle_io(fm, revents, r)>0) && repeat);
+                               r=h->crt_fd_array_idx; /* can change due to io_watch_del(fd)
+                                                                                 array shifting */
                                n--;
                        }
                };
@@ -957,7 +1062,7 @@ again:
                        if (errno==EINTR) goto again; /* signal, ignore it */
                        else{
                                LOG(L_ERR, "ERROR:io_wait_loop_epoll: "
-                                               "epoll_wait(%d, %p, %d, %d): %s [%d]\n", 
+                                               "epoll_wait(%d, %p, %d, %d): %s [%d]\n",
                                                h->epfd, h->ep_array, h->fd_no, t*1000,
                                                strerror(errno), errno);
                                goto error;
@@ -972,13 +1077,18 @@ again:
                }
 #endif
                for (r=0; r<n; r++){
-                       revents= (POLLIN & (!(h->ep_array[r].events & EPOLLIN)-1)) |
+                       revents= (POLLIN & (!(h->ep_array[r].events & (EPOLLIN|EPOLLPRI))
+                                               -1)) |
                                         (POLLOUT & (!(h->ep_array[r].events & EPOLLOUT)-1)) |
                                         (POLLERR & (!(h->ep_array[r].events & EPOLLERR)-1)) |
-                                        (POLLHUP & (!(h->ep_array[r].events & EPOLLHUP)-1));
+                                        (POLLHUP & (!(h->ep_array[r].events & EPOLLHUP)-1))
+#ifdef POLLRDHUP
+                                       | (POLLRDHUP & (!(h->ep_array[r].events & EPOLLRDHUP)-1))
+#endif
+                                       ;
                        if (likely(revents)){
                                fm=(struct fd_map*)h->ep_array[r].data.ptr;
-                               while(fm->type && (fm->events & revents) && 
+                               while(fm->type && ((fm->events|POLLERR|POLLHUP) & revents) &&
                                                (handle_io(fm, revents, -1)>0) && repeat);
                        }else{
                                LOG(L_ERR, "ERROR:io_wait_loop_epoll: unexpected event %x"
@@ -999,54 +1109,124 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
        int n, r;
        struct timespec tspec;
        struct fd_map* fm;
+       int orig_changes;
+       int apply_changes;
+       int revents;
        
        tspec.tv_sec=t;
        tspec.tv_nsec=0;
+       orig_changes=h->kq_nchanges;
+       apply_changes=orig_changes;
+       do {
 again:
-               n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges,  h->kq_array,
-                                       h->fd_no, &tspec);
+               n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
+                                       h->kq_array_size, &tspec);
                if (unlikely(n==-1)){
-                       if (errno==EINTR) goto again; /* signal, ignore it */
-                       else{
-                               LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
+                       if (unlikely(errno==EINTR)) goto again; /* signal, ignore it */
+                       else {
+                               /* for a detailed explanation of what follows see below
+                                  the EV_ERROR case */
+                               if (unlikely(!(errno==EBADF || errno==ENOENT)))
+                                       BUG("io_wait_loop_kqueue: kevent: unexpected error"
                                                " %s [%d]\n", strerror(errno), errno);
-                               goto error;
+                               /* some of the FDs in kq_changes are bad (already closed)
+                                  and there is not enough space in kq_array to return all
+                                  of them back */
+                               apply_changes = h->kq_array_size;
+                               goto again;
                        }
                }
-               h->kq_nchanges=0; /* reset changes array */
+               /* remove applied changes */
+               h->kq_nchanges -= apply_changes;
+               if (unlikely(apply_changes < orig_changes)) {
+                       orig_changes -= apply_changes;
+                       memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
+                                                                       sizeof(h->kq_changes[0])*h->kq_nchanges);
+                       apply_changes = (orig_changes < h->kq_array_size) ? orig_changes :
+                                                               h->kq_array_size;
+               } else {
+                       orig_changes = 0;
+                       apply_changes = 0;
+               }
                for (r=0; r<n; r++){
 #ifdef EXTRA_DEBUG
                        DBG("DBG: kqueue: event %d/%d: fd=%d, udata=%lx, flags=0x%x\n",
                                        r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
                                        h->kq_array[r].flags);
 #endif
-#if 0
-                       if (unlikely(h->kq_array[r].flags & EV_ERROR)){
-                               /* error in changes: we ignore it, it can be caused by
-                                  trying to remove an already closed fd: race between
-                                  adding something to the changes array, close() and
-                                  applying the changes */
-                               LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
-                                                       "fd %d: %s [%ld]\n", h->kq_array[r].ident,
+                       if (unlikely((h->kq_array[r].flags & EV_ERROR) ||
+                                                        h->kq_array[r].udata == 0)){
+                               /* error in changes: we ignore it if it has to do with a
+                                  bad fd or update==0. It can be caused by trying to remove an
+                                  already closed fd: race between adding something to the
+                                  changes array, close() and applying the changes (EBADF).
+                                  E.g. for ser tcp: tcp_main sends a fd to child for reading
+                                   => deletes it from the watched fds => the changes array
+                                       will contain an EV_DELETE for it. Before the changes
+                                       are applied (they are at the end of the main io_wait loop,
+                                       after all the fd events were processed), a CON_ERR sent
+                                       to tcp_main by a sender (send fail) is processed and causes
+                                       the fd to be closed. When the changes are applied =>
+                                       error for the EV_DELETE attempt of a closed fd.
+                                       Something similar can happen when a fd is scheduled
+                                       for removal, is close()'ed before being removed and
+                                       re-opened(a new sock. get the same fd). When the
+                                       watched fd changes will be applied the fd will be valid
+                                       (so no EBADF), but it's not already watch => ENOENT.
+                                       We report a BUG for the other errors (there's nothing
+                                       constructive we can do if we get an error we don't know
+                                       how to handle), but apart from that we ignore it in the
+                                       idea that it is better apply the rest of the changes,
+                                       rather then dropping all of them.
+                               */
+                               /*
+                                       example EV_ERROR for trying to delete a read watched fd,
+                                       that was already closed:
+                                       {
+                                               ident = 63,  [fd]
+                                               filter = -1, [EVFILT_READ]
+                                               flags = 16384, [EV_ERROR]
+                                               fflags = 0,
+                                               data = 9, [errno = EBADF]
+                                               udata = 0x0
+                                       }
+                               */
+                               if (h->kq_array[r].data != EBADF &&
+                                               h->kq_array[r].data != ENOENT)
+                                       BUG("io_wait_loop_kqueue: kevent unexpected error on "
+                                                       "fd %ld udata %lx: %s [%ld]\n",
+                                                       (long)h->kq_array[r].ident,
+                                                       (long)h->kq_array[r].udata,
                                                        strerror(h->kq_array[r].data),
                                                        (long)h->kq_array[r].data);
-                       }else{ 
-#endif
+                       }else{
                                fm=(struct fd_map*)h->kq_array[r].udata;
                                if (likely(h->kq_array[r].filter==EVFILT_READ)){
-                                       revents=POLLIN | 
-                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
-                                       while(fm->type && (fm->events & revents) && 
+                                       revents=POLLIN |
+                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
+                                               (((int)!((h->kq_array[r].flags & EV_EOF) &&
+                                                                       h->kq_array[r].fflags != 0) - 1)&POLLERR);
+                                       while(fm->type && (fm->events & revents) &&
                                                        (handle_io(fm, revents, -1)>0) && repeat);
                                }else if (h->kq_array[r].filter==EVFILT_WRITE){
-                                       revents=POLLOUT | 
-                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP);
-                                       while(fm->type && (fm->events & revents) && 
+                                       revents=POLLOUT |
+                                               (((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
+                                               (((int)!((h->kq_array[r].flags & EV_EOF) &&
+                                                                       h->kq_array[r].fflags != 0) - 1)&POLLERR);
+                                       while(fm->type && (fm->events & revents) &&
                                                        (handle_io(fm, revents, -1)>0) && repeat);
+                               }else{
+                                       BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
+                                                       "%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
+                                                       " data=%lx, udata=%lx\n",
+                                       r, n, (int)h->kq_array[r].ident, (int)h->kq_array[r].filter,
+                                       h->kq_array[r].flags, h->kq_array[r].fflags,
+                                       (unsigned long)h->kq_array[r].data,
+                                       (unsigned long)h->kq_array[r].udata);
                                }
                        }
                }
-error:
+       } while(unlikely(orig_changes));
        return n;
 }
 #endif
@@ -1065,6 +1245,9 @@ inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
        int sigio_fd;
        struct fd_map* fm;
        int revents;
+#ifdef SIGINFO64_WORKARROUND
+       int* pi;
+#endif
        
        
        ret=1; /* 1 event per call normally */
@@ -1076,7 +1259,6 @@ inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
                                " is not properly set!\n");
                goto error;
        }
-
 again:
        n=sigtimedwait(&h->sset, &siginfo, &ts);
        if (unlikely(n==-1)){
@@ -1101,8 +1283,9 @@ again:
                 *  On newer kernels this is fixed (si_band is long in the kernel too).
                 * -- andrei */
                if  ((_os_ver<0x020605) && (sizeof(siginfo.si_band)>sizeof(int))){
-                       sigio_band=*((int*)(void*)&siginfo.si_band);
-                       sigio_fd=*(((int*)(void*)&siginfo.si_band)+1);
+                       pi=(int*)(void*)&siginfo.si_band; /* avoid type punning warnings */
+                       sigio_band=*pi;
+                       sigio_fd=*(pi+1);
                }else
 #endif
                {
@@ -1119,39 +1302,51 @@ again:
                        if (fm->type)
                                handle_io(fm, POLLIN|POLLOUT, -1);
                }else{
+                       /* si_code contains the SIGPOLL reason: POLL_IN, POLL_OUT,
+                        *  POLL_MSG, POLL_ERR, POLL_PRI or POLL_HUP
+                        * and si_band the translated poll event bitmap:
+                        *  POLLIN|POLLRDNORM  (=POLL_IN),
+                        *  POLLOUT|POLLWRNORM|POLLWRBAND (=POLL_OUT),
+                        *  POLLIN|POLLRDNORM|POLLMSG (=POLL_MSG),
+                        *  POLLERR (=POLL_ERR),
+                        *  POLLPRI|POLLRDBAND (=POLL_PRI),
+                        *  POLLHUP|POLLERR (=POLL_HUP)
+                        *  [linux 2.6.22 fs/fcntl.c:447]
+                        */
 #ifdef EXTRA_DEBUG
                        DBG("io_wait_loop_sigio_rt: siginfo: signal=%d (%d),"
                                        " si_code=%d, si_band=0x%x,"
                                        " si_fd=%d\n",
-                                       siginfo.si_signo, n, siginfo.si_code, 
+                                       siginfo.si_signo, n, siginfo.si_code,
                                        (unsigned)sigio_band,
                                        sigio_fd);
 #endif
                        /* on some errors (e.g. when receving TCP RST), sigio_band will
-                        * be set to 0x08 (undocumented, no corresp. POLL_xx), so better
-                        * catch all events --andrei */
-                       if (likely(sigio_band)/*&(POLL_IN|POLL_ERR|POLL_HUP)*/){
+                        * be set to 0x08 (POLLERR) or 0x18 (POLLERR|POLLHUP - on stream
+                        *  unix socket close) , so better catch all events --andrei */
+                       if (likely(sigio_band)){
                                fm=get_fd_map(h, sigio_fd);
-                               revents=(POLLIN & (!(sigio_band & POLL_IN)-1)) |
-                                               (POLLOUT & (!(sigio_band & POLL_OUT)-1)) |
-                                               (POLLERR & (!(sigio_band & POLL_ERR)-1)) |
-                                               (POLLHUP & (!(sigio_band & POLL_HUP)-1));
+                               revents=sigio_band;
+                               /* fix revents==POLLPRI case */
+                               revents |= (!(revents & POLLPRI)-1) & POLLIN;
                                /* we can have queued signals generated by fds not watched
-                                * any more, or by fds in transition, to a child 
+                                * any more, or by fds in transition, to a child
                                 * => ignore them */
-                               if (fm->type && (fm->events & revents))
+                               if (fm->type && ((fm->events|POLLERR|POLLHUP) & revents))
                                        handle_io(fm, revents, -1);
                                else
-                                       LOG(L_ERR, "WARNING: io_wait_loop_sigio_rt: ignoring event"
-                                                       " %x on fd %d (fm->fd=%d, fm->data=%p)\n",
-                                                       sigio_band, sigio_fd, fm->fd, fm->data);
+                                       DBG("WARNING: io_wait_loop_sigio_rt: ignoring event"
+                                                       " %x on fd %d, watching for %x, si_code=%x "
+                                                       "(fm->type=%d, fm->fd=%d, fm->data=%p)\n",
+                                                       sigio_band, sigio_fd, fm->events, siginfo.si_code,
+                                                       fm->type, fm->fd, fm->data);
                        }else{
                                LOG(L_ERR, "ERROR: io_wait_loop_sigio_rt: unexpected event"
                                                        " on fd %d: %x\n", sigio_fd, sigio_band);
                        }
                }
        }else{
-               /* signal queue overflow 
+               /* signal queue overflow
                 * TODO: increase signal queue size: 2.4x /proc/.., 2.6x -rlimits */
                LOG(L_WARN, "WARNING: io_wait_loop_sigio_rt: signal queue overflowed"
                                        "- falling back to poll\n");