libceph: introduce connection modes and ms_mode option
[linux-2.6-microblaze.git] / net / ceph / mon_client.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/module.h>
5 #include <linux/types.h>
6 #include <linux/slab.h>
7 #include <linux/random.h>
8 #include <linux/sched.h>
9
10 #include <linux/ceph/ceph_features.h>
11 #include <linux/ceph/mon_client.h>
12 #include <linux/ceph/libceph.h>
13 #include <linux/ceph/debugfs.h>
14 #include <linux/ceph/decode.h>
15 #include <linux/ceph/auth.h>
16
17 /*
18  * Interact with Ceph monitor cluster.  Handle requests for new map
19  * versions, and periodically resend as needed.  Also implement
20  * statfs() and umount().
21  *
22  * A small cluster of Ceph "monitors" are responsible for managing critical
23  * cluster configuration and state information.  An odd number (e.g., 3, 5)
24  * of cmon daemons use a modified version of the Paxos part-time parliament
25  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
26  * list of clients who have mounted the file system.
27  *
28  * We maintain an open, active session with a monitor at all times in order to
29  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
30  * TCP socket to ensure we detect a failure.  If the connection does break, we
31  * randomly hunt for a new monitor.  Once the connection is reestablished, we
32  * resend any outstanding requests.
33  */
34
35 static const struct ceph_connection_operations mon_con_ops;
36
37 static int __validate_auth(struct ceph_mon_client *monc);
38
39 static int decode_mon_info(void **p, void *end, bool msgr2,
40                            struct ceph_entity_addr *addr)
41 {
42         void *mon_info_end;
43         u32 struct_len;
44         u8 struct_v;
45         int ret;
46
47         ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
48                                   &struct_len);
49         if (ret)
50                 return ret;
51
52         mon_info_end = *p + struct_len;
53         ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
54         ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
55         if (ret)
56                 return ret;
57
58         *p = mon_info_end;
59         return 0;
60
61 e_inval:
62         return -EINVAL;
63 }
64
65 /*
66  * Decode a monmap blob (e.g., during mount).
67  *
68  * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
69  */
70 static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
71 {
72         struct ceph_monmap *monmap = NULL;
73         struct ceph_fsid fsid;
74         u32 struct_len;
75         int blob_len;
76         int num_mon;
77         u8 struct_v;
78         u32 epoch;
79         int ret;
80         int i;
81
82         ceph_decode_32_safe(p, end, blob_len, e_inval);
83         ceph_decode_need(p, end, blob_len, e_inval);
84
85         ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
86         if (ret)
87                 goto fail;
88
89         dout("%s struct_v %d\n", __func__, struct_v);
90         ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
91         ceph_decode_32_safe(p, end, epoch, e_inval);
92         if (struct_v >= 6) {
93                 u32 feat_struct_len;
94                 u8 feat_struct_v;
95
96                 *p += sizeof(struct ceph_timespec);  /* skip last_changed */
97                 *p += sizeof(struct ceph_timespec);  /* skip created */
98
99                 ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
100                                           &feat_struct_v, &feat_struct_len);
101                 if (ret)
102                         goto fail;
103
104                 *p += feat_struct_len;  /* skip persistent_features */
105
106                 ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
107                                           &feat_struct_v, &feat_struct_len);
108                 if (ret)
109                         goto fail;
110
111                 *p += feat_struct_len;  /* skip optional_features */
112         }
113         ceph_decode_32_safe(p, end, num_mon, e_inval);
114
115         dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
116              num_mon);
117         if (num_mon > CEPH_MAX_MON)
118                 goto e_inval;
119
120         monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
121         if (!monmap) {
122                 ret = -ENOMEM;
123                 goto fail;
124         }
125         monmap->fsid = fsid;
126         monmap->epoch = epoch;
127         monmap->num_mon = num_mon;
128
129         /* legacy_mon_addr map or mon_info map */
130         for (i = 0; i < num_mon; i++) {
131                 struct ceph_entity_inst *inst = &monmap->mon_inst[i];
132
133                 ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
134                 inst->name.type = CEPH_ENTITY_TYPE_MON;
135                 inst->name.num = cpu_to_le64(i);
136
137                 if (struct_v >= 6)
138                         ret = decode_mon_info(p, end, msgr2, &inst->addr);
139                 else
140                         ret = ceph_decode_entity_addr(p, end, &inst->addr);
141                 if (ret)
142                         goto fail;
143
144                 dout("%s mon%d addr %s\n", __func__, i,
145                      ceph_pr_addr(&inst->addr));
146         }
147
148         return monmap;
149
150 e_inval:
151         ret = -EINVAL;
152 fail:
153         kfree(monmap);
154         return ERR_PTR(ret);
155 }
156
157 /*
158  * return true if *addr is included in the monmap.
159  */
160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
161 {
162         int i;
163
164         for (i = 0; i < m->num_mon; i++) {
165                 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
166                         return 1;
167         }
168
169         return 0;
170 }
171
172 /*
173  * Send an auth request.
174  */
175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
176 {
177         monc->pending_auth = 1;
178         monc->m_auth->front.iov_len = len;
179         monc->m_auth->hdr.front_len = cpu_to_le32(len);
180         ceph_msg_revoke(monc->m_auth);
181         ceph_msg_get(monc->m_auth);  /* keep our ref */
182         ceph_con_send(&monc->con, monc->m_auth);
183 }
184
185 /*
186  * Close monitor session, if any.
187  */
188 static void __close_session(struct ceph_mon_client *monc)
189 {
190         dout("__close_session closing mon%d\n", monc->cur_mon);
191         ceph_msg_revoke(monc->m_auth);
192         ceph_msg_revoke_incoming(monc->m_auth_reply);
193         ceph_msg_revoke(monc->m_subscribe);
194         ceph_msg_revoke_incoming(monc->m_subscribe_ack);
195         ceph_con_close(&monc->con);
196
197         monc->pending_auth = 0;
198         ceph_auth_reset(monc->auth);
199 }
200
201 /*
202  * Pick a new monitor at random and set cur_mon.  If we are repicking
203  * (i.e. cur_mon is already set), be sure to pick a different one.
204  */
205 static void pick_new_mon(struct ceph_mon_client *monc)
206 {
207         int old_mon = monc->cur_mon;
208
209         BUG_ON(monc->monmap->num_mon < 1);
210
211         if (monc->monmap->num_mon == 1) {
212                 monc->cur_mon = 0;
213         } else {
214                 int max = monc->monmap->num_mon;
215                 int o = -1;
216                 int n;
217
218                 if (monc->cur_mon >= 0) {
219                         if (monc->cur_mon < monc->monmap->num_mon)
220                                 o = monc->cur_mon;
221                         if (o >= 0)
222                                 max--;
223                 }
224
225                 n = prandom_u32() % max;
226                 if (o >= 0 && n >= o)
227                         n++;
228
229                 monc->cur_mon = n;
230         }
231
232         dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
233              monc->cur_mon, monc->monmap->num_mon);
234 }
235
236 /*
237  * Open a session with a new monitor.
238  */
239 static void __open_session(struct ceph_mon_client *monc)
240 {
241         int ret;
242
243         pick_new_mon(monc);
244
245         monc->hunting = true;
246         if (monc->had_a_connection) {
247                 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
248                 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
249                         monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
250         }
251
252         monc->sub_renew_after = jiffies; /* i.e., expired */
253         monc->sub_renew_sent = 0;
254
255         dout("%s opening mon%d\n", __func__, monc->cur_mon);
256         ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
257                       &monc->monmap->mon_inst[monc->cur_mon].addr);
258
259         /*
260          * send an initial keepalive to ensure our timestamp is valid
261          * by the time we are in an OPENED state
262          */
263         ceph_con_keepalive(&monc->con);
264
265         /* initiate authentication handshake */
266         ret = ceph_auth_build_hello(monc->auth,
267                                     monc->m_auth->front.iov_base,
268                                     monc->m_auth->front_alloc_len);
269         BUG_ON(ret <= 0);
270         __send_prepared_auth_request(monc, ret);
271 }
272
273 static void reopen_session(struct ceph_mon_client *monc)
274 {
275         if (!monc->hunting)
276                 pr_info("mon%d %s session lost, hunting for new mon\n",
277                     monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
278
279         __close_session(monc);
280         __open_session(monc);
281 }
282
283 void ceph_monc_reopen_session(struct ceph_mon_client *monc)
284 {
285         mutex_lock(&monc->mutex);
286         reopen_session(monc);
287         mutex_unlock(&monc->mutex);
288 }
289
290 static void un_backoff(struct ceph_mon_client *monc)
291 {
292         monc->hunt_mult /= 2; /* reduce by 50% */
293         if (monc->hunt_mult < 1)
294                 monc->hunt_mult = 1;
295         dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
296 }
297
298 /*
299  * Reschedule delayed work timer.
300  */
301 static void __schedule_delayed(struct ceph_mon_client *monc)
302 {
303         unsigned long delay;
304
305         if (monc->hunting)
306                 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
307         else
308                 delay = CEPH_MONC_PING_INTERVAL;
309
310         dout("__schedule_delayed after %lu\n", delay);
311         mod_delayed_work(system_wq, &monc->delayed_work,
312                          round_jiffies_relative(delay));
313 }
314
315 const char *ceph_sub_str[] = {
316         [CEPH_SUB_MONMAP] = "monmap",
317         [CEPH_SUB_OSDMAP] = "osdmap",
318         [CEPH_SUB_FSMAP]  = "fsmap.user",
319         [CEPH_SUB_MDSMAP] = "mdsmap",
320 };
321
322 /*
323  * Send subscribe request for one or more maps, according to
324  * monc->subs.
325  */
326 static void __send_subscribe(struct ceph_mon_client *monc)
327 {
328         struct ceph_msg *msg = monc->m_subscribe;
329         void *p = msg->front.iov_base;
330         void *const end = p + msg->front_alloc_len;
331         int num = 0;
332         int i;
333
334         dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
335
336         BUG_ON(monc->cur_mon < 0);
337
338         if (!monc->sub_renew_sent)
339                 monc->sub_renew_sent = jiffies | 1; /* never 0 */
340
341         msg->hdr.version = cpu_to_le16(2);
342
343         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
344                 if (monc->subs[i].want)
345                         num++;
346         }
347         BUG_ON(num < 1); /* monmap sub is always there */
348         ceph_encode_32(&p, num);
349         for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
350                 char buf[32];
351                 int len;
352
353                 if (!monc->subs[i].want)
354                         continue;
355
356                 len = sprintf(buf, "%s", ceph_sub_str[i]);
357                 if (i == CEPH_SUB_MDSMAP &&
358                     monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
359                         len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
360
361                 dout("%s %s start %llu flags 0x%x\n", __func__, buf,
362                      le64_to_cpu(monc->subs[i].item.start),
363                      monc->subs[i].item.flags);
364                 ceph_encode_string(&p, end, buf, len);
365                 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
366                 p += sizeof(monc->subs[i].item);
367         }
368
369         BUG_ON(p > end);
370         msg->front.iov_len = p - msg->front.iov_base;
371         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
372         ceph_msg_revoke(msg);
373         ceph_con_send(&monc->con, ceph_msg_get(msg));
374 }
375
376 static void handle_subscribe_ack(struct ceph_mon_client *monc,
377                                  struct ceph_msg *msg)
378 {
379         unsigned int seconds;
380         struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
381
382         if (msg->front.iov_len < sizeof(*h))
383                 goto bad;
384         seconds = le32_to_cpu(h->duration);
385
386         mutex_lock(&monc->mutex);
387         if (monc->sub_renew_sent) {
388                 /*
389                  * This is only needed for legacy (infernalis or older)
390                  * MONs -- see delayed_work().
391                  */
392                 monc->sub_renew_after = monc->sub_renew_sent +
393                                             (seconds >> 1) * HZ - 1;
394                 dout("%s sent %lu duration %d renew after %lu\n", __func__,
395                      monc->sub_renew_sent, seconds, monc->sub_renew_after);
396                 monc->sub_renew_sent = 0;
397         } else {
398                 dout("%s sent %lu renew after %lu, ignoring\n", __func__,
399                      monc->sub_renew_sent, monc->sub_renew_after);
400         }
401         mutex_unlock(&monc->mutex);
402         return;
403 bad:
404         pr_err("got corrupt subscribe-ack msg\n");
405         ceph_msg_dump(msg);
406 }
407
408 /*
409  * Register interest in a map
410  *
411  * @sub: one of CEPH_SUB_*
412  * @epoch: X for "every map since X", or 0 for "just the latest"
413  */
414 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
415                                  u32 epoch, bool continuous)
416 {
417         __le64 start = cpu_to_le64(epoch);
418         u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
419
420         dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
421              epoch, continuous);
422
423         if (monc->subs[sub].want &&
424             monc->subs[sub].item.start == start &&
425             monc->subs[sub].item.flags == flags)
426                 return false;
427
428         monc->subs[sub].item.start = start;
429         monc->subs[sub].item.flags = flags;
430         monc->subs[sub].want = true;
431
432         return true;
433 }
434
435 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
436                         bool continuous)
437 {
438         bool need_request;
439
440         mutex_lock(&monc->mutex);
441         need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
442         mutex_unlock(&monc->mutex);
443
444         return need_request;
445 }
446 EXPORT_SYMBOL(ceph_monc_want_map);
447
448 /*
449  * Keep track of which maps we have
450  *
451  * @sub: one of CEPH_SUB_*
452  */
453 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
454                                 u32 epoch)
455 {
456         dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
457
458         if (monc->subs[sub].want) {
459                 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
460                         monc->subs[sub].want = false;
461                 else
462                         monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
463         }
464
465         monc->subs[sub].have = epoch;
466 }
467
468 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
469 {
470         mutex_lock(&monc->mutex);
471         __ceph_monc_got_map(monc, sub, epoch);
472         mutex_unlock(&monc->mutex);
473 }
474 EXPORT_SYMBOL(ceph_monc_got_map);
475
476 void ceph_monc_renew_subs(struct ceph_mon_client *monc)
477 {
478         mutex_lock(&monc->mutex);
479         __send_subscribe(monc);
480         mutex_unlock(&monc->mutex);
481 }
482 EXPORT_SYMBOL(ceph_monc_renew_subs);
483
484 /*
485  * Wait for an osdmap with a given epoch.
486  *
487  * @epoch: epoch to wait for
488  * @timeout: in jiffies, 0 means "wait forever"
489  */
490 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
491                           unsigned long timeout)
492 {
493         unsigned long started = jiffies;
494         long ret;
495
496         mutex_lock(&monc->mutex);
497         while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
498                 mutex_unlock(&monc->mutex);
499
500                 if (timeout && time_after_eq(jiffies, started + timeout))
501                         return -ETIMEDOUT;
502
503                 ret = wait_event_interruptible_timeout(monc->client->auth_wq,
504                                      monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
505                                      ceph_timeout_jiffies(timeout));
506                 if (ret < 0)
507                         return ret;
508
509                 mutex_lock(&monc->mutex);
510         }
511
512         mutex_unlock(&monc->mutex);
513         return 0;
514 }
515 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
516
517 /*
518  * Open a session with a random monitor.  Request monmap and osdmap,
519  * which are waited upon in __ceph_open_session().
520  */
521 int ceph_monc_open_session(struct ceph_mon_client *monc)
522 {
523         mutex_lock(&monc->mutex);
524         __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
525         __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
526         __open_session(monc);
527         __schedule_delayed(monc);
528         mutex_unlock(&monc->mutex);
529         return 0;
530 }
531 EXPORT_SYMBOL(ceph_monc_open_session);
532
533 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
534                                  struct ceph_msg *msg)
535 {
536         struct ceph_client *client = monc->client;
537         struct ceph_monmap *monmap;
538         void *p, *end;
539
540         mutex_lock(&monc->mutex);
541
542         dout("handle_monmap\n");
543         p = msg->front.iov_base;
544         end = p + msg->front.iov_len;
545
546         monmap = ceph_monmap_decode(&p, end, false);
547         if (IS_ERR(monmap)) {
548                 pr_err("problem decoding monmap, %d\n",
549                        (int)PTR_ERR(monmap));
550                 ceph_msg_dump(msg);
551                 goto out;
552         }
553
554         if (ceph_check_fsid(client, &monmap->fsid) < 0) {
555                 kfree(monmap);
556                 goto out;
557         }
558
559         kfree(monc->monmap);
560         monc->monmap = monmap;
561
562         __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
563         client->have_fsid = true;
564
565 out:
566         mutex_unlock(&monc->mutex);
567         wake_up_all(&client->auth_wq);
568 }
569
570 /*
571  * generic requests (currently statfs, mon_get_version)
572  */
573 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
574
575 static void release_generic_request(struct kref *kref)
576 {
577         struct ceph_mon_generic_request *req =
578                 container_of(kref, struct ceph_mon_generic_request, kref);
579
580         dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
581              req->reply);
582         WARN_ON(!RB_EMPTY_NODE(&req->node));
583
584         if (req->reply)
585                 ceph_msg_put(req->reply);
586         if (req->request)
587                 ceph_msg_put(req->request);
588
589         kfree(req);
590 }
591
592 static void put_generic_request(struct ceph_mon_generic_request *req)
593 {
594         if (req)
595                 kref_put(&req->kref, release_generic_request);
596 }
597
598 static void get_generic_request(struct ceph_mon_generic_request *req)
599 {
600         kref_get(&req->kref);
601 }
602
603 static struct ceph_mon_generic_request *
604 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
605 {
606         struct ceph_mon_generic_request *req;
607
608         req = kzalloc(sizeof(*req), gfp);
609         if (!req)
610                 return NULL;
611
612         req->monc = monc;
613         kref_init(&req->kref);
614         RB_CLEAR_NODE(&req->node);
615         init_completion(&req->completion);
616
617         dout("%s greq %p\n", __func__, req);
618         return req;
619 }
620
621 static void register_generic_request(struct ceph_mon_generic_request *req)
622 {
623         struct ceph_mon_client *monc = req->monc;
624
625         WARN_ON(req->tid);
626
627         get_generic_request(req);
628         req->tid = ++monc->last_tid;
629         insert_generic_request(&monc->generic_request_tree, req);
630 }
631
632 static void send_generic_request(struct ceph_mon_client *monc,
633                                  struct ceph_mon_generic_request *req)
634 {
635         WARN_ON(!req->tid);
636
637         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
638         req->request->hdr.tid = cpu_to_le64(req->tid);
639         ceph_con_send(&monc->con, ceph_msg_get(req->request));
640 }
641
642 static void __finish_generic_request(struct ceph_mon_generic_request *req)
643 {
644         struct ceph_mon_client *monc = req->monc;
645
646         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
647         erase_generic_request(&monc->generic_request_tree, req);
648
649         ceph_msg_revoke(req->request);
650         ceph_msg_revoke_incoming(req->reply);
651 }
652
653 static void finish_generic_request(struct ceph_mon_generic_request *req)
654 {
655         __finish_generic_request(req);
656         put_generic_request(req);
657 }
658
659 static void complete_generic_request(struct ceph_mon_generic_request *req)
660 {
661         if (req->complete_cb)
662                 req->complete_cb(req);
663         else
664                 complete_all(&req->completion);
665         put_generic_request(req);
666 }
667
668 static void cancel_generic_request(struct ceph_mon_generic_request *req)
669 {
670         struct ceph_mon_client *monc = req->monc;
671         struct ceph_mon_generic_request *lookup_req;
672
673         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
674
675         mutex_lock(&monc->mutex);
676         lookup_req = lookup_generic_request(&monc->generic_request_tree,
677                                             req->tid);
678         if (lookup_req) {
679                 WARN_ON(lookup_req != req);
680                 finish_generic_request(req);
681         }
682
683         mutex_unlock(&monc->mutex);
684 }
685
686 static int wait_generic_request(struct ceph_mon_generic_request *req)
687 {
688         int ret;
689
690         dout("%s greq %p tid %llu\n", __func__, req, req->tid);
691         ret = wait_for_completion_interruptible(&req->completion);
692         if (ret)
693                 cancel_generic_request(req);
694         else
695                 ret = req->result; /* completed */
696
697         return ret;
698 }
699
700 static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
701                                          struct ceph_msg_header *hdr,
702                                          int *skip)
703 {
704         struct ceph_mon_client *monc = con->private;
705         struct ceph_mon_generic_request *req;
706         u64 tid = le64_to_cpu(hdr->tid);
707         struct ceph_msg *m;
708
709         mutex_lock(&monc->mutex);
710         req = lookup_generic_request(&monc->generic_request_tree, tid);
711         if (!req) {
712                 dout("get_generic_reply %lld dne\n", tid);
713                 *skip = 1;
714                 m = NULL;
715         } else {
716                 dout("get_generic_reply %lld got %p\n", tid, req->reply);
717                 *skip = 0;
718                 m = ceph_msg_get(req->reply);
719                 /*
720                  * we don't need to track the connection reading into
721                  * this reply because we only have one open connection
722                  * at a time, ever.
723                  */
724         }
725         mutex_unlock(&monc->mutex);
726         return m;
727 }
728
729 /*
730  * statfs
731  */
732 static void handle_statfs_reply(struct ceph_mon_client *monc,
733                                 struct ceph_msg *msg)
734 {
735         struct ceph_mon_generic_request *req;
736         struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
737         u64 tid = le64_to_cpu(msg->hdr.tid);
738
739         dout("%s msg %p tid %llu\n", __func__, msg, tid);
740
741         if (msg->front.iov_len != sizeof(*reply))
742                 goto bad;
743
744         mutex_lock(&monc->mutex);
745         req = lookup_generic_request(&monc->generic_request_tree, tid);
746         if (!req) {
747                 mutex_unlock(&monc->mutex);
748                 return;
749         }
750
751         req->result = 0;
752         *req->u.st = reply->st; /* struct */
753         __finish_generic_request(req);
754         mutex_unlock(&monc->mutex);
755
756         complete_generic_request(req);
757         return;
758
759 bad:
760         pr_err("corrupt statfs reply, tid %llu\n", tid);
761         ceph_msg_dump(msg);
762 }
763
764 /*
765  * Do a synchronous statfs().
766  */
767 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
768                         struct ceph_statfs *buf)
769 {
770         struct ceph_mon_generic_request *req;
771         struct ceph_mon_statfs *h;
772         int ret = -ENOMEM;
773
774         req = alloc_generic_request(monc, GFP_NOFS);
775         if (!req)
776                 goto out;
777
778         req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
779                                     true);
780         if (!req->request)
781                 goto out;
782
783         req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
784         if (!req->reply)
785                 goto out;
786
787         req->u.st = buf;
788         req->request->hdr.version = cpu_to_le16(2);
789
790         mutex_lock(&monc->mutex);
791         register_generic_request(req);
792         /* fill out request */
793         h = req->request->front.iov_base;
794         h->monhdr.have_version = 0;
795         h->monhdr.session_mon = cpu_to_le16(-1);
796         h->monhdr.session_mon_tid = 0;
797         h->fsid = monc->monmap->fsid;
798         h->contains_data_pool = (data_pool != CEPH_NOPOOL);
799         h->data_pool = cpu_to_le64(data_pool);
800         send_generic_request(monc, req);
801         mutex_unlock(&monc->mutex);
802
803         ret = wait_generic_request(req);
804 out:
805         put_generic_request(req);
806         return ret;
807 }
808 EXPORT_SYMBOL(ceph_monc_do_statfs);
809
810 static void handle_get_version_reply(struct ceph_mon_client *monc,
811                                      struct ceph_msg *msg)
812 {
813         struct ceph_mon_generic_request *req;
814         u64 tid = le64_to_cpu(msg->hdr.tid);
815         void *p = msg->front.iov_base;
816         void *end = p + msg->front_alloc_len;
817         u64 handle;
818
819         dout("%s msg %p tid %llu\n", __func__, msg, tid);
820
821         ceph_decode_need(&p, end, 2*sizeof(u64), bad);
822         handle = ceph_decode_64(&p);
823         if (tid != 0 && tid != handle)
824                 goto bad;
825
826         mutex_lock(&monc->mutex);
827         req = lookup_generic_request(&monc->generic_request_tree, handle);
828         if (!req) {
829                 mutex_unlock(&monc->mutex);
830                 return;
831         }
832
833         req->result = 0;
834         req->u.newest = ceph_decode_64(&p);
835         __finish_generic_request(req);
836         mutex_unlock(&monc->mutex);
837
838         complete_generic_request(req);
839         return;
840
841 bad:
842         pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
843         ceph_msg_dump(msg);
844 }
845
846 static struct ceph_mon_generic_request *
847 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
848                         ceph_monc_callback_t cb, u64 private_data)
849 {
850         struct ceph_mon_generic_request *req;
851
852         req = alloc_generic_request(monc, GFP_NOIO);
853         if (!req)
854                 goto err_put_req;
855
856         req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
857                                     sizeof(u64) + sizeof(u32) + strlen(what),
858                                     GFP_NOIO, true);
859         if (!req->request)
860                 goto err_put_req;
861
862         req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
863                                   true);
864         if (!req->reply)
865                 goto err_put_req;
866
867         req->complete_cb = cb;
868         req->private_data = private_data;
869
870         mutex_lock(&monc->mutex);
871         register_generic_request(req);
872         {
873                 void *p = req->request->front.iov_base;
874                 void *const end = p + req->request->front_alloc_len;
875
876                 ceph_encode_64(&p, req->tid); /* handle */
877                 ceph_encode_string(&p, end, what, strlen(what));
878                 WARN_ON(p != end);
879         }
880         send_generic_request(monc, req);
881         mutex_unlock(&monc->mutex);
882
883         return req;
884
885 err_put_req:
886         put_generic_request(req);
887         return ERR_PTR(-ENOMEM);
888 }
889
890 /*
891  * Send MMonGetVersion and wait for the reply.
892  *
893  * @what: one of "mdsmap", "osdmap" or "monmap"
894  */
895 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
896                           u64 *newest)
897 {
898         struct ceph_mon_generic_request *req;
899         int ret;
900
901         req = __ceph_monc_get_version(monc, what, NULL, 0);
902         if (IS_ERR(req))
903                 return PTR_ERR(req);
904
905         ret = wait_generic_request(req);
906         if (!ret)
907                 *newest = req->u.newest;
908
909         put_generic_request(req);
910         return ret;
911 }
912 EXPORT_SYMBOL(ceph_monc_get_version);
913
914 /*
915  * Send MMonGetVersion,
916  *
917  * @what: one of "mdsmap", "osdmap" or "monmap"
918  */
919 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
920                                 ceph_monc_callback_t cb, u64 private_data)
921 {
922         struct ceph_mon_generic_request *req;
923
924         req = __ceph_monc_get_version(monc, what, cb, private_data);
925         if (IS_ERR(req))
926                 return PTR_ERR(req);
927
928         put_generic_request(req);
929         return 0;
930 }
931 EXPORT_SYMBOL(ceph_monc_get_version_async);
932
933 static void handle_command_ack(struct ceph_mon_client *monc,
934                                struct ceph_msg *msg)
935 {
936         struct ceph_mon_generic_request *req;
937         void *p = msg->front.iov_base;
938         void *const end = p + msg->front_alloc_len;
939         u64 tid = le64_to_cpu(msg->hdr.tid);
940
941         dout("%s msg %p tid %llu\n", __func__, msg, tid);
942
943         ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
944                                                             sizeof(u32), bad);
945         p += sizeof(struct ceph_mon_request_header);
946
947         mutex_lock(&monc->mutex);
948         req = lookup_generic_request(&monc->generic_request_tree, tid);
949         if (!req) {
950                 mutex_unlock(&monc->mutex);
951                 return;
952         }
953
954         req->result = ceph_decode_32(&p);
955         __finish_generic_request(req);
956         mutex_unlock(&monc->mutex);
957
958         complete_generic_request(req);
959         return;
960
961 bad:
962         pr_err("corrupt mon_command ack, tid %llu\n", tid);
963         ceph_msg_dump(msg);
964 }
965
966 static __printf(2, 0)
967 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
968                          va_list ap)
969 {
970         struct ceph_mon_generic_request *req;
971         struct ceph_mon_command *h;
972         int ret = -ENOMEM;
973         int len;
974
975         req = alloc_generic_request(monc, GFP_NOIO);
976         if (!req)
977                 goto out;
978
979         req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
980         if (!req->request)
981                 goto out;
982
983         req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
984                                   true);
985         if (!req->reply)
986                 goto out;
987
988         mutex_lock(&monc->mutex);
989         register_generic_request(req);
990         h = req->request->front.iov_base;
991         h->monhdr.have_version = 0;
992         h->monhdr.session_mon = cpu_to_le16(-1);
993         h->monhdr.session_mon_tid = 0;
994         h->fsid = monc->monmap->fsid;
995         h->num_strs = cpu_to_le32(1);
996         len = vsprintf(h->str, fmt, ap);
997         h->str_len = cpu_to_le32(len);
998         send_generic_request(monc, req);
999         mutex_unlock(&monc->mutex);
1000
1001         ret = wait_generic_request(req);
1002 out:
1003         put_generic_request(req);
1004         return ret;
1005 }
1006
1007 static __printf(2, 3)
1008 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
1009 {
1010         va_list ap;
1011         int ret;
1012
1013         va_start(ap, fmt);
1014         ret = do_mon_command_vargs(monc, fmt, ap);
1015         va_end(ap);
1016         return ret;
1017 }
1018
1019 int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
1020                             struct ceph_entity_addr *client_addr)
1021 {
1022         int ret;
1023
1024         ret = do_mon_command(monc,
1025                              "{ \"prefix\": \"osd blocklist\", \
1026                                 \"blocklistop\": \"add\", \
1027                                 \"addr\": \"%pISpc/%u\" }",
1028                              &client_addr->in_addr,
1029                              le32_to_cpu(client_addr->nonce));
1030         if (ret == -EINVAL) {
1031                 /*
1032                  * The monitor returns EINVAL on an unrecognized command.
1033                  * Try the legacy command -- it is exactly the same except
1034                  * for the name.
1035                  */
1036                 ret = do_mon_command(monc,
1037                                      "{ \"prefix\": \"osd blacklist\", \
1038                                         \"blacklistop\": \"add\", \
1039                                         \"addr\": \"%pISpc/%u\" }",
1040                                      &client_addr->in_addr,
1041                                      le32_to_cpu(client_addr->nonce));
1042         }
1043         if (ret)
1044                 return ret;
1045
1046         /*
1047          * Make sure we have the osdmap that includes the blocklist
1048          * entry.  This is needed to ensure that the OSDs pick up the
1049          * new blocklist before processing any future requests from
1050          * this client.
1051          */
1052         return ceph_wait_for_latest_osdmap(monc->client, 0);
1053 }
1054 EXPORT_SYMBOL(ceph_monc_blocklist_add);
1055
1056 /*
1057  * Resend pending generic requests.
1058  */
1059 static void __resend_generic_request(struct ceph_mon_client *monc)
1060 {
1061         struct ceph_mon_generic_request *req;
1062         struct rb_node *p;
1063
1064         for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1065                 req = rb_entry(p, struct ceph_mon_generic_request, node);
1066                 ceph_msg_revoke(req->request);
1067                 ceph_msg_revoke_incoming(req->reply);
1068                 ceph_con_send(&monc->con, ceph_msg_get(req->request));
1069         }
1070 }
1071
1072 /*
1073  * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
1074  * renew/retry subscription as needed (in case it is timing out, or we
1075  * got an ENOMEM).  And keep the monitor connection alive.
1076  */
1077 static void delayed_work(struct work_struct *work)
1078 {
1079         struct ceph_mon_client *monc =
1080                 container_of(work, struct ceph_mon_client, delayed_work.work);
1081
1082         dout("monc delayed_work\n");
1083         mutex_lock(&monc->mutex);
1084         if (monc->hunting) {
1085                 dout("%s continuing hunt\n", __func__);
1086                 reopen_session(monc);
1087         } else {
1088                 int is_auth = ceph_auth_is_authenticated(monc->auth);
1089                 if (ceph_con_keepalive_expired(&monc->con,
1090                                                CEPH_MONC_PING_TIMEOUT)) {
1091                         dout("monc keepalive timeout\n");
1092                         is_auth = 0;
1093                         reopen_session(monc);
1094                 }
1095
1096                 if (!monc->hunting) {
1097                         ceph_con_keepalive(&monc->con);
1098                         __validate_auth(monc);
1099                         un_backoff(monc);
1100                 }
1101
1102                 if (is_auth &&
1103                     !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1104                         unsigned long now = jiffies;
1105
1106                         dout("%s renew subs? now %lu renew after %lu\n",
1107                              __func__, now, monc->sub_renew_after);
1108                         if (time_after_eq(now, monc->sub_renew_after))
1109                                 __send_subscribe(monc);
1110                 }
1111         }
1112         __schedule_delayed(monc);
1113         mutex_unlock(&monc->mutex);
1114 }
1115
1116 /*
1117  * On startup, we build a temporary monmap populated with the IPs
1118  * provided by mount(2).
1119  */
1120 static int build_initial_monmap(struct ceph_mon_client *monc)
1121 {
1122         struct ceph_options *opt = monc->client->options;
1123         struct ceph_entity_addr *mon_addr = opt->mon_addr;
1124         int num_mon = opt->num_mon;
1125         int i;
1126
1127         /* build initial monmap */
1128         monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1129                                GFP_KERNEL);
1130         if (!monc->monmap)
1131                 return -ENOMEM;
1132         for (i = 0; i < num_mon; i++) {
1133                 monc->monmap->mon_inst[i].addr = mon_addr[i];
1134                 monc->monmap->mon_inst[i].addr.nonce = 0;
1135                 monc->monmap->mon_inst[i].name.type =
1136                         CEPH_ENTITY_TYPE_MON;
1137                 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
1138         }
1139         monc->monmap->num_mon = num_mon;
1140         return 0;
1141 }
1142
1143 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1144 {
1145         int err = 0;
1146
1147         dout("init\n");
1148         memset(monc, 0, sizeof(*monc));
1149         monc->client = cl;
1150         monc->monmap = NULL;
1151         mutex_init(&monc->mutex);
1152
1153         err = build_initial_monmap(monc);
1154         if (err)
1155                 goto out;
1156
1157         /* connection */
1158         /* authentication */
1159         monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
1160                                     cl->options->con_modes);
1161         if (IS_ERR(monc->auth)) {
1162                 err = PTR_ERR(monc->auth);
1163                 goto out_monmap;
1164         }
1165         monc->auth->want_keys =
1166                 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1167                 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1168
1169         /* msgs */
1170         err = -ENOMEM;
1171         monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1172                                      sizeof(struct ceph_mon_subscribe_ack),
1173                                      GFP_KERNEL, true);
1174         if (!monc->m_subscribe_ack)
1175                 goto out_auth;
1176
1177         monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1178                                          GFP_KERNEL, true);
1179         if (!monc->m_subscribe)
1180                 goto out_subscribe_ack;
1181
1182         monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1183                                           GFP_KERNEL, true);
1184         if (!monc->m_auth_reply)
1185                 goto out_subscribe;
1186
1187         monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1188         monc->pending_auth = 0;
1189         if (!monc->m_auth)
1190                 goto out_auth_reply;
1191
1192         ceph_con_init(&monc->con, monc, &mon_con_ops,
1193                       &monc->client->msgr);
1194
1195         monc->cur_mon = -1;
1196         monc->had_a_connection = false;
1197         monc->hunt_mult = 1;
1198
1199         INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1200         monc->generic_request_tree = RB_ROOT;
1201         monc->last_tid = 0;
1202
1203         monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1204
1205         return 0;
1206
1207 out_auth_reply:
1208         ceph_msg_put(monc->m_auth_reply);
1209 out_subscribe:
1210         ceph_msg_put(monc->m_subscribe);
1211 out_subscribe_ack:
1212         ceph_msg_put(monc->m_subscribe_ack);
1213 out_auth:
1214         ceph_auth_destroy(monc->auth);
1215 out_monmap:
1216         kfree(monc->monmap);
1217 out:
1218         return err;
1219 }
1220 EXPORT_SYMBOL(ceph_monc_init);
1221
1222 void ceph_monc_stop(struct ceph_mon_client *monc)
1223 {
1224         dout("stop\n");
1225         cancel_delayed_work_sync(&monc->delayed_work);
1226
1227         mutex_lock(&monc->mutex);
1228         __close_session(monc);
1229         monc->cur_mon = -1;
1230         mutex_unlock(&monc->mutex);
1231
1232         /*
1233          * flush msgr queue before we destroy ourselves to ensure that:
1234          *  - any work that references our embedded con is finished.
1235          *  - any osd_client or other work that may reference an authorizer
1236          *    finishes before we shut down the auth subsystem.
1237          */
1238         ceph_msgr_flush();
1239
1240         ceph_auth_destroy(monc->auth);
1241
1242         WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1243
1244         ceph_msg_put(monc->m_auth);
1245         ceph_msg_put(monc->m_auth_reply);
1246         ceph_msg_put(monc->m_subscribe);
1247         ceph_msg_put(monc->m_subscribe_ack);
1248
1249         kfree(monc->monmap);
1250 }
1251 EXPORT_SYMBOL(ceph_monc_stop);
1252
1253 static void finish_hunting(struct ceph_mon_client *monc)
1254 {
1255         if (monc->hunting) {
1256                 dout("%s found mon%d\n", __func__, monc->cur_mon);
1257                 monc->hunting = false;
1258                 monc->had_a_connection = true;
1259                 un_backoff(monc);
1260                 __schedule_delayed(monc);
1261         }
1262 }
1263
1264 static void finish_auth(struct ceph_mon_client *monc, int auth_err,
1265                         bool was_authed)
1266 {
1267         dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
1268         WARN_ON(auth_err > 0);
1269
1270         monc->pending_auth = 0;
1271         if (auth_err) {
1272                 monc->client->auth_err = auth_err;
1273                 wake_up_all(&monc->client->auth_wq);
1274                 return;
1275         }
1276
1277         if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
1278                 dout("%s authenticated, starting session global_id %llu\n",
1279                      __func__, monc->auth->global_id);
1280
1281                 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1282                 monc->client->msgr.inst.name.num =
1283                                         cpu_to_le64(monc->auth->global_id);
1284
1285                 __send_subscribe(monc);
1286                 __resend_generic_request(monc);
1287
1288                 pr_info("mon%d %s session established\n", monc->cur_mon,
1289                         ceph_pr_addr(&monc->con.peer_addr));
1290         }
1291 }
1292
1293 static void handle_auth_reply(struct ceph_mon_client *monc,
1294                               struct ceph_msg *msg)
1295 {
1296         bool was_authed;
1297         int ret;
1298
1299         mutex_lock(&monc->mutex);
1300         was_authed = ceph_auth_is_authenticated(monc->auth);
1301         ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1302                                      msg->front.iov_len,
1303                                      monc->m_auth->front.iov_base,
1304                                      monc->m_auth->front_alloc_len);
1305         if (ret > 0) {
1306                 __send_prepared_auth_request(monc, ret);
1307         } else {
1308                 finish_auth(monc, ret, was_authed);
1309                 finish_hunting(monc);
1310         }
1311         mutex_unlock(&monc->mutex);
1312 }
1313
1314 static int __validate_auth(struct ceph_mon_client *monc)
1315 {
1316         int ret;
1317
1318         if (monc->pending_auth)
1319                 return 0;
1320
1321         ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1322                               monc->m_auth->front_alloc_len);
1323         if (ret <= 0)
1324                 return ret; /* either an error, or no need to authenticate */
1325         __send_prepared_auth_request(monc, ret);
1326         return 0;
1327 }
1328
1329 int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1330 {
1331         int ret;
1332
1333         mutex_lock(&monc->mutex);
1334         ret = __validate_auth(monc);
1335         mutex_unlock(&monc->mutex);
1336         return ret;
1337 }
1338 EXPORT_SYMBOL(ceph_monc_validate_auth);
1339
1340 /*
1341  * handle incoming message
1342  */
1343 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1344 {
1345         struct ceph_mon_client *monc = con->private;
1346         int type = le16_to_cpu(msg->hdr.type);
1347
1348         switch (type) {
1349         case CEPH_MSG_AUTH_REPLY:
1350                 handle_auth_reply(monc, msg);
1351                 break;
1352
1353         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1354                 handle_subscribe_ack(monc, msg);
1355                 break;
1356
1357         case CEPH_MSG_STATFS_REPLY:
1358                 handle_statfs_reply(monc, msg);
1359                 break;
1360
1361         case CEPH_MSG_MON_GET_VERSION_REPLY:
1362                 handle_get_version_reply(monc, msg);
1363                 break;
1364
1365         case CEPH_MSG_MON_COMMAND_ACK:
1366                 handle_command_ack(monc, msg);
1367                 break;
1368
1369         case CEPH_MSG_MON_MAP:
1370                 ceph_monc_handle_map(monc, msg);
1371                 break;
1372
1373         case CEPH_MSG_OSD_MAP:
1374                 ceph_osdc_handle_map(&monc->client->osdc, msg);
1375                 break;
1376
1377         default:
1378                 /* can the chained handler handle it? */
1379                 if (monc->client->extra_mon_dispatch &&
1380                     monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1381                         break;
1382
1383                 pr_err("received unknown message type %d %s\n", type,
1384                        ceph_msg_type_name(type));
1385         }
1386         ceph_msg_put(msg);
1387 }
1388
1389 /*
1390  * Allocate memory for incoming message
1391  */
1392 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1393                                       struct ceph_msg_header *hdr,
1394                                       int *skip)
1395 {
1396         struct ceph_mon_client *monc = con->private;
1397         int type = le16_to_cpu(hdr->type);
1398         int front_len = le32_to_cpu(hdr->front_len);
1399         struct ceph_msg *m = NULL;
1400
1401         *skip = 0;
1402
1403         switch (type) {
1404         case CEPH_MSG_MON_SUBSCRIBE_ACK:
1405                 m = ceph_msg_get(monc->m_subscribe_ack);
1406                 break;
1407         case CEPH_MSG_STATFS_REPLY:
1408         case CEPH_MSG_MON_COMMAND_ACK:
1409                 return get_generic_reply(con, hdr, skip);
1410         case CEPH_MSG_AUTH_REPLY:
1411                 m = ceph_msg_get(monc->m_auth_reply);
1412                 break;
1413         case CEPH_MSG_MON_GET_VERSION_REPLY:
1414                 if (le64_to_cpu(hdr->tid) != 0)
1415                         return get_generic_reply(con, hdr, skip);
1416
1417                 /*
1418                  * Older OSDs don't set reply tid even if the orignal
1419                  * request had a non-zero tid.  Work around this weirdness
1420                  * by allocating a new message.
1421                  */
1422                 fallthrough;
1423         case CEPH_MSG_MON_MAP:
1424         case CEPH_MSG_MDS_MAP:
1425         case CEPH_MSG_OSD_MAP:
1426         case CEPH_MSG_FS_MAP_USER:
1427                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1428                 if (!m)
1429                         return NULL;    /* ENOMEM--return skip == 0 */
1430                 break;
1431         }
1432
1433         if (!m) {
1434                 pr_info("alloc_msg unknown type %d\n", type);
1435                 *skip = 1;
1436         } else if (front_len > m->front_alloc_len) {
1437                 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1438                         front_len, m->front_alloc_len,
1439                         (unsigned int)con->peer_name.type,
1440                         le64_to_cpu(con->peer_name.num));
1441                 ceph_msg_put(m);
1442                 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1443         }
1444
1445         return m;
1446 }
1447
1448 /*
1449  * If the monitor connection resets, pick a new monitor and resubmit
1450  * any pending requests.
1451  */
1452 static void mon_fault(struct ceph_connection *con)
1453 {
1454         struct ceph_mon_client *monc = con->private;
1455
1456         mutex_lock(&monc->mutex);
1457         dout("%s mon%d\n", __func__, monc->cur_mon);
1458         if (monc->cur_mon >= 0) {
1459                 if (!monc->hunting) {
1460                         dout("%s hunting for new mon\n", __func__);
1461                         reopen_session(monc);
1462                         __schedule_delayed(monc);
1463                 } else {
1464                         dout("%s already hunting\n", __func__);
1465                 }
1466         }
1467         mutex_unlock(&monc->mutex);
1468 }
1469
1470 /*
1471  * We can ignore refcounting on the connection struct, as all references
1472  * will come from the messenger workqueue, which is drained prior to
1473  * mon_client destruction.
1474  */
1475 static struct ceph_connection *con_get(struct ceph_connection *con)
1476 {
1477         return con;
1478 }
1479
1480 static void con_put(struct ceph_connection *con)
1481 {
1482 }
1483
1484 static const struct ceph_connection_operations mon_con_ops = {
1485         .get = con_get,
1486         .put = con_put,
1487         .dispatch = dispatch,
1488         .fault = mon_fault,
1489         .alloc_msg = mon_alloc_msg,
1490 };