io_wait: fix kqueue and too many errors in changelist
authorAndrei Pelinescu-Onciul <andrei@iptel.org>
Thu, 17 Jun 2010 16:43:14 +0000 (18:43 +0200)
committerAndrei Pelinescu-Onciul <andrei@iptel.org>
Thu, 17 Jun 2010 16:49:03 +0000 (18:49 +0200)
kevent() tries to return errors in the changelist back in the
supplied eventlist array. However if this is not large enough, the
whole kevent() syscall will fail.
Now if kevent() fails with EBADF the call will be retried with a
smaller set of changes, until the entire original changelist is
applied.
Fixes also kq_ev_change() flush mode: on error it will try to
apply the changes one-by-one.

(this affects only systems that have kqueue: *bsd and darwin)

io_wait.h

index 44ef60c..93f1426 100644 (file)
--- a/io_wait.h
+++ b/io_wait.h
@@ -247,6 +247,7 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
                                                                void* data)
 {
        int n;
+       int r;
        struct timespec tspec;
 
        if (h->kq_nchanges>=h->kq_changes_size){
@@ -257,11 +258,36 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
                tspec.tv_nsec=0;
 again:
                n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
-               if (n==-1){
-                       if (errno==EINTR) goto again;
-                       LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes "
+               if (unlikely(n == -1)){
+                       if (likely(errno == EBADF)) {
+                               /* one of the file descriptors is bad, probably already
+                                  closed => try to apply changes one-by-one */
+                               for (r = 0; r < h->kq_nchanges; r++) {
+retry2:
+                                       n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
+                                       if (n==-1) {
+                                               if (errno == EBADF)
+                                                       continue; /* skip over it */
+                                               if (errno == EINTR)
+                                                       goto retry2;
+                                               LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
+                                                                       " failed: %s [%d]\n",
+                                                                               strerror(errno), errno);
+                                               /* shift the array */
+                                               memmove(&h->kq_changes[0], &h->kq_changes[r+1],
+                                                                       sizeof(h->kq_changes[0])*
+                                                                               (h->kq_nchanges-r-1));
+                                               h->kq_nchanges-=(r+1);
+                                               return -1;
+                                       }
+                               }
+                       } else if (errno == EINTR) goto again;
+                       else {
+                               LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
                                                " failed: %s [%d]\n", strerror(errno), errno);
-                       return -1;
+                               h->kq_nchanges=0; /* reset changes array */
+                               return -1;
+                       }
                }
                h->kq_nchanges=0; /* changes array is empty */
        }
@@ -1076,22 +1102,43 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
        int n, r;
        struct timespec tspec;
        struct fd_map* fm;
+       int orig_changes;
+       int apply_changes;
        int revents;
        
        tspec.tv_sec=t;
        tspec.tv_nsec=0;
+       orig_changes=h->kq_nchanges;
+       apply_changes=orig_changes;
+       do {
 again:
-               n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges,  h->kq_array,
+               n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
                                        h->fd_no, &tspec);
                if (unlikely(n==-1)){
                        if (errno==EINTR) goto again; /* signal, ignore it */
-                       else{
+                       else if (errno==EBADF) {
+                               /* some of the FDs in kq_changes are bad (already closed)
+                                  and there is not enough space in kq_array to return all
+                                  of them back */
+                               apply_changes = h->fd_no;
+                               goto again;
+                       }else{
                                LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
                                                " %s [%d]\n", strerror(errno), errno);
                                goto error;
                        }
                }
-               h->kq_nchanges=0; /* reset changes array */
+               /* remove applied changes */
+               h->kq_nchanges -= apply_changes;
+               if (unlikely(apply_changes < orig_changes)) {
+                       orig_changes -= apply_changes;
+                       memmove(&h->kq_changes[0], &h->kq_changes[apply_changes],
+                                                                       sizeof(h->kq_changes[0])*h->kq_nchanges);
+                       apply_changes = orig_changes<h->fd_no ? orig_changes : h->fd_no;
+               } else {
+                       orig_changes = 0;
+                       apply_changes = 0;
+               }
                for (r=0; r<n; r++){
 #ifdef EXTRA_DEBUG
                        DBG("DBG: kqueue: event %d/%d: fd=%d, udata=%lx, flags=0x%x\n",
@@ -1148,6 +1195,7 @@ again:
                                }
                        }
                }
+       } while(unlikely(orig_changes));
 error:
        return n;
 }