efcdde47127842af18be42b33cf6a8d212ad5c56
[linux-2.6-microblaze.git] / net / ceph / mon_client.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/module.h>
5 #include <linux/types.h>
6 #include <linux/slab.h>
7 #include <linux/random.h>
8 #include <linux/sched.h>
9
10 #include <linux/ceph/ceph_features.h>
11 #include <linux/ceph/mon_client.h>
12 #include <linux/ceph/libceph.h>
13 #include <linux/ceph/debugfs.h>
14 #include <linux/ceph/decode.h>
15 #include <linux/ceph/auth.h>
16
17 /*
18  * Interact with Ceph monitor cluster.  Handle requests for new map
19  * versions, and periodically resend as needed.  Also implement
20  * statfs() and umount().
21  *
22  * A small cluster of Ceph "monitors" are responsible for managing critical
23  * cluster configuration and state information.  An odd number (e.g., 3, 5)
24  * of cmon daemons use a modified version of the Paxos part-time parliament
25  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
26  * list of clients who have mounted the file system.
27  *
28  * We maintain an open, active session with a monitor at all times in order to
29  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
30  * TCP socket to ensure we detect a failure.  If the connection does break, we
31  * randomly hunt for a new monitor.  Once the connection is reestablished, we
32  * resend any outstanding requests.
33  */
34
35 static const struct ceph_connection_operations mon_con_ops;
36
37 static int __validate_auth(struct ceph_mon_client *monc);
38
39 /*
40  * Decode a monmap blob (e.g., during mount).
41  */
42 static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
43 {
44         struct ceph_monmap *m = NULL;
45         int i, err = -EINVAL;
46         struct ceph_fsid fsid;
47         u32 epoch, num_mon;
48         u32 len;
49
50         ceph_decode_32_safe(&p, end, len, bad);
51         ceph_decode_need(&p, end, len, bad);
52
53         dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
54         p += sizeof(u16);  /* skip version */
55
56         ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
57         ceph_decode_copy(&p, &fsid, sizeof(fsid));
58         epoch = ceph_decode_32(&p);
59
60         num_mon = ceph_decode_32(&p);
61
62         if (num_mon > CEPH_MAX_MON)
63                 goto bad;
64         m = kmalloc(struct_size(m, mon_inst, num_mon), GFP_NOFS);
65         if (m == NULL)
66                 return ERR_PTR(-ENOMEM);
67         m->fsid = fsid;
68         m->epoch = epoch;
69         m->num_mon = num_mon;
70         for (i = 0; i < num_mon; ++i) {
71                 struct ceph_entity_inst *inst = &m->mon_inst[i];
72
73                 /* copy name portion */
74                 ceph_decode_copy_safe(&p, end, &inst->name,
75                                         sizeof(inst->name), bad);
76                 err = ceph_decode_entity_addr(&p, end, &inst->addr);
77                 if (err)
78                         goto bad;
79         }
80         dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
81              m->num_mon);
82         for (i = 0; i < m->num_mon; i++)
83                 dout("monmap_decode  mon%d is %s\n", i,
84                      ceph_pr_addr(&m->mon_inst[i].addr));
85         return m;
86 bad:
87         dout("monmap_decode failed with %d\n", err);
88         kfree(m);
89         return ERR_PTR(err);
90 }
91
92 /*
93  * return true if *addr is included in the monmap.
94  */
95 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
96 {
97         int i;
98
99         for (i = 0; i < m->num_mon; i++)
100                 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
101                         return 1;
102         return 0;
103 }
104
105 /*
106  * Send an auth request.
107  */
108 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
109 {
110         monc->pending_auth = 1;
111         monc->m_auth->front.iov_len = len;
112         monc->m_auth->hdr.front_len = cpu_to_le32(len);
113         ceph_msg_revoke(monc->m_auth);
114         ceph_msg_get(monc->m_auth);  /* keep our ref */
115         ceph_con_send(&monc->con, monc->m_auth);
116 }
117
118 /*
119  * Close monitor session, if any.
120  */
121 static void __close_session(struct ceph_mon_client *monc)
122 {
123         dout("__close_session closing mon%d\n", monc->cur_mon);
124         ceph_msg_revoke(monc->m_auth);
125         ceph_msg_revoke_incoming(monc->m_auth_reply);
126         ceph_msg_revoke(monc->m_subscribe);
127         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
128         ceph_con_close(&monc->con);
129
130         monc->pending_auth = 0;
131         ceph_auth_reset(monc->auth);
132 }
133
134 /*
135  * Pick a new monitor at random and set cur_mon.  If we are repicking
136  * (i.e. cur_mon is already set), be sure to pick a different one.
137  */
138 static void pick_new_mon(struct ceph_mon_client *monc)
139 {
140         int old_mon = monc->cur_mon;
141
142         BUG_ON(monc->monmap->num_mon < 1);
143
144         if (monc->monmap->num_mon == 1) {
145                 monc->cur_mon = 0;
146         } else {
147                 int max = monc->monmap->num_mon;
148                 int o = -1;
149                 int n;
150
151                 if (monc->cur_mon >= 0) {
152                         if (monc->cur_mon < monc->monmap->num_mon)
153                                 o = monc->cur_mon;
154                         if (o >= 0)
155                                 max--;
156                 }
157
158                 n = prandom_u32() % max;
159                 if (o >= 0 && n >= o)
160                         n++;
161
162                 monc->cur_mon = n;
163         }
164
165         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
166              monc->cur_mon, monc->monmap->num_mon);
167 }
168
169 /*
170  * Open a session with a new monitor.
171  */
172 static void __open_session(struct ceph_mon_client *monc)
173 {
174         int ret;
175
176         pick_new_mon(monc);
177
178         monc->hunting = true;
179         if (monc->had_a_connection) {
180                 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
181                 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
182                         monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
183         }
184
185         monc->sub_renew_after = jiffies; /* i.e., expired */
186         monc->sub_renew_sent = 0;
187
188         dout("%s opening mon%d\n", __func__, monc->cur_mon);
189         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
190                       &monc->monmap->mon_inst[monc->cur_mon].addr);
191
192         /*
193          * send an initial keepalive to ensure our timestamp is valid
194          * by the time we are in an OPENED state
195          */
196         ceph_con_keepalive(&monc->con);
197
198         /* initiate authentication handshake */
199         ret = ceph_auth_build_hello(monc->auth,
200                                     monc->m_auth->front.iov_base,
201                                     monc->m_auth->front_alloc_len);
202         BUG_ON(ret <= 0);
203         __send_prepared_auth_request(monc, ret);
204 }
205
206 static void reopen_session(struct ceph_mon_client *monc)
207 {
208         if (!monc->hunting)
209                 pr_info("mon%d %s session lost, hunting for new mon\n",
210                     monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
211
212         __close_session(monc);
213         __open_session(monc);
214 }
215
216 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
217 {
218         mutex_lock(&monc->mutex);
219         reopen_session(monc);
220         mutex_unlock(&monc->mutex);
221 }
222
223 static void un_backoff(struct ceph_mon_client *monc)
224 {
225         monc->hunt_mult /= 2; /* reduce by 50% */
226         if (monc->hunt_mult < 1)
227                 monc->hunt_mult = 1;
228         dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
229 }
230
231 /*
232  * Reschedule delayed work timer.
233  */
234 static void __schedule_delayed(struct ceph_mon_client *monc)
235 {
236         unsigned long delay;
237
238         if (monc->hunting)
239                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
240         else
241                 delay = CEPH_MONC_PING_INTERVAL;
242
243         dout("__schedule_delayed after %lu\n", delay);
244         mod_delayed_work(system_wq, &monc->delayed_work,
245                          round_jiffies_relative(delay));
246 }
247
248 const char *ceph_sub_str[] = {
249         [CEPH_SUB_MONMAP] = "monmap",
250         [CEPH_SUB_OSDMAP] = "osdmap",
251         [CEPH_SUB_FSMAP]  = "fsmap.user",
252         [CEPH_SUB_MDSMAP] = "mdsmap",
253 };
254
255 /*
256  * Send subscribe request for one or more maps, according to
257  * monc->subs.
258  */
259 static void __send_subscribe(struct ceph_mon_client *monc)
260 {
261         struct ceph_msg *msg = monc->m_subscribe;
262         void *p = msg->front.iov_base;
263         void *const end = p + msg->front_alloc_len;
264         int num = 0;
265         int i;
266
267         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
268
269         BUG_ON(monc->cur_mon < 0);
270
271         if (!monc->sub_renew_sent)
272                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
273
274         msg->hdr.version = cpu_to_le16(2);
275
276         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
277                 if (monc->subs[i].want)
278                         num++;
279         }
280         BUG_ON(num < 1); /* monmap sub is always there */
281         ceph_encode_32(&p, num);
282         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
283                 char buf[32];
284                 int len;
285
286                 if (!monc->subs[i].want)
287                         continue;
288
289                 len = sprintf(buf, "%s", ceph_sub_str[i]);
290                 if (i == CEPH_SUB_MDSMAP &&
291                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
292                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
293
294                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
295                      le64_to_cpu(monc->subs[i].item.start),
296                      monc->subs[i].item.flags);
297                 ceph_encode_string(&p, end, buf, len);
298                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
299                 p += sizeof(monc->subs[i].item);
300         }
301
302         BUG_ON(p > end);
303         msg->front.iov_len = p - msg->front.iov_base;
304         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
305         ceph_msg_revoke(msg);
306         ceph_con_send(&monc->con, ceph_msg_get(msg));
307 }
308
309 static void handle_subscribe_ack(struct ceph_mon_client *monc,
310                                  struct ceph_msg *msg)
311 {
312         unsigned int seconds;
313         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
314
315         if (msg->front.iov_len < sizeof(*h))
316                 goto bad;
317         seconds = le32_to_cpu(h->duration);
318
319         mutex_lock(&monc->mutex);
320         if (monc->sub_renew_sent) {
321                 /*
322                  * This is only needed for legacy (infernalis or older)
323                  * MONs -- see delayed_work().
324                  */
325                 monc->sub_renew_after = monc->sub_renew_sent +
326                                             (seconds >> 1) * HZ - 1;
327                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
328                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
329                 monc->sub_renew_sent = 0;
330         } else {
331                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
332                      monc->sub_renew_sent, monc->sub_renew_after);
333         }
334         mutex_unlock(&monc->mutex);
335         return;
336 bad:
337         pr_err("got corrupt subscribe-ack msg\n");
338         ceph_msg_dump(msg);
339 }
340
341 /*
342  * Register interest in a map
343  *
344  * @sub: one of CEPH_SUB_*
345  * @epoch: X for "every map since X", or 0 for "just the latest"
346  */
347 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
348                                  u32 epoch, bool continuous)
349 {
350         __le64 start = cpu_to_le64(epoch);
351         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
352
353         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
354              epoch, continuous);
355
356         if (monc->subs[sub].want &&
357             monc->subs[sub].item.start == start &&
358             monc->subs[sub].item.flags == flags)
359                 return false;
360
361         monc->subs[sub].item.start = start;
362         monc->subs[sub].item.flags = flags;
363         monc->subs[sub].want = true;
364
365         return true;
366 }
367
368 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
369                         bool continuous)
370 {
371         bool need_request;
372
373         mutex_lock(&monc->mutex);
374         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
375         mutex_unlock(&monc->mutex);
376
377         return need_request;
378 }
379 EXPORT_SYMBOL(ceph_monc_want_map);
380
381 /*
382  * Keep track of which maps we have
383  *
384  * @sub: one of CEPH_SUB_*
385  */
386 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
387                                 u32 epoch)
388 {
389         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
390
391         if (monc->subs[sub].want) {
392                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
393                         monc->subs[sub].want = false;
394                 else
395                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
396         }
397
398         monc->subs[sub].have = epoch;
399 }
400
401 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
402 {
403         mutex_lock(&monc->mutex);
404         __ceph_monc_got_map(monc, sub, epoch);
405         mutex_unlock(&monc->mutex);
406 }
407 EXPORT_SYMBOL(ceph_monc_got_map);
408
409 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
410 {
411         mutex_lock(&monc->mutex);
412         __send_subscribe(monc);
413         mutex_unlock(&monc->mutex);
414 }
415 EXPORT_SYMBOL(ceph_monc_renew_subs);
416
417 /*
418  * Wait for an osdmap with a given epoch.
419  *
420  * @epoch: epoch to wait for
421  * @timeout: in jiffies, 0 means "wait forever"
422  */
423 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
424                           unsigned long timeout)
425 {
426         unsigned long started = jiffies;
427         long ret;
428
429         mutex_lock(&monc->mutex);
430         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
431                 mutex_unlock(&monc->mutex);
432
433                 if (timeout && time_after_eq(jiffies, started + timeout))
434                         return -ETIMEDOUT;
435
436                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
437                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
438                                      ceph_timeout_jiffies(timeout));
439                 if (ret < 0)
440                         return ret;
441
442                 mutex_lock(&monc->mutex);
443         }
444
445         mutex_unlock(&monc->mutex);
446         return 0;
447 }
448 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
449
450 /*
451  * Open a session with a random monitor.  Request monmap and osdmap,
452  * which are waited upon in __ceph_open_session().
453  */
454 int ceph_monc_open_session(struct ceph_mon_client *monc)
455 {
456         mutex_lock(&monc->mutex);
457         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
458         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
459         __open_session(monc);
460         __schedule_delayed(monc);
461         mutex_unlock(&monc->mutex);
462         return 0;
463 }
464 EXPORT_SYMBOL(ceph_monc_open_session);
465
466 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
467                                  struct ceph_msg *msg)
468 {
469         struct ceph_client *client = monc->client;
470         struct ceph_monmap *monmap;
471         void *p, *end;
472
473         mutex_lock(&monc->mutex);
474
475         dout("handle_monmap\n");
476         p = msg->front.iov_base;
477         end = p + msg->front.iov_len;
478
479         monmap = ceph_monmap_decode(p, end);
480         if (IS_ERR(monmap)) {
481                 pr_err("problem decoding monmap, %d\n",
482                        (int)PTR_ERR(monmap));
483                 ceph_msg_dump(msg);
484                 goto out;
485         }
486
487         if (ceph_check_fsid(client, &monmap->fsid) < 0) {
488                 kfree(monmap);
489                 goto out;
490         }
491
492         kfree(monc->monmap);
493         monc->monmap = monmap;
494
495         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
496         client->have_fsid = true;
497
498 out:
499         mutex_unlock(&monc->mutex);
500         wake_up_all(&client->auth_wq);
501 }
502
503 /*
504  * generic requests (currently statfs, mon_get_version)
505  */
506 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
507
508 static void release_generic_request(struct kref *kref)
509 {
510         struct ceph_mon_generic_request *req =
511                 container_of(kref, struct ceph_mon_generic_request, kref);
512
513         dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
514              req->reply);
515         WARN_ON(!RB_EMPTY_NODE(&req->node));
516
517         if (req->reply)
518                 ceph_msg_put(req->reply);
519         if (req->request)
520                 ceph_msg_put(req->request);
521
522         kfree(req);
523 }
524
525 static void put_generic_request(struct ceph_mon_generic_request *req)
526 {
527         if (req)
528                 kref_put(&req->kref, release_generic_request);
529 }
530
531 static void get_generic_request(struct ceph_mon_generic_request *req)
532 {
533         kref_get(&req->kref);
534 }
535
536 static struct ceph_mon_generic_request *
537 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
538 {
539         struct ceph_mon_generic_request *req;
540
541         req = kzalloc(sizeof(*req), gfp);
542         if (!req)
543                 return NULL;
544
545         req->monc = monc;
546         kref_init(&req->kref);
547         RB_CLEAR_NODE(&req->node);
548         init_completion(&req->completion);
549
550         dout("%s greq %p\n", __func__, req);
551         return req;
552 }
553
554 static void register_generic_request(struct ceph_mon_generic_request *req)
555 {
556         struct ceph_mon_client *monc = req->monc;
557
558         WARN_ON(req->tid);
559
560         get_generic_request(req);
561         req->tid = ++monc->last_tid;
562         insert_generic_request(&monc->generic_request_tree, req);
563 }
564
565 static void send_generic_request(struct ceph_mon_client *monc,
566                                  struct ceph_mon_generic_request *req)
567 {
568         WARN_ON(!req->tid);
569
570         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
571         req->request->hdr.tid = cpu_to_le64(req->tid);
572         ceph_con_send(&monc->con, ceph_msg_get(req->request));
573 }
574
575 static void __finish_generic_request(struct ceph_mon_generic_request *req)
576 {
577         struct ceph_mon_client *monc = req->monc;
578
579         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
580         erase_generic_request(&monc->generic_request_tree, req);
581
582         ceph_msg_revoke(req->request);
583         ceph_msg_revoke_incoming(req->reply);
584 }
585
586 static void finish_generic_request(struct ceph_mon_generic_request *req)
587 {
588         __finish_generic_request(req);
589         put_generic_request(req);
590 }
591
592 static void complete_generic_request(struct ceph_mon_generic_request *req)
593 {
594         if (req->complete_cb)
595                 req->complete_cb(req);
596         else
597                 complete_all(&req->completion);
598         put_generic_request(req);
599 }
600
601 static void cancel_generic_request(struct ceph_mon_generic_request *req)
602 {
603         struct ceph_mon_client *monc = req->monc;
604         struct ceph_mon_generic_request *lookup_req;
605
606         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
607
608         mutex_lock(&monc->mutex);
609         lookup_req = lookup_generic_request(&monc->generic_request_tree,
610                                             req->tid);
611         if (lookup_req) {
612                 WARN_ON(lookup_req != req);
613                 finish_generic_request(req);
614         }
615
616         mutex_unlock(&monc->mutex);
617 }
618
619 static int wait_generic_request(struct ceph_mon_generic_request *req)
620 {
621         int ret;
622
623         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
624         ret = wait_for_completion_interruptible(&req->completion);
625         if (ret)
626                 cancel_generic_request(req);
627         else
628                 ret = req->result; /* completed */
629
630         return ret;
631 }
632
633 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
634                                          struct ceph_msg_header *hdr,
635                                          int *skip)
636 {
637         struct ceph_mon_client *monc = con->private;
638         struct ceph_mon_generic_request *req;
639         u64 tid = le64_to_cpu(hdr->tid);
640         struct ceph_msg *m;
641
642         mutex_lock(&monc->mutex);
643         req = lookup_generic_request(&monc->generic_request_tree, tid);
644         if (!req) {
645                 dout("get_generic_reply %lld dne\n", tid);
646                 *skip = 1;
647                 m = NULL;
648         } else {
649                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
650                 *skip = 0;
651                 m = ceph_msg_get(req->reply);
652                 /*
653                  * we don't need to track the connection reading into
654                  * this reply because we only have one open connection
655                  * at a time, ever.
656                  */
657         }
658         mutex_unlock(&monc->mutex);
659         return m;
660 }
661
662 /*
663  * statfs
664  */
665 static void handle_statfs_reply(struct ceph_mon_client *monc,
666                                 struct ceph_msg *msg)
667 {
668         struct ceph_mon_generic_request *req;
669         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
670         u64 tid = le64_to_cpu(msg->hdr.tid);
671
672         dout("%s msg %p tid %llu\n", __func__, msg, tid);
673
674         if (msg->front.iov_len != sizeof(*reply))
675                 goto bad;
676
677         mutex_lock(&monc->mutex);
678         req = lookup_generic_request(&monc->generic_request_tree, tid);
679         if (!req) {
680                 mutex_unlock(&monc->mutex);
681                 return;
682         }
683
684         req->result = 0;
685         *req->u.st = reply->st; /* struct */
686         __finish_generic_request(req);
687         mutex_unlock(&monc->mutex);
688
689         complete_generic_request(req);
690         return;
691
692 bad:
693         pr_err("corrupt statfs reply, tid %llu\n", tid);
694         ceph_msg_dump(msg);
695 }
696
697 /*
698  * Do a synchronous statfs().
699  */
700 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
701                         struct ceph_statfs *buf)
702 {
703         struct ceph_mon_generic_request *req;
704         struct ceph_mon_statfs *h;
705         int ret = -ENOMEM;
706
707         req = alloc_generic_request(monc, GFP_NOFS);
708         if (!req)
709                 goto out;
710
711         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
712                                     true);
713         if (!req->request)
714                 goto out;
715
716         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
717         if (!req->reply)
718                 goto out;
719
720         req->u.st = buf;
721         req->request->hdr.version = cpu_to_le16(2);
722
723         mutex_lock(&monc->mutex);
724         register_generic_request(req);
725         /* fill out request */
726         h = req->request->front.iov_base;
727         h->monhdr.have_version = 0;
728         h->monhdr.session_mon = cpu_to_le16(-1);
729         h->monhdr.session_mon_tid = 0;
730         h->fsid = monc->monmap->fsid;
731         h->contains_data_pool = (data_pool != CEPH_NOPOOL);
732         h->data_pool = cpu_to_le64(data_pool);
733         send_generic_request(monc, req);
734         mutex_unlock(&monc->mutex);
735
736         ret = wait_generic_request(req);
737 out:
738         put_generic_request(req);
739         return ret;
740 }
741 EXPORT_SYMBOL(ceph_monc_do_statfs);
742
743 static void handle_get_version_reply(struct ceph_mon_client *monc,
744                                      struct ceph_msg *msg)
745 {
746         struct ceph_mon_generic_request *req;
747         u64 tid = le64_to_cpu(msg->hdr.tid);
748         void *p = msg->front.iov_base;
749         void *end = p + msg->front_alloc_len;
750         u64 handle;
751
752         dout("%s msg %p tid %llu\n", __func__, msg, tid);
753
754         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
755         handle = ceph_decode_64(&p);
756         if (tid != 0 && tid != handle)
757                 goto bad;
758
759         mutex_lock(&monc->mutex);
760         req = lookup_generic_request(&monc->generic_request_tree, handle);
761         if (!req) {
762                 mutex_unlock(&monc->mutex);
763                 return;
764         }
765
766         req->result = 0;
767         req->u.newest = ceph_decode_64(&p);
768         __finish_generic_request(req);
769         mutex_unlock(&monc->mutex);
770
771         complete_generic_request(req);
772         return;
773
774 bad:
775         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
776         ceph_msg_dump(msg);
777 }
778
779 static struct ceph_mon_generic_request *
780 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
781                         ceph_monc_callback_t cb, u64 private_data)
782 {
783         struct ceph_mon_generic_request *req;
784
785         req = alloc_generic_request(monc, GFP_NOIO);
786         if (!req)
787                 goto err_put_req;
788
789         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
790                                     sizeof(u64) + sizeof(u32) + strlen(what),
791                                     GFP_NOIO, true);
792         if (!req->request)
793                 goto err_put_req;
794
795         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
796                                   true);
797         if (!req->reply)
798                 goto err_put_req;
799
800         req->complete_cb = cb;
801         req->private_data = private_data;
802
803         mutex_lock(&monc->mutex);
804         register_generic_request(req);
805         {
806                 void *p = req->request->front.iov_base;
807                 void *const end = p + req->request->front_alloc_len;
808
809                 ceph_encode_64(&p, req->tid); /* handle */
810                 ceph_encode_string(&p, end, what, strlen(what));
811                 WARN_ON(p != end);
812         }
813         send_generic_request(monc, req);
814         mutex_unlock(&monc->mutex);
815
816         return req;
817
818 err_put_req:
819         put_generic_request(req);
820         return ERR_PTR(-ENOMEM);
821 }
822
823 /*
824  * Send MMonGetVersion and wait for the reply.
825  *
826  * @what: one of "mdsmap", "osdmap" or "monmap"
827  */
828 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
829                           u64 *newest)
830 {
831         struct ceph_mon_generic_request *req;
832         int ret;
833
834         req = __ceph_monc_get_version(monc, what, NULL, 0);
835         if (IS_ERR(req))
836                 return PTR_ERR(req);
837
838         ret = wait_generic_request(req);
839         if (!ret)
840                 *newest = req->u.newest;
841
842         put_generic_request(req);
843         return ret;
844 }
845 EXPORT_SYMBOL(ceph_monc_get_version);
846
847 /*
848  * Send MMonGetVersion,
849  *
850  * @what: one of "mdsmap", "osdmap" or "monmap"
851  */
852 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
853                                 ceph_monc_callback_t cb, u64 private_data)
854 {
855         struct ceph_mon_generic_request *req;
856
857         req = __ceph_monc_get_version(monc, what, cb, private_data);
858         if (IS_ERR(req))
859                 return PTR_ERR(req);
860
861         put_generic_request(req);
862         return 0;
863 }
864 EXPORT_SYMBOL(ceph_monc_get_version_async);
865
866 static void handle_command_ack(struct ceph_mon_client *monc,
867                                struct ceph_msg *msg)
868 {
869         struct ceph_mon_generic_request *req;
870         void *p = msg->front.iov_base;
871         void *const end = p + msg->front_alloc_len;
872         u64 tid = le64_to_cpu(msg->hdr.tid);
873
874         dout("%s msg %p tid %llu\n", __func__, msg, tid);
875
876         ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
877                                                             sizeof(u32), bad);
878         p += sizeof(struct ceph_mon_request_header);
879
880         mutex_lock(&monc->mutex);
881         req = lookup_generic_request(&monc->generic_request_tree, tid);
882         if (!req) {
883                 mutex_unlock(&monc->mutex);
884                 return;
885         }
886
887         req->result = ceph_decode_32(&p);
888         __finish_generic_request(req);
889         mutex_unlock(&monc->mutex);
890
891         complete_generic_request(req);
892         return;
893
894 bad:
895         pr_err("corrupt mon_command ack, tid %llu\n", tid);
896         ceph_msg_dump(msg);
897 }
898
899 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
900                             struct ceph_entity_addr *client_addr)
901 {
902         struct ceph_mon_generic_request *req;
903         struct ceph_mon_command *h;
904         int ret = -ENOMEM;
905         int len;
906
907         req = alloc_generic_request(monc, GFP_NOIO);
908         if (!req)
909                 goto out;
910
911         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
912         if (!req->request)
913                 goto out;
914
915         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
916                                   true);
917         if (!req->reply)
918                 goto out;
919
920         mutex_lock(&monc->mutex);
921         register_generic_request(req);
922         h = req->request->front.iov_base;
923         h->monhdr.have_version = 0;
924         h->monhdr.session_mon = cpu_to_le16(-1);
925         h->monhdr.session_mon_tid = 0;
926         h->fsid = monc->monmap->fsid;
927         h->num_strs = cpu_to_le32(1);
928         len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
929                                  \"blacklistop\": \"add\", \
930                                  \"addr\": \"%pISpc/%u\" }",
931                       &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
932         h->str_len = cpu_to_le32(len);
933         send_generic_request(monc, req);
934         mutex_unlock(&monc->mutex);
935
936         ret = wait_generic_request(req);
937         if (!ret)
938                 /*
939                  * Make sure we have the osdmap that includes the blocklist
940                  * entry.  This is needed to ensure that the OSDs pick up the
941                  * new blocklist before processing any future requests from
942                  * this client.
943                  */
944                 ret = ceph_wait_for_latest_osdmap(monc->client, 0);
945
946 out:
947         put_generic_request(req);
948         return ret;
949 }
950 EXPORT_SYMBOL(ceph_monc_blocklist_add);
951
952 /*
953  * Resend pending generic requests.
954  */
955 static void __resend_generic_request(struct ceph_mon_client *monc)
956 {
957         struct ceph_mon_generic_request *req;
958         struct rb_node *p;
959
960         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
961                 req = rb_entry(p, struct ceph_mon_generic_request, node);
962                 ceph_msg_revoke(req->request);
963                 ceph_msg_revoke_incoming(req->reply);
964                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
965         }
966 }
967
968 /*
969  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
970  * renew/retry subscription as needed (in case it is timing out, or we
971  * got an ENOMEM).  And keep the monitor connection alive.
972  */
973 static void delayed_work(struct work_struct *work)
974 {
975         struct ceph_mon_client *monc =
976                 container_of(work, struct ceph_mon_client, delayed_work.work);
977
978         dout("monc delayed_work\n");
979         mutex_lock(&monc->mutex);
980         if (monc->hunting) {
981                 dout("%s continuing hunt\n", __func__);
982                 reopen_session(monc);
983         } else {
984                 int is_auth = ceph_auth_is_authenticated(monc->auth);
985                 if (ceph_con_keepalive_expired(&monc->con,
986                                                CEPH_MONC_PING_TIMEOUT)) {
987                         dout("monc keepalive timeout\n");
988                         is_auth = 0;
989                         reopen_session(monc);
990                 }
991
992                 if (!monc->hunting) {
993                         ceph_con_keepalive(&monc->con);
994                         __validate_auth(monc);
995                         un_backoff(monc);
996                 }
997
998                 if (is_auth &&
999                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1000                         unsigned long now = jiffies;
1001
1002                         dout("%s renew subs? now %lu renew after %lu\n",
1003                              __func__, now, monc->sub_renew_after);
1004                         if (time_after_eq(now, monc->sub_renew_after))
1005                                 __send_subscribe(monc);
1006                 }
1007         }
1008         __schedule_delayed(monc);
1009         mutex_unlock(&monc->mutex);
1010 }
1011
1012 /*
1013  * On startup, we build a temporary monmap populated with the IPs
1014  * provided by mount(2).
1015  */
1016 static int build_initial_monmap(struct ceph_mon_client *monc)
1017 {
1018         struct ceph_options *opt = monc->client->options;
1019         struct ceph_entity_addr *mon_addr = opt->mon_addr;
1020         int num_mon = opt->num_mon;
1021         int i;
1022
1023         /* build initial monmap */
1024         monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1025                                GFP_KERNEL);
1026         if (!monc->monmap)
1027                 return -ENOMEM;
1028         for (i = 0; i < num_mon; i++) {
1029                 monc->monmap->mon_inst[i].addr = mon_addr[i];
1030                 monc->monmap->mon_inst[i].addr.nonce = 0;
1031                 monc->monmap->mon_inst[i].name.type =
1032                         CEPH_ENTITY_TYPE_MON;
1033                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
1034         }
1035         monc->monmap->num_mon = num_mon;
1036         return 0;
1037 }
1038
1039 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1040 {
1041         int err = 0;
1042
1043         dout("init\n");
1044         memset(monc, 0, sizeof(*monc));
1045         monc->client = cl;
1046         monc->monmap = NULL;
1047         mutex_init(&monc->mutex);
1048
1049         err = build_initial_monmap(monc);
1050         if (err)
1051                 goto out;
1052
1053         /* connection */
1054         /* authentication */
1055         monc->auth = ceph_auth_init(cl->options->name,
1056                                     cl->options->key);
1057         if (IS_ERR(monc->auth)) {
1058                 err = PTR_ERR(monc->auth);
1059                 goto out_monmap;
1060         }
1061         monc->auth->want_keys =
1062                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1063                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1064
1065         /* msgs */
1066         err = -ENOMEM;
1067         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1068                                      sizeof(struct ceph_mon_subscribe_ack),
1069                                      GFP_KERNEL, true);
1070         if (!monc->m_subscribe_ack)
1071                 goto out_auth;
1072
1073         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1074                                          GFP_KERNEL, true);
1075         if (!monc->m_subscribe)
1076                 goto out_subscribe_ack;
1077
1078         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1079                                           GFP_KERNEL, true);
1080         if (!monc->m_auth_reply)
1081                 goto out_subscribe;
1082
1083         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1084         monc->pending_auth = 0;
1085         if (!monc->m_auth)
1086                 goto out_auth_reply;
1087
1088         ceph_con_init(&monc->con, monc, &mon_con_ops,
1089                       &monc->client->msgr);
1090
1091         monc->cur_mon = -1;
1092         monc->had_a_connection = false;
1093         monc->hunt_mult = 1;
1094
1095         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1096         monc->generic_request_tree = RB_ROOT;
1097         monc->last_tid = 0;
1098
1099         monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1100
1101         return 0;
1102
1103 out_auth_reply:
1104         ceph_msg_put(monc->m_auth_reply);
1105 out_subscribe:
1106         ceph_msg_put(monc->m_subscribe);
1107 out_subscribe_ack:
1108         ceph_msg_put(monc->m_subscribe_ack);
1109 out_auth:
1110         ceph_auth_destroy(monc->auth);
1111 out_monmap:
1112         kfree(monc->monmap);
1113 out:
1114         return err;
1115 }
1116 EXPORT_SYMBOL(ceph_monc_init);
1117
1118 void ceph_monc_stop(struct ceph_mon_client *monc)
1119 {
1120         dout("stop\n");
1121         cancel_delayed_work_sync(&monc->delayed_work);
1122
1123         mutex_lock(&monc->mutex);
1124         __close_session(monc);
1125         monc->cur_mon = -1;
1126         mutex_unlock(&monc->mutex);
1127
1128         /*
1129          * flush msgr queue before we destroy ourselves to ensure that:
1130          *  - any work that references our embedded con is finished.
1131          *  - any osd_client or other work that may reference an authorizer
1132          *    finishes before we shut down the auth subsystem.
1133          */
1134         ceph_msgr_flush();
1135
1136         ceph_auth_destroy(monc->auth);
1137
1138         WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1139
1140         ceph_msg_put(monc->m_auth);
1141         ceph_msg_put(monc->m_auth_reply);
1142         ceph_msg_put(monc->m_subscribe);
1143         ceph_msg_put(monc->m_subscribe_ack);
1144
1145         kfree(monc->monmap);
1146 }
1147 EXPORT_SYMBOL(ceph_monc_stop);
1148
1149 static void finish_hunting(struct ceph_mon_client *monc)
1150 {
1151         if (monc->hunting) {
1152                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1153                 monc->hunting = false;
1154                 monc->had_a_connection = true;
1155                 un_backoff(monc);
1156                 __schedule_delayed(monc);
1157         }
1158 }
1159
1160 static void handle_auth_reply(struct ceph_mon_client *monc,
1161                               struct ceph_msg *msg)
1162 {
1163         int ret;
1164         int was_auth = 0;
1165
1166         mutex_lock(&monc->mutex);
1167         was_auth = ceph_auth_is_authenticated(monc->auth);
1168         monc->pending_auth = 0;
1169         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1170                                      msg->front.iov_len,
1171                                      monc->m_auth->front.iov_base,
1172                                      monc->m_auth->front_alloc_len);
1173         if (ret > 0) {
1174                 __send_prepared_auth_request(monc, ret);
1175                 goto out;
1176         }
1177
1178         finish_hunting(monc);
1179
1180         if (ret < 0) {
1181                 monc->client->auth_err = ret;
1182         } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
1183                 dout("authenticated, starting session\n");
1184
1185                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1186                 monc->client->msgr.inst.name.num =
1187                                         cpu_to_le64(monc->auth->global_id);
1188
1189                 __send_subscribe(monc);
1190                 __resend_generic_request(monc);
1191
1192                 pr_info("mon%d %s session established\n", monc->cur_mon,
1193                         ceph_pr_addr(&monc->con.peer_addr));
1194         }
1195
1196 out:
1197         mutex_unlock(&monc->mutex);
1198         if (monc->client->auth_err < 0)
1199                 wake_up_all(&monc->client->auth_wq);
1200 }
1201
1202 static int __validate_auth(struct ceph_mon_client *monc)
1203 {
1204         int ret;
1205
1206         if (monc->pending_auth)
1207                 return 0;
1208
1209         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1210                               monc->m_auth->front_alloc_len);
1211         if (ret <= 0)
1212                 return ret; /* either an error, or no need to authenticate */
1213         __send_prepared_auth_request(monc, ret);
1214         return 0;
1215 }
1216
1217 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1218 {
1219         int ret;
1220
1221         mutex_lock(&monc->mutex);
1222         ret = __validate_auth(monc);
1223         mutex_unlock(&monc->mutex);
1224         return ret;
1225 }
1226 EXPORT_SYMBOL(ceph_monc_validate_auth);
1227
1228 /*
1229  * handle incoming message
1230  */
1231 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1232 {
1233         struct ceph_mon_client *monc = con->private;
1234         int type = le16_to_cpu(msg->hdr.type);
1235
1236         switch (type) {
1237         case CEPH_MSG_AUTH_REPLY:
1238                 handle_auth_reply(monc, msg);
1239                 break;
1240
1241         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1242                 handle_subscribe_ack(monc, msg);
1243                 break;
1244
1245         case CEPH_MSG_STATFS_REPLY:
1246                 handle_statfs_reply(monc, msg);
1247                 break;
1248
1249         case CEPH_MSG_MON_GET_VERSION_REPLY:
1250                 handle_get_version_reply(monc, msg);
1251                 break;
1252
1253         case CEPH_MSG_MON_COMMAND_ACK:
1254                 handle_command_ack(monc, msg);
1255                 break;
1256
1257         case CEPH_MSG_MON_MAP:
1258                 ceph_monc_handle_map(monc, msg);
1259                 break;
1260
1261         case CEPH_MSG_OSD_MAP:
1262                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1263                 break;
1264
1265         default:
1266                 /* can the chained handler handle it? */
1267                 if (monc->client->extra_mon_dispatch &&
1268                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1269                         break;
1270
1271                 pr_err("received unknown message type %d %s\n", type,
1272                        ceph_msg_type_name(type));
1273         }
1274         ceph_msg_put(msg);
1275 }
1276
1277 /*
1278  * Allocate memory for incoming message
1279  */
1280 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1281                                       struct ceph_msg_header *hdr,
1282                                       int *skip)
1283 {
1284         struct ceph_mon_client *monc = con->private;
1285         int type = le16_to_cpu(hdr->type);
1286         int front_len = le32_to_cpu(hdr->front_len);
1287         struct ceph_msg *m = NULL;
1288
1289         *skip = 0;
1290
1291         switch (type) {
1292         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1293                 m = ceph_msg_get(monc->m_subscribe_ack);
1294                 break;
1295         case CEPH_MSG_STATFS_REPLY:
1296         case CEPH_MSG_MON_COMMAND_ACK:
1297                 return get_generic_reply(con, hdr, skip);
1298         case CEPH_MSG_AUTH_REPLY:
1299                 m = ceph_msg_get(monc->m_auth_reply);
1300                 break;
1301         case CEPH_MSG_MON_GET_VERSION_REPLY:
1302                 if (le64_to_cpu(hdr->tid) != 0)
1303                         return get_generic_reply(con, hdr, skip);
1304
1305                 /*
1306                  * Older OSDs don't set reply tid even if the orignal
1307                  * request had a non-zero tid.  Work around this weirdness
1308                  * by allocating a new message.
1309                  */
1310                 fallthrough;
1311         case CEPH_MSG_MON_MAP:
1312         case CEPH_MSG_MDS_MAP:
1313         case CEPH_MSG_OSD_MAP:
1314         case CEPH_MSG_FS_MAP_USER:
1315                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1316                 if (!m)
1317                         return NULL;    /* ENOMEM--return skip == 0 */
1318                 break;
1319         }
1320
1321         if (!m) {
1322                 pr_info("alloc_msg unknown type %d\n", type);
1323                 *skip = 1;
1324         } else if (front_len > m->front_alloc_len) {
1325                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1326                         front_len, m->front_alloc_len,
1327                         (unsigned int)con->peer_name.type,
1328                         le64_to_cpu(con->peer_name.num));
1329                 ceph_msg_put(m);
1330                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1331         }
1332
1333         return m;
1334 }
1335
1336 /*
1337  * If the monitor connection resets, pick a new monitor and resubmit
1338  * any pending requests.
1339  */
1340 static void mon_fault(struct ceph_connection *con)
1341 {
1342         struct ceph_mon_client *monc = con->private;
1343
1344         mutex_lock(&monc->mutex);
1345         dout("%s mon%d\n", __func__, monc->cur_mon);
1346         if (monc->cur_mon >= 0) {
1347                 if (!monc->hunting) {
1348                         dout("%s hunting for new mon\n", __func__);
1349                         reopen_session(monc);
1350                         __schedule_delayed(monc);
1351                 } else {
1352                         dout("%s already hunting\n", __func__);
1353                 }
1354         }
1355         mutex_unlock(&monc->mutex);
1356 }
1357
1358 /*
1359  * We can ignore refcounting on the connection struct, as all references
1360  * will come from the messenger workqueue, which is drained prior to
1361  * mon_client destruction.
1362  */
1363 static struct ceph_connection *con_get(struct ceph_connection *con)
1364 {
1365         return con;
1366 }
1367
1368 static void con_put(struct ceph_connection *con)
1369 {
1370 }
1371
1372 static const struct ceph_connection_operations mon_con_ops = {
1373         .get = con_get,
1374         .put = con_put,
1375         .dispatch = dispatch,
1376         .fault = mon_fault,
1377         .alloc_msg = mon_alloc_msg,
1378 };