libceph, ceph: implement msgr2.1 protocol (crc and secure modes)
[linux-2.6-microblaze.git] / net / ceph / mon_client.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/module.h>
5 #include <linux/types.h>
6 #include <linux/slab.h>
7 #include <linux/random.h>
8 #include <linux/sched.h>
9
10 #include <linux/ceph/ceph_features.h>
11 #include <linux/ceph/mon_client.h>
12 #include <linux/ceph/libceph.h>
13 #include <linux/ceph/debugfs.h>
14 #include <linux/ceph/decode.h>
15 #include <linux/ceph/auth.h>
16
17 /*
18  * Interact with Ceph monitor cluster.  Handle requests for new map
19  * versions, and periodically resend as needed.  Also implement
20  * statfs() and umount().
21  *
22  * A small cluster of Ceph "monitors" are responsible for managing critical
23  * cluster configuration and state information.  An odd number (e.g., 3, 5)
24  * of cmon daemons use a modified version of the Paxos part-time parliament
25  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
26  * list of clients who have mounted the file system.
27  *
28  * We maintain an open, active session with a monitor at all times in order to
29  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
30  * TCP socket to ensure we detect a failure.  If the connection does break, we
31  * randomly hunt for a new monitor.  Once the connection is reestablished, we
32  * resend any outstanding requests.
33  */
34
35 static const struct ceph_connection_operations mon_con_ops;
36
37 static int __validate_auth(struct ceph_mon_client *monc);
38
39 static int decode_mon_info(void **p, void *end, bool msgr2,
40                            struct ceph_entity_addr *addr)
41 {
42         void *mon_info_end;
43         u32 struct_len;
44         u8 struct_v;
45         int ret;
46
47         ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
48                                   &struct_len);
49         if (ret)
50                 return ret;
51
52         mon_info_end = *p + struct_len;
53         ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
54         ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
55         if (ret)
56                 return ret;
57
58         *p = mon_info_end;
59         return 0;
60
61 e_inval:
62         return -EINVAL;
63 }
64
65 /*
66  * Decode a monmap blob (e.g., during mount).
67  *
68  * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
69  */
70 static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
71 {
72         struct ceph_monmap *monmap = NULL;
73         struct ceph_fsid fsid;
74         u32 struct_len;
75         int blob_len;
76         int num_mon;
77         u8 struct_v;
78         u32 epoch;
79         int ret;
80         int i;
81
82         ceph_decode_32_safe(p, end, blob_len, e_inval);
83         ceph_decode_need(p, end, blob_len, e_inval);
84
85         ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
86         if (ret)
87                 goto fail;
88
89         dout("%s struct_v %d\n", __func__, struct_v);
90         ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
91         ceph_decode_32_safe(p, end, epoch, e_inval);
92         if (struct_v >= 6) {
93                 u32 feat_struct_len;
94                 u8 feat_struct_v;
95
96                 *p += sizeof(struct ceph_timespec);  /* skip last_changed */
97                 *p += sizeof(struct ceph_timespec);  /* skip created */
98
99                 ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
100                                           &feat_struct_v, &feat_struct_len);
101                 if (ret)
102                         goto fail;
103
104                 *p += feat_struct_len;  /* skip persistent_features */
105
106                 ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
107                                           &feat_struct_v, &feat_struct_len);
108                 if (ret)
109                         goto fail;
110
111                 *p += feat_struct_len;  /* skip optional_features */
112         }
113         ceph_decode_32_safe(p, end, num_mon, e_inval);
114
115         dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
116              num_mon);
117         if (num_mon > CEPH_MAX_MON)
118                 goto e_inval;
119
120         monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
121         if (!monmap) {
122                 ret = -ENOMEM;
123                 goto fail;
124         }
125         monmap->fsid = fsid;
126         monmap->epoch = epoch;
127         monmap->num_mon = num_mon;
128
129         /* legacy_mon_addr map or mon_info map */
130         for (i = 0; i < num_mon; i++) {
131                 struct ceph_entity_inst *inst = &monmap->mon_inst[i];
132
133                 ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
134                 inst->name.type = CEPH_ENTITY_TYPE_MON;
135                 inst->name.num = cpu_to_le64(i);
136
137                 if (struct_v >= 6)
138                         ret = decode_mon_info(p, end, msgr2, &inst->addr);
139                 else
140                         ret = ceph_decode_entity_addr(p, end, &inst->addr);
141                 if (ret)
142                         goto fail;
143
144                 dout("%s mon%d addr %s\n", __func__, i,
145                      ceph_pr_addr(&inst->addr));
146         }
147
148         return monmap;
149
150 e_inval:
151         ret = -EINVAL;
152 fail:
153         kfree(monmap);
154         return ERR_PTR(ret);
155 }
156
157 /*
158  * return true if *addr is included in the monmap.
159  */
160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
161 {
162         int i;
163
164         for (i = 0; i < m->num_mon; i++) {
165                 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
166                         return 1;
167         }
168
169         return 0;
170 }
171
172 /*
173  * Send an auth request.
174  */
175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
176 {
177         monc->pending_auth = 1;
178         monc->m_auth->front.iov_len = len;
179         monc->m_auth->hdr.front_len = cpu_to_le32(len);
180         ceph_msg_revoke(monc->m_auth);
181         ceph_msg_get(monc->m_auth);  /* keep our ref */
182         ceph_con_send(&monc->con, monc->m_auth);
183 }
184
185 /*
186  * Close monitor session, if any.
187  */
188 static void __close_session(struct ceph_mon_client *monc)
189 {
190         dout("__close_session closing mon%d\n", monc->cur_mon);
191         ceph_msg_revoke(monc->m_auth);
192         ceph_msg_revoke_incoming(monc->m_auth_reply);
193         ceph_msg_revoke(monc->m_subscribe);
194         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
195         ceph_con_close(&monc->con);
196
197         monc->pending_auth = 0;
198         ceph_auth_reset(monc->auth);
199 }
200
201 /*
202  * Pick a new monitor at random and set cur_mon.  If we are repicking
203  * (i.e. cur_mon is already set), be sure to pick a different one.
204  */
205 static void pick_new_mon(struct ceph_mon_client *monc)
206 {
207         int old_mon = monc->cur_mon;
208
209         BUG_ON(monc->monmap->num_mon < 1);
210
211         if (monc->monmap->num_mon == 1) {
212                 monc->cur_mon = 0;
213         } else {
214                 int max = monc->monmap->num_mon;
215                 int o = -1;
216                 int n;
217
218                 if (monc->cur_mon >= 0) {
219                         if (monc->cur_mon < monc->monmap->num_mon)
220                                 o = monc->cur_mon;
221                         if (o >= 0)
222                                 max--;
223                 }
224
225                 n = prandom_u32() % max;
226                 if (o >= 0 && n >= o)
227                         n++;
228
229                 monc->cur_mon = n;
230         }
231
232         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
233              monc->cur_mon, monc->monmap->num_mon);
234 }
235
236 /*
237  * Open a session with a new monitor.
238  */
239 static void __open_session(struct ceph_mon_client *monc)
240 {
241         int ret;
242
243         pick_new_mon(monc);
244
245         monc->hunting = true;
246         if (monc->had_a_connection) {
247                 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
248                 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
249                         monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
250         }
251
252         monc->sub_renew_after = jiffies; /* i.e., expired */
253         monc->sub_renew_sent = 0;
254
255         dout("%s opening mon%d\n", __func__, monc->cur_mon);
256         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
257                       &monc->monmap->mon_inst[monc->cur_mon].addr);
258
259         /*
260          * Queue a keepalive to ensure that in case of an early fault
261          * the messenger doesn't put us into STANDBY state and instead
262          * retries.  This also ensures that our timestamp is valid by
263          * the time we finish hunting and delayed_work() checks it.
264          */
265         ceph_con_keepalive(&monc->con);
266         if (ceph_msgr2(monc->client)) {
267                 monc->pending_auth = 1;
268                 return;
269         }
270
271         /* initiate authentication handshake */
272         ret = ceph_auth_build_hello(monc->auth,
273                                     monc->m_auth->front.iov_base,
274                                     monc->m_auth->front_alloc_len);
275         BUG_ON(ret <= 0);
276         __send_prepared_auth_request(monc, ret);
277 }
278
279 static void reopen_session(struct ceph_mon_client *monc)
280 {
281         if (!monc->hunting)
282                 pr_info("mon%d %s session lost, hunting for new mon\n",
283                     monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
284
285         __close_session(monc);
286         __open_session(monc);
287 }
288
289 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
290 {
291         mutex_lock(&monc->mutex);
292         reopen_session(monc);
293         mutex_unlock(&monc->mutex);
294 }
295
296 static void un_backoff(struct ceph_mon_client *monc)
297 {
298         monc->hunt_mult /= 2; /* reduce by 50% */
299         if (monc->hunt_mult < 1)
300                 monc->hunt_mult = 1;
301         dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
302 }
303
304 /*
305  * Reschedule delayed work timer.
306  */
307 static void __schedule_delayed(struct ceph_mon_client *monc)
308 {
309         unsigned long delay;
310
311         if (monc->hunting)
312                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
313         else
314                 delay = CEPH_MONC_PING_INTERVAL;
315
316         dout("__schedule_delayed after %lu\n", delay);
317         mod_delayed_work(system_wq, &monc->delayed_work,
318                          round_jiffies_relative(delay));
319 }
320
321 const char *ceph_sub_str[] = {
322         [CEPH_SUB_MONMAP] = "monmap",
323         [CEPH_SUB_OSDMAP] = "osdmap",
324         [CEPH_SUB_FSMAP]  = "fsmap.user",
325         [CEPH_SUB_MDSMAP] = "mdsmap",
326 };
327
328 /*
329  * Send subscribe request for one or more maps, according to
330  * monc->subs.
331  */
332 static void __send_subscribe(struct ceph_mon_client *monc)
333 {
334         struct ceph_msg *msg = monc->m_subscribe;
335         void *p = msg->front.iov_base;
336         void *const end = p + msg->front_alloc_len;
337         int num = 0;
338         int i;
339
340         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
341
342         BUG_ON(monc->cur_mon < 0);
343
344         if (!monc->sub_renew_sent)
345                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
346
347         msg->hdr.version = cpu_to_le16(2);
348
349         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
350                 if (monc->subs[i].want)
351                         num++;
352         }
353         BUG_ON(num < 1); /* monmap sub is always there */
354         ceph_encode_32(&p, num);
355         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
356                 char buf[32];
357                 int len;
358
359                 if (!monc->subs[i].want)
360                         continue;
361
362                 len = sprintf(buf, "%s", ceph_sub_str[i]);
363                 if (i == CEPH_SUB_MDSMAP &&
364                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
365                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
366
367                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
368                      le64_to_cpu(monc->subs[i].item.start),
369                      monc->subs[i].item.flags);
370                 ceph_encode_string(&p, end, buf, len);
371                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
372                 p += sizeof(monc->subs[i].item);
373         }
374
375         BUG_ON(p > end);
376         msg->front.iov_len = p - msg->front.iov_base;
377         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
378         ceph_msg_revoke(msg);
379         ceph_con_send(&monc->con, ceph_msg_get(msg));
380 }
381
382 static void handle_subscribe_ack(struct ceph_mon_client *monc,
383                                  struct ceph_msg *msg)
384 {
385         unsigned int seconds;
386         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
387
388         if (msg->front.iov_len < sizeof(*h))
389                 goto bad;
390         seconds = le32_to_cpu(h->duration);
391
392         mutex_lock(&monc->mutex);
393         if (monc->sub_renew_sent) {
394                 /*
395                  * This is only needed for legacy (infernalis or older)
396                  * MONs -- see delayed_work().
397                  */
398                 monc->sub_renew_after = monc->sub_renew_sent +
399                                             (seconds >> 1) * HZ - 1;
400                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
401                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
402                 monc->sub_renew_sent = 0;
403         } else {
404                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
405                      monc->sub_renew_sent, monc->sub_renew_after);
406         }
407         mutex_unlock(&monc->mutex);
408         return;
409 bad:
410         pr_err("got corrupt subscribe-ack msg\n");
411         ceph_msg_dump(msg);
412 }
413
414 /*
415  * Register interest in a map
416  *
417  * @sub: one of CEPH_SUB_*
418  * @epoch: X for "every map since X", or 0 for "just the latest"
419  */
420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
421                                  u32 epoch, bool continuous)
422 {
423         __le64 start = cpu_to_le64(epoch);
424         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
425
426         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
427              epoch, continuous);
428
429         if (monc->subs[sub].want &&
430             monc->subs[sub].item.start == start &&
431             monc->subs[sub].item.flags == flags)
432                 return false;
433
434         monc->subs[sub].item.start = start;
435         monc->subs[sub].item.flags = flags;
436         monc->subs[sub].want = true;
437
438         return true;
439 }
440
441 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
442                         bool continuous)
443 {
444         bool need_request;
445
446         mutex_lock(&monc->mutex);
447         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
448         mutex_unlock(&monc->mutex);
449
450         return need_request;
451 }
452 EXPORT_SYMBOL(ceph_monc_want_map);
453
454 /*
455  * Keep track of which maps we have
456  *
457  * @sub: one of CEPH_SUB_*
458  */
459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
460                                 u32 epoch)
461 {
462         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
463
464         if (monc->subs[sub].want) {
465                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
466                         monc->subs[sub].want = false;
467                 else
468                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
469         }
470
471         monc->subs[sub].have = epoch;
472 }
473
474 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
475 {
476         mutex_lock(&monc->mutex);
477         __ceph_monc_got_map(monc, sub, epoch);
478         mutex_unlock(&monc->mutex);
479 }
480 EXPORT_SYMBOL(ceph_monc_got_map);
481
482 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
483 {
484         mutex_lock(&monc->mutex);
485         __send_subscribe(monc);
486         mutex_unlock(&monc->mutex);
487 }
488 EXPORT_SYMBOL(ceph_monc_renew_subs);
489
490 /*
491  * Wait for an osdmap with a given epoch.
492  *
493  * @epoch: epoch to wait for
494  * @timeout: in jiffies, 0 means "wait forever"
495  */
496 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
497                           unsigned long timeout)
498 {
499         unsigned long started = jiffies;
500         long ret;
501
502         mutex_lock(&monc->mutex);
503         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
504                 mutex_unlock(&monc->mutex);
505
506                 if (timeout && time_after_eq(jiffies, started + timeout))
507                         return -ETIMEDOUT;
508
509                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
510                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
511                                      ceph_timeout_jiffies(timeout));
512                 if (ret < 0)
513                         return ret;
514
515                 mutex_lock(&monc->mutex);
516         }
517
518         mutex_unlock(&monc->mutex);
519         return 0;
520 }
521 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
522
523 /*
524  * Open a session with a random monitor.  Request monmap and osdmap,
525  * which are waited upon in __ceph_open_session().
526  */
527 int ceph_monc_open_session(struct ceph_mon_client *monc)
528 {
529         mutex_lock(&monc->mutex);
530         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
531         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
532         __open_session(monc);
533         __schedule_delayed(monc);
534         mutex_unlock(&monc->mutex);
535         return 0;
536 }
537 EXPORT_SYMBOL(ceph_monc_open_session);
538
539 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
540                                  struct ceph_msg *msg)
541 {
542         struct ceph_client *client = monc->client;
543         struct ceph_monmap *monmap;
544         void *p, *end;
545
546         mutex_lock(&monc->mutex);
547
548         dout("handle_monmap\n");
549         p = msg->front.iov_base;
550         end = p + msg->front.iov_len;
551
552         monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
553         if (IS_ERR(monmap)) {
554                 pr_err("problem decoding monmap, %d\n",
555                        (int)PTR_ERR(monmap));
556                 ceph_msg_dump(msg);
557                 goto out;
558         }
559
560         if (ceph_check_fsid(client, &monmap->fsid) < 0) {
561                 kfree(monmap);
562                 goto out;
563         }
564
565         kfree(monc->monmap);
566         monc->monmap = monmap;
567
568         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
569         client->have_fsid = true;
570
571 out:
572         mutex_unlock(&monc->mutex);
573         wake_up_all(&client->auth_wq);
574 }
575
576 /*
577  * generic requests (currently statfs, mon_get_version)
578  */
579 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
580
581 static void release_generic_request(struct kref *kref)
582 {
583         struct ceph_mon_generic_request *req =
584                 container_of(kref, struct ceph_mon_generic_request, kref);
585
586         dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
587              req->reply);
588         WARN_ON(!RB_EMPTY_NODE(&req->node));
589
590         if (req->reply)
591                 ceph_msg_put(req->reply);
592         if (req->request)
593                 ceph_msg_put(req->request);
594
595         kfree(req);
596 }
597
598 static void put_generic_request(struct ceph_mon_generic_request *req)
599 {
600         if (req)
601                 kref_put(&req->kref, release_generic_request);
602 }
603
604 static void get_generic_request(struct ceph_mon_generic_request *req)
605 {
606         kref_get(&req->kref);
607 }
608
609 static struct ceph_mon_generic_request *
610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
611 {
612         struct ceph_mon_generic_request *req;
613
614         req = kzalloc(sizeof(*req), gfp);
615         if (!req)
616                 return NULL;
617
618         req->monc = monc;
619         kref_init(&req->kref);
620         RB_CLEAR_NODE(&req->node);
621         init_completion(&req->completion);
622
623         dout("%s greq %p\n", __func__, req);
624         return req;
625 }
626
627 static void register_generic_request(struct ceph_mon_generic_request *req)
628 {
629         struct ceph_mon_client *monc = req->monc;
630
631         WARN_ON(req->tid);
632
633         get_generic_request(req);
634         req->tid = ++monc->last_tid;
635         insert_generic_request(&monc->generic_request_tree, req);
636 }
637
638 static void send_generic_request(struct ceph_mon_client *monc,
639                                  struct ceph_mon_generic_request *req)
640 {
641         WARN_ON(!req->tid);
642
643         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
644         req->request->hdr.tid = cpu_to_le64(req->tid);
645         ceph_con_send(&monc->con, ceph_msg_get(req->request));
646 }
647
648 static void __finish_generic_request(struct ceph_mon_generic_request *req)
649 {
650         struct ceph_mon_client *monc = req->monc;
651
652         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
653         erase_generic_request(&monc->generic_request_tree, req);
654
655         ceph_msg_revoke(req->request);
656         ceph_msg_revoke_incoming(req->reply);
657 }
658
659 static void finish_generic_request(struct ceph_mon_generic_request *req)
660 {
661         __finish_generic_request(req);
662         put_generic_request(req);
663 }
664
665 static void complete_generic_request(struct ceph_mon_generic_request *req)
666 {
667         if (req->complete_cb)
668                 req->complete_cb(req);
669         else
670                 complete_all(&req->completion);
671         put_generic_request(req);
672 }
673
674 static void cancel_generic_request(struct ceph_mon_generic_request *req)
675 {
676         struct ceph_mon_client *monc = req->monc;
677         struct ceph_mon_generic_request *lookup_req;
678
679         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
680
681         mutex_lock(&monc->mutex);
682         lookup_req = lookup_generic_request(&monc->generic_request_tree,
683                                             req->tid);
684         if (lookup_req) {
685                 WARN_ON(lookup_req != req);
686                 finish_generic_request(req);
687         }
688
689         mutex_unlock(&monc->mutex);
690 }
691
692 static int wait_generic_request(struct ceph_mon_generic_request *req)
693 {
694         int ret;
695
696         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
697         ret = wait_for_completion_interruptible(&req->completion);
698         if (ret)
699                 cancel_generic_request(req);
700         else
701                 ret = req->result; /* completed */
702
703         return ret;
704 }
705
706 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
707                                          struct ceph_msg_header *hdr,
708                                          int *skip)
709 {
710         struct ceph_mon_client *monc = con->private;
711         struct ceph_mon_generic_request *req;
712         u64 tid = le64_to_cpu(hdr->tid);
713         struct ceph_msg *m;
714
715         mutex_lock(&monc->mutex);
716         req = lookup_generic_request(&monc->generic_request_tree, tid);
717         if (!req) {
718                 dout("get_generic_reply %lld dne\n", tid);
719                 *skip = 1;
720                 m = NULL;
721         } else {
722                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
723                 *skip = 0;
724                 m = ceph_msg_get(req->reply);
725                 /*
726                  * we don't need to track the connection reading into
727                  * this reply because we only have one open connection
728                  * at a time, ever.
729                  */
730         }
731         mutex_unlock(&monc->mutex);
732         return m;
733 }
734
735 /*
736  * statfs
737  */
738 static void handle_statfs_reply(struct ceph_mon_client *monc,
739                                 struct ceph_msg *msg)
740 {
741         struct ceph_mon_generic_request *req;
742         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
743         u64 tid = le64_to_cpu(msg->hdr.tid);
744
745         dout("%s msg %p tid %llu\n", __func__, msg, tid);
746
747         if (msg->front.iov_len != sizeof(*reply))
748                 goto bad;
749
750         mutex_lock(&monc->mutex);
751         req = lookup_generic_request(&monc->generic_request_tree, tid);
752         if (!req) {
753                 mutex_unlock(&monc->mutex);
754                 return;
755         }
756
757         req->result = 0;
758         *req->u.st = reply->st; /* struct */
759         __finish_generic_request(req);
760         mutex_unlock(&monc->mutex);
761
762         complete_generic_request(req);
763         return;
764
765 bad:
766         pr_err("corrupt statfs reply, tid %llu\n", tid);
767         ceph_msg_dump(msg);
768 }
769
770 /*
771  * Do a synchronous statfs().
772  */
773 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
774                         struct ceph_statfs *buf)
775 {
776         struct ceph_mon_generic_request *req;
777         struct ceph_mon_statfs *h;
778         int ret = -ENOMEM;
779
780         req = alloc_generic_request(monc, GFP_NOFS);
781         if (!req)
782                 goto out;
783
784         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
785                                     true);
786         if (!req->request)
787                 goto out;
788
789         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
790         if (!req->reply)
791                 goto out;
792
793         req->u.st = buf;
794         req->request->hdr.version = cpu_to_le16(2);
795
796         mutex_lock(&monc->mutex);
797         register_generic_request(req);
798         /* fill out request */
799         h = req->request->front.iov_base;
800         h->monhdr.have_version = 0;
801         h->monhdr.session_mon = cpu_to_le16(-1);
802         h->monhdr.session_mon_tid = 0;
803         h->fsid = monc->monmap->fsid;
804         h->contains_data_pool = (data_pool != CEPH_NOPOOL);
805         h->data_pool = cpu_to_le64(data_pool);
806         send_generic_request(monc, req);
807         mutex_unlock(&monc->mutex);
808
809         ret = wait_generic_request(req);
810 out:
811         put_generic_request(req);
812         return ret;
813 }
814 EXPORT_SYMBOL(ceph_monc_do_statfs);
815
816 static void handle_get_version_reply(struct ceph_mon_client *monc,
817                                      struct ceph_msg *msg)
818 {
819         struct ceph_mon_generic_request *req;
820         u64 tid = le64_to_cpu(msg->hdr.tid);
821         void *p = msg->front.iov_base;
822         void *end = p + msg->front_alloc_len;
823         u64 handle;
824
825         dout("%s msg %p tid %llu\n", __func__, msg, tid);
826
827         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
828         handle = ceph_decode_64(&p);
829         if (tid != 0 && tid != handle)
830                 goto bad;
831
832         mutex_lock(&monc->mutex);
833         req = lookup_generic_request(&monc->generic_request_tree, handle);
834         if (!req) {
835                 mutex_unlock(&monc->mutex);
836                 return;
837         }
838
839         req->result = 0;
840         req->u.newest = ceph_decode_64(&p);
841         __finish_generic_request(req);
842         mutex_unlock(&monc->mutex);
843
844         complete_generic_request(req);
845         return;
846
847 bad:
848         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
849         ceph_msg_dump(msg);
850 }
851
852 static struct ceph_mon_generic_request *
853 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
854                         ceph_monc_callback_t cb, u64 private_data)
855 {
856         struct ceph_mon_generic_request *req;
857
858         req = alloc_generic_request(monc, GFP_NOIO);
859         if (!req)
860                 goto err_put_req;
861
862         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
863                                     sizeof(u64) + sizeof(u32) + strlen(what),
864                                     GFP_NOIO, true);
865         if (!req->request)
866                 goto err_put_req;
867
868         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
869                                   true);
870         if (!req->reply)
871                 goto err_put_req;
872
873         req->complete_cb = cb;
874         req->private_data = private_data;
875
876         mutex_lock(&monc->mutex);
877         register_generic_request(req);
878         {
879                 void *p = req->request->front.iov_base;
880                 void *const end = p + req->request->front_alloc_len;
881
882                 ceph_encode_64(&p, req->tid); /* handle */
883                 ceph_encode_string(&p, end, what, strlen(what));
884                 WARN_ON(p != end);
885         }
886         send_generic_request(monc, req);
887         mutex_unlock(&monc->mutex);
888
889         return req;
890
891 err_put_req:
892         put_generic_request(req);
893         return ERR_PTR(-ENOMEM);
894 }
895
896 /*
897  * Send MMonGetVersion and wait for the reply.
898  *
899  * @what: one of "mdsmap", "osdmap" or "monmap"
900  */
901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
902                           u64 *newest)
903 {
904         struct ceph_mon_generic_request *req;
905         int ret;
906
907         req = __ceph_monc_get_version(monc, what, NULL, 0);
908         if (IS_ERR(req))
909                 return PTR_ERR(req);
910
911         ret = wait_generic_request(req);
912         if (!ret)
913                 *newest = req->u.newest;
914
915         put_generic_request(req);
916         return ret;
917 }
918 EXPORT_SYMBOL(ceph_monc_get_version);
919
920 /*
921  * Send MMonGetVersion,
922  *
923  * @what: one of "mdsmap", "osdmap" or "monmap"
924  */
925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
926                                 ceph_monc_callback_t cb, u64 private_data)
927 {
928         struct ceph_mon_generic_request *req;
929
930         req = __ceph_monc_get_version(monc, what, cb, private_data);
931         if (IS_ERR(req))
932                 return PTR_ERR(req);
933
934         put_generic_request(req);
935         return 0;
936 }
937 EXPORT_SYMBOL(ceph_monc_get_version_async);
938
939 static void handle_command_ack(struct ceph_mon_client *monc,
940                                struct ceph_msg *msg)
941 {
942         struct ceph_mon_generic_request *req;
943         void *p = msg->front.iov_base;
944         void *const end = p + msg->front_alloc_len;
945         u64 tid = le64_to_cpu(msg->hdr.tid);
946
947         dout("%s msg %p tid %llu\n", __func__, msg, tid);
948
949         ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
950                                                             sizeof(u32), bad);
951         p += sizeof(struct ceph_mon_request_header);
952
953         mutex_lock(&monc->mutex);
954         req = lookup_generic_request(&monc->generic_request_tree, tid);
955         if (!req) {
956                 mutex_unlock(&monc->mutex);
957                 return;
958         }
959
960         req->result = ceph_decode_32(&p);
961         __finish_generic_request(req);
962         mutex_unlock(&monc->mutex);
963
964         complete_generic_request(req);
965         return;
966
967 bad:
968         pr_err("corrupt mon_command ack, tid %llu\n", tid);
969         ceph_msg_dump(msg);
970 }
971
972 static __printf(2, 0)
973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
974                          va_list ap)
975 {
976         struct ceph_mon_generic_request *req;
977         struct ceph_mon_command *h;
978         int ret = -ENOMEM;
979         int len;
980
981         req = alloc_generic_request(monc, GFP_NOIO);
982         if (!req)
983                 goto out;
984
985         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
986         if (!req->request)
987                 goto out;
988
989         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
990                                   true);
991         if (!req->reply)
992                 goto out;
993
994         mutex_lock(&monc->mutex);
995         register_generic_request(req);
996         h = req->request->front.iov_base;
997         h->monhdr.have_version = 0;
998         h->monhdr.session_mon = cpu_to_le16(-1);
999         h->monhdr.session_mon_tid = 0;
1000         h->fsid = monc->monmap->fsid;
1001         h->num_strs = cpu_to_le32(1);
1002         len = vsprintf(h->str, fmt, ap);
1003         h->str_len = cpu_to_le32(len);
1004         send_generic_request(monc, req);
1005         mutex_unlock(&monc->mutex);
1006
1007         ret = wait_generic_request(req);
1008 out:
1009         put_generic_request(req);
1010         return ret;
1011 }
1012
1013 static __printf(2, 3)
1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
1015 {
1016         va_list ap;
1017         int ret;
1018
1019         va_start(ap, fmt);
1020         ret = do_mon_command_vargs(monc, fmt, ap);
1021         va_end(ap);
1022         return ret;
1023 }
1024
1025 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
1026                             struct ceph_entity_addr *client_addr)
1027 {
1028         int ret;
1029
1030         ret = do_mon_command(monc,
1031                              "{ \"prefix\": \"osd blocklist\", \
1032                                 \"blocklistop\": \"add\", \
1033                                 \"addr\": \"%pISpc/%u\" }",
1034                              &client_addr->in_addr,
1035                              le32_to_cpu(client_addr->nonce));
1036         if (ret == -EINVAL) {
1037                 /*
1038                  * The monitor returns EINVAL on an unrecognized command.
1039                  * Try the legacy command -- it is exactly the same except
1040                  * for the name.
1041                  */
1042                 ret = do_mon_command(monc,
1043                                      "{ \"prefix\": \"osd blacklist\", \
1044                                         \"blacklistop\": \"add\", \
1045                                         \"addr\": \"%pISpc/%u\" }",
1046                                      &client_addr->in_addr,
1047                                      le32_to_cpu(client_addr->nonce));
1048         }
1049         if (ret)
1050                 return ret;
1051
1052         /*
1053          * Make sure we have the osdmap that includes the blocklist
1054          * entry.  This is needed to ensure that the OSDs pick up the
1055          * new blocklist before processing any future requests from
1056          * this client.
1057          */
1058         return ceph_wait_for_latest_osdmap(monc->client, 0);
1059 }
1060 EXPORT_SYMBOL(ceph_monc_blocklist_add);
1061
1062 /*
1063  * Resend pending generic requests.
1064  */
1065 static void __resend_generic_request(struct ceph_mon_client *monc)
1066 {
1067         struct ceph_mon_generic_request *req;
1068         struct rb_node *p;
1069
1070         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1071                 req = rb_entry(p, struct ceph_mon_generic_request, node);
1072                 ceph_msg_revoke(req->request);
1073                 ceph_msg_revoke_incoming(req->reply);
1074                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
1075         }
1076 }
1077
1078 /*
1079  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
1080  * renew/retry subscription as needed (in case it is timing out, or we
1081  * got an ENOMEM).  And keep the monitor connection alive.
1082  */
1083 static void delayed_work(struct work_struct *work)
1084 {
1085         struct ceph_mon_client *monc =
1086                 container_of(work, struct ceph_mon_client, delayed_work.work);
1087
1088         dout("monc delayed_work\n");
1089         mutex_lock(&monc->mutex);
1090         if (monc->hunting) {
1091                 dout("%s continuing hunt\n", __func__);
1092                 reopen_session(monc);
1093         } else {
1094                 int is_auth = ceph_auth_is_authenticated(monc->auth);
1095                 if (ceph_con_keepalive_expired(&monc->con,
1096                                                CEPH_MONC_PING_TIMEOUT)) {
1097                         dout("monc keepalive timeout\n");
1098                         is_auth = 0;
1099                         reopen_session(monc);
1100                 }
1101
1102                 if (!monc->hunting) {
1103                         ceph_con_keepalive(&monc->con);
1104                         __validate_auth(monc);
1105                         un_backoff(monc);
1106                 }
1107
1108                 if (is_auth &&
1109                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1110                         unsigned long now = jiffies;
1111
1112                         dout("%s renew subs? now %lu renew after %lu\n",
1113                              __func__, now, monc->sub_renew_after);
1114                         if (time_after_eq(now, monc->sub_renew_after))
1115                                 __send_subscribe(monc);
1116                 }
1117         }
1118         __schedule_delayed(monc);
1119         mutex_unlock(&monc->mutex);
1120 }
1121
1122 /*
1123  * On startup, we build a temporary monmap populated with the IPs
1124  * provided by mount(2).
1125  */
1126 static int build_initial_monmap(struct ceph_mon_client *monc)
1127 {
1128         __le32 my_type = ceph_msgr2(monc->client) ?
1129                 CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
1130         struct ceph_options *opt = monc->client->options;
1131         int num_mon = opt->num_mon;
1132         int i;
1133
1134         /* build initial monmap */
1135         monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1136                                GFP_KERNEL);
1137         if (!monc->monmap)
1138                 return -ENOMEM;
1139
1140         for (i = 0; i < num_mon; i++) {
1141                 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
1142
1143                 memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
1144                        sizeof(inst->addr.in_addr));
1145                 inst->addr.type = my_type;
1146                 inst->addr.nonce = 0;
1147                 inst->name.type = CEPH_ENTITY_TYPE_MON;
1148                 inst->name.num = cpu_to_le64(i);
1149         }
1150         monc->monmap->num_mon = num_mon;
1151         return 0;
1152 }
1153
1154 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1155 {
1156         int err = 0;
1157
1158         dout("init\n");
1159         memset(monc, 0, sizeof(*monc));
1160         monc->client = cl;
1161         monc->monmap = NULL;
1162         mutex_init(&monc->mutex);
1163
1164         err = build_initial_monmap(monc);
1165         if (err)
1166                 goto out;
1167
1168         /* connection */
1169         /* authentication */
1170         monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
1171                                     cl->options->con_modes);
1172         if (IS_ERR(monc->auth)) {
1173                 err = PTR_ERR(monc->auth);
1174                 goto out_monmap;
1175         }
1176         monc->auth->want_keys =
1177                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1178                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1179
1180         /* msgs */
1181         err = -ENOMEM;
1182         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1183                                      sizeof(struct ceph_mon_subscribe_ack),
1184                                      GFP_KERNEL, true);
1185         if (!monc->m_subscribe_ack)
1186                 goto out_auth;
1187
1188         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1189                                          GFP_KERNEL, true);
1190         if (!monc->m_subscribe)
1191                 goto out_subscribe_ack;
1192
1193         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1194                                           GFP_KERNEL, true);
1195         if (!monc->m_auth_reply)
1196                 goto out_subscribe;
1197
1198         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1199         monc->pending_auth = 0;
1200         if (!monc->m_auth)
1201                 goto out_auth_reply;
1202
1203         ceph_con_init(&monc->con, monc, &mon_con_ops,
1204                       &monc->client->msgr);
1205
1206         monc->cur_mon = -1;
1207         monc->had_a_connection = false;
1208         monc->hunt_mult = 1;
1209
1210         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1211         monc->generic_request_tree = RB_ROOT;
1212         monc->last_tid = 0;
1213
1214         monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1215
1216         return 0;
1217
1218 out_auth_reply:
1219         ceph_msg_put(monc->m_auth_reply);
1220 out_subscribe:
1221         ceph_msg_put(monc->m_subscribe);
1222 out_subscribe_ack:
1223         ceph_msg_put(monc->m_subscribe_ack);
1224 out_auth:
1225         ceph_auth_destroy(monc->auth);
1226 out_monmap:
1227         kfree(monc->monmap);
1228 out:
1229         return err;
1230 }
1231 EXPORT_SYMBOL(ceph_monc_init);
1232
1233 void ceph_monc_stop(struct ceph_mon_client *monc)
1234 {
1235         dout("stop\n");
1236         cancel_delayed_work_sync(&monc->delayed_work);
1237
1238         mutex_lock(&monc->mutex);
1239         __close_session(monc);
1240         monc->cur_mon = -1;
1241         mutex_unlock(&monc->mutex);
1242
1243         /*
1244          * flush msgr queue before we destroy ourselves to ensure that:
1245          *  - any work that references our embedded con is finished.
1246          *  - any osd_client or other work that may reference an authorizer
1247          *    finishes before we shut down the auth subsystem.
1248          */
1249         ceph_msgr_flush();
1250
1251         ceph_auth_destroy(monc->auth);
1252
1253         WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1254
1255         ceph_msg_put(monc->m_auth);
1256         ceph_msg_put(monc->m_auth_reply);
1257         ceph_msg_put(monc->m_subscribe);
1258         ceph_msg_put(monc->m_subscribe_ack);
1259
1260         kfree(monc->monmap);
1261 }
1262 EXPORT_SYMBOL(ceph_monc_stop);
1263
1264 static void finish_hunting(struct ceph_mon_client *monc)
1265 {
1266         if (monc->hunting) {
1267                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1268                 monc->hunting = false;
1269                 monc->had_a_connection = true;
1270                 un_backoff(monc);
1271                 __schedule_delayed(monc);
1272         }
1273 }
1274
1275 static void finish_auth(struct ceph_mon_client *monc, int auth_err,
1276                         bool was_authed)
1277 {
1278         dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
1279         WARN_ON(auth_err > 0);
1280
1281         monc->pending_auth = 0;
1282         if (auth_err) {
1283                 monc->client->auth_err = auth_err;
1284                 wake_up_all(&monc->client->auth_wq);
1285                 return;
1286         }
1287
1288         if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
1289                 dout("%s authenticated, starting session global_id %llu\n",
1290                      __func__, monc->auth->global_id);
1291
1292                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1293                 monc->client->msgr.inst.name.num =
1294                                         cpu_to_le64(monc->auth->global_id);
1295
1296                 __send_subscribe(monc);
1297                 __resend_generic_request(monc);
1298
1299                 pr_info("mon%d %s session established\n", monc->cur_mon,
1300                         ceph_pr_addr(&monc->con.peer_addr));
1301         }
1302 }
1303
1304 static void handle_auth_reply(struct ceph_mon_client *monc,
1305                               struct ceph_msg *msg)
1306 {
1307         bool was_authed;
1308         int ret;
1309
1310         mutex_lock(&monc->mutex);
1311         was_authed = ceph_auth_is_authenticated(monc->auth);
1312         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1313                                      msg->front.iov_len,
1314                                      monc->m_auth->front.iov_base,
1315                                      monc->m_auth->front_alloc_len);
1316         if (ret > 0) {
1317                 __send_prepared_auth_request(monc, ret);
1318         } else {
1319                 finish_auth(monc, ret, was_authed);
1320                 finish_hunting(monc);
1321         }
1322         mutex_unlock(&monc->mutex);
1323 }
1324
1325 static int __validate_auth(struct ceph_mon_client *monc)
1326 {
1327         int ret;
1328
1329         if (monc->pending_auth)
1330                 return 0;
1331
1332         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1333                               monc->m_auth->front_alloc_len);
1334         if (ret <= 0)
1335                 return ret; /* either an error, or no need to authenticate */
1336         __send_prepared_auth_request(monc, ret);
1337         return 0;
1338 }
1339
1340 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1341 {
1342         int ret;
1343
1344         mutex_lock(&monc->mutex);
1345         ret = __validate_auth(monc);
1346         mutex_unlock(&monc->mutex);
1347         return ret;
1348 }
1349 EXPORT_SYMBOL(ceph_monc_validate_auth);
1350
1351 static int mon_get_auth_request(struct ceph_connection *con,
1352                                 void *buf, int *buf_len,
1353                                 void **authorizer, int *authorizer_len)
1354 {
1355         struct ceph_mon_client *monc = con->private;
1356         int ret;
1357
1358         mutex_lock(&monc->mutex);
1359         ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
1360         mutex_unlock(&monc->mutex);
1361         if (ret < 0)
1362                 return ret;
1363
1364         *buf_len = ret;
1365         *authorizer = NULL;
1366         *authorizer_len = 0;
1367         return 0;
1368 }
1369
1370 static int mon_handle_auth_reply_more(struct ceph_connection *con,
1371                                       void *reply, int reply_len,
1372                                       void *buf, int *buf_len,
1373                                       void **authorizer, int *authorizer_len)
1374 {
1375         struct ceph_mon_client *monc = con->private;
1376         int ret;
1377
1378         mutex_lock(&monc->mutex);
1379         ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
1380                                           buf, *buf_len);
1381         mutex_unlock(&monc->mutex);
1382         if (ret < 0)
1383                 return ret;
1384
1385         *buf_len = ret;
1386         *authorizer = NULL;
1387         *authorizer_len = 0;
1388         return 0;
1389 }
1390
1391 static int mon_handle_auth_done(struct ceph_connection *con,
1392                                 u64 global_id, void *reply, int reply_len,
1393                                 u8 *session_key, int *session_key_len,
1394                                 u8 *con_secret, int *con_secret_len)
1395 {
1396         struct ceph_mon_client *monc = con->private;
1397         bool was_authed;
1398         int ret;
1399
1400         mutex_lock(&monc->mutex);
1401         WARN_ON(!monc->hunting);
1402         was_authed = ceph_auth_is_authenticated(monc->auth);
1403         ret = ceph_auth_handle_reply_done(monc->auth, global_id,
1404                                           reply, reply_len,
1405                                           session_key, session_key_len,
1406                                           con_secret, con_secret_len);
1407         finish_auth(monc, ret, was_authed);
1408         if (!ret)
1409                 finish_hunting(monc);
1410         mutex_unlock(&monc->mutex);
1411         return 0;
1412 }
1413
1414 static int mon_handle_auth_bad_method(struct ceph_connection *con,
1415                                       int used_proto, int result,
1416                                       const int *allowed_protos, int proto_cnt,
1417                                       const int *allowed_modes, int mode_cnt)
1418 {
1419         struct ceph_mon_client *monc = con->private;
1420         bool was_authed;
1421
1422         mutex_lock(&monc->mutex);
1423         WARN_ON(!monc->hunting);
1424         was_authed = ceph_auth_is_authenticated(monc->auth);
1425         ceph_auth_handle_bad_method(monc->auth, used_proto, result,
1426                                     allowed_protos, proto_cnt,
1427                                     allowed_modes, mode_cnt);
1428         finish_auth(monc, -EACCES, was_authed);
1429         mutex_unlock(&monc->mutex);
1430         return 0;
1431 }
1432
1433 /*
1434  * handle incoming message
1435  */
1436 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1437 {
1438         struct ceph_mon_client *monc = con->private;
1439         int type = le16_to_cpu(msg->hdr.type);
1440
1441         switch (type) {
1442         case CEPH_MSG_AUTH_REPLY:
1443                 handle_auth_reply(monc, msg);
1444                 break;
1445
1446         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1447                 handle_subscribe_ack(monc, msg);
1448                 break;
1449
1450         case CEPH_MSG_STATFS_REPLY:
1451                 handle_statfs_reply(monc, msg);
1452                 break;
1453
1454         case CEPH_MSG_MON_GET_VERSION_REPLY:
1455                 handle_get_version_reply(monc, msg);
1456                 break;
1457
1458         case CEPH_MSG_MON_COMMAND_ACK:
1459                 handle_command_ack(monc, msg);
1460                 break;
1461
1462         case CEPH_MSG_MON_MAP:
1463                 ceph_monc_handle_map(monc, msg);
1464                 break;
1465
1466         case CEPH_MSG_OSD_MAP:
1467                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1468                 break;
1469
1470         default:
1471                 /* can the chained handler handle it? */
1472                 if (monc->client->extra_mon_dispatch &&
1473                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1474                         break;
1475
1476                 pr_err("received unknown message type %d %s\n", type,
1477                        ceph_msg_type_name(type));
1478         }
1479         ceph_msg_put(msg);
1480 }
1481
1482 /*
1483  * Allocate memory for incoming message
1484  */
1485 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1486                                       struct ceph_msg_header *hdr,
1487                                       int *skip)
1488 {
1489         struct ceph_mon_client *monc = con->private;
1490         int type = le16_to_cpu(hdr->type);
1491         int front_len = le32_to_cpu(hdr->front_len);
1492         struct ceph_msg *m = NULL;
1493
1494         *skip = 0;
1495
1496         switch (type) {
1497         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1498                 m = ceph_msg_get(monc->m_subscribe_ack);
1499                 break;
1500         case CEPH_MSG_STATFS_REPLY:
1501         case CEPH_MSG_MON_COMMAND_ACK:
1502                 return get_generic_reply(con, hdr, skip);
1503         case CEPH_MSG_AUTH_REPLY:
1504                 m = ceph_msg_get(monc->m_auth_reply);
1505                 break;
1506         case CEPH_MSG_MON_GET_VERSION_REPLY:
1507                 if (le64_to_cpu(hdr->tid) != 0)
1508                         return get_generic_reply(con, hdr, skip);
1509
1510                 /*
1511                  * Older OSDs don't set reply tid even if the orignal
1512                  * request had a non-zero tid.  Work around this weirdness
1513                  * by allocating a new message.
1514                  */
1515                 fallthrough;
1516         case CEPH_MSG_MON_MAP:
1517         case CEPH_MSG_MDS_MAP:
1518         case CEPH_MSG_OSD_MAP:
1519         case CEPH_MSG_FS_MAP_USER:
1520                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1521                 if (!m)
1522                         return NULL;    /* ENOMEM--return skip == 0 */
1523                 break;
1524         }
1525
1526         if (!m) {
1527                 pr_info("alloc_msg unknown type %d\n", type);
1528                 *skip = 1;
1529         } else if (front_len > m->front_alloc_len) {
1530                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1531                         front_len, m->front_alloc_len,
1532                         (unsigned int)con->peer_name.type,
1533                         le64_to_cpu(con->peer_name.num));
1534                 ceph_msg_put(m);
1535                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1536         }
1537
1538         return m;
1539 }
1540
1541 /*
1542  * If the monitor connection resets, pick a new monitor and resubmit
1543  * any pending requests.
1544  */
1545 static void mon_fault(struct ceph_connection *con)
1546 {
1547         struct ceph_mon_client *monc = con->private;
1548
1549         mutex_lock(&monc->mutex);
1550         dout("%s mon%d\n", __func__, monc->cur_mon);
1551         if (monc->cur_mon >= 0) {
1552                 if (!monc->hunting) {
1553                         dout("%s hunting for new mon\n", __func__);
1554                         reopen_session(monc);
1555                         __schedule_delayed(monc);
1556                 } else {
1557                         dout("%s already hunting\n", __func__);
1558                 }
1559         }
1560         mutex_unlock(&monc->mutex);
1561 }
1562
1563 /*
1564  * We can ignore refcounting on the connection struct, as all references
1565  * will come from the messenger workqueue, which is drained prior to
1566  * mon_client destruction.
1567  */
1568 static struct ceph_connection *con_get(struct ceph_connection *con)
1569 {
1570         return con;
1571 }
1572
1573 static void con_put(struct ceph_connection *con)
1574 {
1575 }
1576
1577 static const struct ceph_connection_operations mon_con_ops = {
1578         .get = con_get,
1579         .put = con_put,
1580         .dispatch = dispatch,
1581         .fault = mon_fault,
1582         .alloc_msg = mon_alloc_msg,
1583         .get_auth_request = mon_get_auth_request,
1584         .handle_auth_reply_more = mon_handle_auth_reply_more,
1585         .handle_auth_done = mon_handle_auth_done,
1586         .handle_auth_bad_method = mon_handle_auth_bad_method,
1587 };