rbd: lock should be quiesced on reacquire
authorIlya Dryomov <idryomov@gmail.com>
Thu, 30 May 2019 09:15:23 +0000 (11:15 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 8 Jul 2019 12:01:45 +0000 (14:01 +0200)
Quiesce exclusive lock at the top of rbd_reacquire_lock() instead
of only when ceph_cls_set_cookie() fails.  This avoids a deadlock on
rbd_dev->lock_rwsem.

If rbd_dev->lock_rwsem is needed for I/O completion, set_cookie can
hang ceph-msgr worker thread if set_cookie reply ends up behind an I/O
reply, because, like lock and unlock requests, set_cookie is sent and
waited upon with rbd_dev->lock_rwsem held for write.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
drivers/block/rbd.c

index 99a0ebb..4ae1cdf 100644 (file)
@@ -3004,6 +3004,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
 {
        struct rbd_client_id cid = rbd_get_cid(rbd_dev);
 
+       rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
        strcpy(rbd_dev->lock_cookie, cookie);
        rbd_set_owner_cid(rbd_dev, &cid);
        queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
@@ -3028,7 +3029,6 @@ static int rbd_lock(struct rbd_device *rbd_dev)
        if (ret)
                return ret;
 
-       rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
        __rbd_lock(rbd_dev, cookie);
        return 0;
 }
@@ -3411,13 +3411,11 @@ again:
        }
 }
 
-/*
- * lock_rwsem must be held for write
- */
-static bool rbd_release_lock(struct rbd_device *rbd_dev)
+static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
 {
-       dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
-            rbd_dev->lock_state);
+       dout("%s rbd_dev %p\n", __func__, rbd_dev);
+       lockdep_assert_held_exclusive(&rbd_dev->lock_rwsem);
+
        if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
                return false;
 
@@ -3433,12 +3431,22 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
        up_read(&rbd_dev->lock_rwsem);
 
        down_write(&rbd_dev->lock_rwsem);
-       dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
-            rbd_dev->lock_state);
        if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
                return false;
 
+       return true;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_release_lock(struct rbd_device *rbd_dev)
+{
+       if (!rbd_quiesce_lock(rbd_dev))
+               return;
+
        rbd_unlock(rbd_dev);
+
        /*
         * Give others a chance to grab the lock - we would re-acquire
         * almost immediately if we got new IO during ceph_osdc_sync()
@@ -3447,7 +3455,6 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
         * after wake_requests() in rbd_handle_released_lock().
         */
        cancel_delayed_work(&rbd_dev->lock_dwork);
-       return true;
 }
 
 static void rbd_release_lock_work(struct work_struct *work)
@@ -3795,7 +3802,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
        char cookie[32];
        int ret;
 
-       WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+       if (!rbd_quiesce_lock(rbd_dev))
+               return;
 
        format_lock_cookie(rbd_dev, cookie);
        ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
@@ -3811,9 +3819,8 @@ static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
                 * Lock cookie cannot be updated on older OSDs, so do
                 * a manual release and queue an acquire.
                 */
-               if (rbd_release_lock(rbd_dev))
-                       queue_delayed_work(rbd_dev->task_wq,
-                                          &rbd_dev->lock_dwork, 0);
+               rbd_unlock(rbd_dev);
+               queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
        } else {
                __rbd_lock(rbd_dev, cookie);
        }