{
        ssize_t transferred = 0;
 
+       /*
+        * AIO submission can race with bio completion to get here while
+        * expecting to have the last io completed by bio completion.
+        * In that case -EIOCBQUEUED is in fact not an error we want
+        * to preserve through this call.
+        */
+       if (ret == -EIOCBQUEUED)
+               ret = 0;
+
        if (dio->result) {
                transferred = dio->result;
 
        return ret;
 }
 
-/*
- * Called when a BIO has been processed.  If the count goes to zero then IO is
- * complete and we can signal this to the AIO layer.
- */
-static void dio_complete_aio(struct dio *dio)
-{
-       int ret;
-
-       ret = dio_complete(dio, dio->iocb->ki_pos, 0);
-
-       /* Complete AIO later if falling back to buffered i/o */
-       if (dio->result == dio->size ||
-               ((dio->rw == READ) && dio->result)) {
-               aio_complete(dio->iocb, ret, 0);
-               kfree(dio);
-       }
-}
-
 static int dio_bio_complete(struct dio *dio, struct bio *bio);
 /*
  * Asynchronous IO callback. 
        if (remaining == 1 && waiter_holds_ref)
                wake_up_process(dio->waiter);
 
-       if (remaining == 0)
-               dio_complete_aio(dio);
+       if (remaining == 0) {
+               int ret = dio_complete(dio, dio->iocb->ki_pos, 0);
+               aio_complete(dio->iocb, ret, 0);
+               kfree(dio);
+       }
 
        return 0;
 }
                mutex_unlock(&dio->inode->i_mutex);
 
        /*
-        * OK, all BIOs are submitted, so we can decrement bio_count to truly
-        * reflect the number of to-be-processed BIOs.
+        * The only time we want to leave bios in flight is when a successful
+        * partial aio read or full aio write have been setup.  In that case
+        * bio completion will call aio_complete.  The only time it's safe to
+        * call aio_complete is when we return -EIOCBQUEUED, so we key on that.
+        * This had *better* be the only place that raises -EIOCBQUEUED.
         */
-       if (dio->is_async) {
-               int should_wait = 0;
-
-               if (dio->result < dio->size && (rw & WRITE)) {
-                       dio->waiter = current;
-                       should_wait = 1;
-               }
-               if (ret == 0)
-                       ret = dio->result;
-
-               if (should_wait)
-                       dio_await_completion(dio);
-
-               /* this can free the dio */
-               if (atomic_dec_and_test(&dio->refcount))
-                       dio_complete_aio(dio);
+       BUG_ON(ret == -EIOCBQUEUED);
+       if (dio->is_async && ret == 0 && dio->result &&
+           ((rw & READ) || (dio->result == dio->size)))
+               ret = -EIOCBQUEUED;
 
-               if (should_wait)
-                       kfree(dio);
-       } else {
+       if (ret != -EIOCBQUEUED)
                dio_await_completion(dio);
 
+       /*
+        * Sync will always be dropping the final ref and completing the
+        * operation.  AIO can if it was a broken operation described above
+        * or in fact if all the bios race to complete before we get here.
+        * In that case dio_complete() translates the EIOCBQUEUED into
+        * the proper return code that the caller will hand to aio_complete().
+        */
+       if (atomic_dec_and_test(&dio->refcount)) {
                ret = dio_complete(dio, offset, ret);
+               kfree(dio);
+       } else
+               BUG_ON(ret != -EIOCBQUEUED);
 
-               /* We could have also come here on an AIO file extend */
-               if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
-                   ret >= 0 && dio->result == dio->size)
-                       /*
-                        * For AIO writes where we have completed the
-                        * i/o, we have to mark the the aio complete.
-                        */
-                       aio_complete(iocb, ret, 0);
-
-               if (atomic_dec_and_test(&dio->refcount))
-                       kfree(dio);
-               else
-                       BUG();
-       }
        return ret;
 }
 
 
                if (pos < size) {
                        retval = generic_file_direct_IO(READ, iocb,
                                                iov, pos, nr_segs);
-                       if (retval > 0 && !is_sync_kiocb(iocb))
-                               retval = -EIOCBQUEUED;
                        if (retval > 0)
                                *ppos = pos + retval;
                }
         * Sync the fs metadata but not the minor inode changes and
         * of course not the data as we did direct DMA for the IO.
         * i_mutex is held, which protects generic_osync_inode() from
-        * livelocking.
+        * livelocking.  AIO O_DIRECT ops attempt to sync metadata here.
         */
-       if (written >= 0 && ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
+       if ((written >= 0 || written == -EIOCBQUEUED) &&
+           ((file->f_flags & O_SYNC) || IS_SYNC(inode))) {
                int err = generic_osync_inode(inode, mapping, OSYNC_METADATA);
                if (err < 0)
                        written = err;
        }
-       if (written == count && !is_sync_kiocb(iocb))
-               written = -EIOCBQUEUED;
        return written;
 }
 EXPORT_SYMBOL(generic_file_direct_write);