libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls
authorIlya Dryomov <idryomov@gmail.com>
Mon, 15 Oct 2018 14:11:37 +0000 (16:11 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 22 Oct 2018 08:28:22 +0000 (10:28 +0200)
The current requirement is that ceph_osdc_alloc_messages() should be
called after oid and oloc are known.  In preparation for preallocating
message data items, move ceph_osdc_alloc_messages() further down, so
that it is called when OSD op codes are known.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
drivers/block/rbd.c
fs/ceph/file.c
net/ceph/osd_client.c

index 9cc7ee3..8e5140b 100644 (file)
@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
                        rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
                goto err_req;
 
-       if (ceph_osdc_alloc_messages(req, GFP_NOIO))
-               goto err_req;
-
        return req;
 
 err_req:
@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
                }
                if (ret)
                        return ret;
+
+               ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+               if (ret)
+                       return ret;
        }
 
        return 0;
@@ -2404,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
                rbd_assert(0);
        }
 
+       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+
        rbd_obj_request_submit(obj_req);
        return 0;
 }
@@ -3783,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
-       if (ret)
-               goto out_req;
-
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
@@ -3797,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
                                         true);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+       if (ret)
+               goto out_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0)
index 0265f9a..0fa6b6b 100644 (file)
@@ -870,6 +870,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+       req->r_ops[0] = orig_req->r_ops[0];
+
+       req->r_mtime = aio_req->mtime;
+       req->r_data_offset = req->r_ops[0].extent.offset;
+
        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (ret) {
                ceph_osdc_put_request(req);
@@ -877,11 +882,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
                goto out;
        }
 
-       req->r_ops[0] = orig_req->r_ops[0];
-
-       req->r_mtime = aio_req->mtime;
-       req->r_data_offset = req->r_ops[0].extent.offset;
-
        ceph_osdc_put_request(orig_req);
 
        req->r_callback = ceph_aio_complete_req;
index a5fbb38..7ac7f21 100644 (file)
@@ -4483,12 +4483,6 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
 
        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
-
-       if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
-               ceph_osdc_put_request(req);
-               return NULL;
-       }
-
        return req;
 }
 
@@ -4506,6 +4500,12 @@ alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
         * filled in by linger_submit().
         */
        osd_req_op_watch_init(req, 0, 0, watch_opcode);
+
+       if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
+               ceph_osdc_put_request(req);
+               return NULL;
+       }
+
        return req;
 }
 
@@ -4656,12 +4656,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+                                        payload_len);
        if (ret)
                goto out_put_req;
 
-       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
-                                        payload_len);
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
        if (ret)
                goto out_put_req;
 
@@ -4766,6 +4766,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
 
+       ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+       if (ret)
+               goto out_put_lreq;
+
        linger_submit(lreq);
        ret = linger_reg_commit_wait(lreq);
        if (!ret)
@@ -4892,10 +4896,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
        pages = ceph_alloc_page_vector(1, GFP_NOIO);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
@@ -4907,6 +4907,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {
@@ -4969,10 +4973,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = flags;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
        ret = osd_req_op_cls_init(req, 0, class, method);
        if (ret)
                goto out_put_req;
@@ -4984,6 +4984,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
                osd_req_op_cls_response_data_pages(req, 0, &resp_page,
                                                   *resp_len, 0, false, false);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {