+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+ ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+ &reply_pages, &reply_len);
+ if (ret && ret != -ETIMEDOUT) {
+ rbd_warn(rbd_dev, "failed to request lock: %d", ret);
+ goto out;
+ }
+
+ if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+ void *p = page_address(reply_pages[0]);
+ void *const end = p + reply_len;
+ u32 n;
+
+ ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+ while (n--) {
+ u8 struct_v;
+ u32 len;
+
+ ceph_decode_need(&p, end, 8 + 8, e_inval);
+ p += 8 + 8; /* skip gid and cookie */
+
+ ceph_decode_32_safe(&p, end, len, e_inval);
+ if (!len)
+ continue;
+
+ if (lock_owner_responded) {
+ rbd_warn(rbd_dev,
+ "duplicate lock owners detected");
+ ret = -EIO;
+ goto out;
+ }
+
+ lock_owner_responded = true;
+ ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+ &struct_v, &len);
+ if (ret) {
+ rbd_warn(rbd_dev,
+ "failed to decode ResponseMessage: %d",
+ ret);
+ goto e_inval;
+ }
+
+ ret = ceph_decode_32(&p);
+ }
+ }
+
+ if (!lock_owner_responded) {
+ rbd_warn(rbd_dev, "no lock owners detected");
+ ret = -ETIMEDOUT;
+ }
+
+out:
+ ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+ return ret;
+
+e_inval:
+ ret = -EINVAL;
+ goto out;
+}
+
+static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+{
+ dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+
+ cancel_delayed_work(&rbd_dev->lock_dwork);
+ if (wake_all)
+ wake_up_all(&rbd_dev->lock_waitq);
+ else
+ wake_up(&rbd_dev->lock_waitq);
+}
+
+static int get_lock_owner_info(struct rbd_device *rbd_dev,
+ struct ceph_locker **lockers, u32 *num_lockers)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ u8 lock_type;
+ char *lock_tag;
+ int ret;
+
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+ ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+ &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ &lock_type, &lock_tag, lockers, num_lockers);
+ if (ret)
+ return ret;
+
+ if (*num_lockers == 0) {
+ dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+ goto out;
+ }
+
+ if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+ rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+ lock_tag);
+ ret = -EBUSY;
+ goto out;
+ }
+
+ if (lock_type == CEPH_CLS_LOCK_SHARED) {
+ rbd_warn(rbd_dev, "shared lock type detected");
+ ret = -EBUSY;
+ goto out;
+ }
+
+ if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+ strlen(RBD_LOCK_COOKIE_PREFIX))) {
+ rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+ (*lockers)[0].id.cookie);
+ ret = -EBUSY;
+ goto out;
+ }
+
+out:
+ kfree(lock_tag);
+ return ret;
+}
+
+static int find_watcher(struct rbd_device *rbd_dev,
+ const struct ceph_locker *locker)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ struct ceph_watch_item *watchers;
+ u32 num_watchers;
+ u64 cookie;
+ int i;
+ int ret;
+
+ ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+ &rbd_dev->header_oloc, &watchers,
+ &num_watchers);
+ if (ret)
+ return ret;
+
+ sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+ for (i = 0; i < num_watchers; i++) {
+ if (!memcmp(&watchers[i].addr, &locker->info.addr,
+ sizeof(locker->info.addr)) &&
+ watchers[i].cookie == cookie) {
+ struct rbd_client_id cid = {
+ .gid = le64_to_cpu(watchers[i].name.num),
+ .handle = cookie,
+ };
+
+ dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+ rbd_dev, cid.gid, cid.handle);
+ rbd_set_owner_cid(rbd_dev, &cid);
+ ret = 1;
+ goto out;
+ }
+ }
+
+ dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+ ret = 0;
+out:
+ kfree(watchers);
+ return ret;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_try_lock(struct rbd_device *rbd_dev)
+{
+ struct ceph_client *client = rbd_dev->rbd_client->client;
+ struct ceph_locker *lockers;
+ u32 num_lockers;
+ int ret;
+
+ for (;;) {
+ ret = rbd_lock(rbd_dev);
+ if (ret != -EBUSY)
+ return ret;
+
+ /* determine if the current lock holder is still alive */
+ ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+ if (ret)
+ return ret;
+
+ if (num_lockers == 0)
+ goto again;
+
+ ret = find_watcher(rbd_dev, lockers);
+ if (ret) {
+ if (ret > 0)
+ ret = 0; /* have to request lock */
+ goto out;
+ }
+
+ rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+ ENTITY_NAME(lockers[0].id.name));
+
+ ret = ceph_monc_blacklist_add(&client->monc,
+ &lockers[0].info.addr);
+ if (ret) {
+ rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+ ENTITY_NAME(lockers[0].id.name), ret);
+ goto out;
+ }
+
+ ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+ &rbd_dev->header_oloc, RBD_LOCK_NAME,
+ lockers[0].id.cookie,
+ &lockers[0].id.name);
+ if (ret && ret != -ENOENT)
+ goto out;
+
+again:
+ ceph_free_lockers(lockers, num_lockers);
+ }
+
+out:
+ ceph_free_lockers(lockers, num_lockers);
+ return ret;
+}
+
+/*
+ * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ */
+static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+ int *pret)
+{
+ enum rbd_lock_state lock_state;
+
+ down_read(&rbd_dev->lock_rwsem);
+ dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+ rbd_dev->lock_state);
+ if (__rbd_is_lock_owner(rbd_dev)) {
+ lock_state = rbd_dev->lock_state;
+ up_read(&rbd_dev->lock_rwsem);
+ return lock_state;
+ }
+
+ up_read(&rbd_dev->lock_rwsem);
+ down_write(&rbd_dev->lock_rwsem);
+ dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+ rbd_dev->lock_state);
+ if (!__rbd_is_lock_owner(rbd_dev)) {
+ *pret = rbd_try_lock(rbd_dev);
+ if (*pret)
+ rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+ }
+
+ lock_state = rbd_dev->lock_state;
+ up_write(&rbd_dev->lock_rwsem);
+ return lock_state;
+}
+
+static void rbd_acquire_lock(struct work_struct *work)
+{
+ struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+ struct rbd_device, lock_dwork);
+ enum rbd_lock_state lock_state;
+ int ret;
+
+ dout("%s rbd_dev %p\n", __func__, rbd_dev);
+again:
+ lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+ if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+ if (lock_state == RBD_LOCK_STATE_LOCKED)
+ wake_requests(rbd_dev, true);
+ dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+ rbd_dev, lock_state, ret);
+ return;
+ }
+
+ ret = rbd_request_lock(rbd_dev);
+ if (ret == -ETIMEDOUT) {
+ goto again; /* treat this as a dead client */
+ } else if (ret < 0) {
+ rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+ mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+ RBD_RETRY_DELAY);
+ } else {
+ /*
+ * lock owner acked, but resend if we don't see them
+ * release the lock
+ */
+ dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+ rbd_dev);
+ mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+ msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+ }
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static bool rbd_release_lock(struct rbd_device *rbd_dev)
+{
+ dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+ rbd_dev->lock_state);
+ if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+ return false;
+
+ rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+ downgrade_write(&rbd_dev->lock_rwsem);