ceph: ensure d_name/d_parent stability in ceph_mdsc_lease_send_msg()
fs/ceph/mds_client.c (linux-2.6-microblaze.git)
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}
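
/*
 * Editor's note (illustrative, not from the original source): every
 * versioned chunk in these replies is wrapped in the same envelope
 * that parse_reply_info_quota() above just decoded:
 *
 *     u8  struct_v       - structure version (must be >= 1)
 *     u8  struct_compat  - oldest compatible version (we require 1)
 *     u32 struct_len     - byte length of the payload that follows
 *
 * Clamping 'end' to *p + struct_len means unknown trailing fields
 * added by newer servers are skipped rather than misparsed.
 */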

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }
                /* btime, change_attr */
                {
                        struct ceph_timespec btime;
                        u64 change_attr;
                        ceph_decode_need(p, end, sizeof(btime), bad);
                        ceph_decode_copy(p, &btime, sizeof(btime));
                        ceph_decode_64_safe(p, end, change_attr, bad);
                }

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                *p = end;
        } else {
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime remains zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**lease), bad);
        *lease = *p;
        *p += sizeof(**lease);
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
                ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features);
                if (err)
                        goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;
        *p += sizeof(*info->filelock_reply);

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
                        info->has_create_ino = true;
                        info->ino = ceph_decode_64(p);
                }
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}
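
/*
 * Editor's sketch of the overall reply frame, as implied by
 * parse_reply_info() above (illustrative only):
 *
 *     struct ceph_mds_reply_head head;
 *     u32 trace_len;  u8 trace[trace_len];     - dentry/inode trace
 *     u32 extra_len;  u8 extra[extra_len];     - op-specific payload
 *     u32 snap_len;   u8 snapblob[snap_len];   - snap realm blob
 *
 * Each blob is handed to its parse_reply_info_*() helper with 'end'
 * pinned to that blob's boundary, so a malformed sub-blob fails with
 * -EIO instead of overrunning the message.
 */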

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_entries)
                return;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return get_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 1;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        s->s_state = 0;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_inode);
        }
        if (req->r_parent)
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
        ceph_async_iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                ceph_async_iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
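
/*
 * Editor's note: DEFINE_RB_FUNCS() (from fs/ceph/super.h) expands into
 * the lookup_request()/insert_request()/erase_request() helpers used
 * below, managing an rbtree of ceph_mds_request keyed by r_tid.
 */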

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory inode we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                ihold(dir);
                req->r_unsafe_dir = dir;
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("__choose_mds using snapdir's parent %p\n", inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = READ_ONCE(req->r_dentry->d_parent);
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                ceph_async_iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        /* avoid calling iput_final() while holding mdsc->mutex or
         * in mds dispatch threads */
        ceph_async_iput(inode);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}
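
/*
 * Editor's example (illustrative; it mirrors how later code in this
 * file drives the session state machine, e.g. cap renewal):
 *
 *	struct ceph_msg *msg;
 *
 *	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, seq);
 *	if (!msg)
 *		return -ENOMEM;
 *	ceph_con_send(&session->s_con, msg);
 *
 * An op code and a sequence number are all that a plain
 * (metadata-free) session message carries.
 */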

static void encode_supported_features(void **p, void *end)
{
        static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
        static const size_t count = ARRAY_SIZE(bits);

        if (count > 0) {
                size_t i;
                size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;

                BUG_ON(*p + 4 + size > end);
                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++)
                        ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
                *p += size;
        } else {
                BUG_ON(*p + 4 > end);
                ceph_encode_32(p, 0);
        }
}
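
/*
 * Editor's note with a worked example: the bitmap is sized from the
 * highest supported feature bit, rounded up to a whole 64-bit word and
 * expressed in bytes.  With a largest bit of 5:
 *
 *     size = (5 + 64) / 64 * 8 = 8 bytes (one u64)
 *
 * while a largest bit of 64 would need two words (16 bytes).  On the
 * wire this becomes a u32 length followed by 'size' bitmap bytes.
 */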

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
        void *p, *end;

        const char* metadata[][2] = {
                {"hostname", mdsc->nodename},
                {"kernel_version", init_utsname()->release},
                {"entity_id", opt->name ? : ""},
                {"root", fsopt->server_path ? : "/"},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        extra_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0]; ++i) {
                extra_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }
        /* supported feature */
        extra_bytes += 4 + 8;

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        h = p;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v2; v3 adds the
         * supported-feature bits encoded below.
         */
        msg->hdr.version = cpu_to_le16(3);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p += sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0]; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        encode_supported_features(&p, end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
                                struct list_head *target)
{
        lockdep_assert_held(&session->s_cap_lock);

        list_splice_init(&session->s_cap_releases, target);
        session->s_num_cap_releases = 0;
        dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
                                 struct list_head *dispose)
{
        while (!list_empty(dispose)) {
                struct ceph_cap *cap;
                /* zero out the in-progress message */
                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
                              int (*cb)(struct inode *, struct ceph_cap *,
                                        void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                if (last_inode) {
                        /* avoid calling iput_final() while holding
                         * s_mutex or in mds dispatch threads */
                        ceph_async_iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (!cap->ci) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        if (cap->queue_release)
                                __ceph_queue_cap_release(session, cap);
                        else
                                old_cap = cap;  /* put_cap it w/o locks held */
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        ceph_async_iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        bool drop = false;
        bool invalidate = false;

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        if (cap->mds_wanted | cap->issued)
                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc = fsc->mdsc;

                if (ci->i_wrbuffer_ref > 0 &&
                    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                        invalidate = true;

                while (!list_empty(&ci->i_cap_flush_list)) {
                        cf = list_first_entry(&ci->i_cap_flush_list,
                                              struct ceph_cap_flush, i_list);
                        list_move(&cf->i_list, &to_remove);
                }

                spin_lock(&mdsc->cap_dirty_lock);

                list_for_each_entry(cf, &to_remove, i_list)
                        list_del(&cf->g_list);

                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = true;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = true;
                }
                spin_unlock(&mdsc->cap_dirty_lock);

                if (atomic_read(&ci->i_filelock_ref) > 0) {
                        /* make further file lock syscall return -EIO */
                        ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
                        pr_warn_ratelimited(" dropping file locks for %p %lld\n",
                                            inode, ceph_ino(inode));
                }

                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }

                if (drop &&
                    ci->i_wrbuffer_ref_head == 0 &&
                    ci->i_wr_ref == 0 &&
                    ci->i_dirty_caps == 0 &&
                    ci->i_flushing_caps == 0) {
                        ceph_put_snap_context(ci->i_head_snapc);
                        ci->i_head_snapc = NULL;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
                struct ceph_cap_flush *cf;
                cf = list_first_entry(&to_remove,
                                      struct ceph_cap_flush, i_list);
                list_del(&cf->i_list);
                ceph_free_cap_flush(cf);
        }

        wake_up_all(&ci->i_cap_wq);
        if (invalidate)
                ceph_queue_invalidate(inode);
        if (drop)
                iput(inode);
        return 0;
}
1453
1454 /*
1455  * caller must hold session s_mutex
1456  */
1457 static void remove_session_caps(struct ceph_mds_session *session)
1458 {
1459         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1460         struct super_block *sb = fsc->sb;
1461         LIST_HEAD(dispose);
1462
1463         dout("remove_session_caps on %p\n", session);
1464         ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1465
1466         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1467
1468         spin_lock(&session->s_cap_lock);
1469         if (session->s_nr_caps > 0) {
1470                 struct inode *inode;
1471                 struct ceph_cap *cap, *prev = NULL;
1472                 struct ceph_vino vino;
1473                 /*
1474                  * iterate_session_caps() skips inodes that are being
1475                  * deleted, so we need to wait until those deletions are
1476                  * complete. __wait_on_freeing_inode() is designed for the
1477                  * job, but it is not exported, so use the inode lookup
1478                  * function (ceph_find_inode()) to reach it.
1479                  */
1480                 while (!list_empty(&session->s_caps)) {
1481                         cap = list_entry(session->s_caps.next,
1482                                          struct ceph_cap, session_caps);
1483                         if (cap == prev)
1484                                 break;
1485                         prev = cap;
1486                         vino = cap->ci->i_vino;
1487                         spin_unlock(&session->s_cap_lock);
1488
1489                         inode = ceph_find_inode(sb, vino);
1490                          /* avoid calling iput_final() while holding s_mutex */
1491                         ceph_async_iput(inode);
1492
1493                         spin_lock(&session->s_cap_lock);
1494                 }
1495         }
1496
1497         /* drop cap expires and unlock s_cap_lock */
1498         detach_cap_releases(session, &dispose);
1499
1500         BUG_ON(session->s_nr_caps > 0);
1501         BUG_ON(!list_empty(&session->s_cap_flushing));
1502         spin_unlock(&session->s_cap_lock);
1503         dispose_cap_releases(session->s_mdsc, &dispose);
1504 }
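
/*
 * ceph_iterate_session_caps() walks every cap on the session and calls a
 * callback with the inode, the cap, and an opaque argument; returning 0
 * continues the walk and a negative value stops it (see trim_caps_cb()
 * below).  A minimal sketch of such a callback (the counter is
 * hypothetical, for illustration only):
 */
static int demo_count_caps_cb(struct inode *inode, struct ceph_cap *cap,
                              void *arg)
{
        int *count = arg;       /* caller-supplied accumulator */

        (*count)++;
        return 0;               /* keep iterating */
}
/* usage: int n = 0; ceph_iterate_session_caps(session, demo_count_caps_cb, &n); */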
1505
1506 enum {
1507         RECONNECT,
1508         RENEWCAPS,
1509         FORCE_RO,
1510 };
1511
1512 /*
1513  * wake up any threads waiting on this session's caps.  if the cap is
1514  * old (didn't get renewed on the client reconnect), remove it now.
1515  *
1516  * caller must hold s_mutex.
1517  */
1518 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1519                               void *arg)
1520 {
1521         struct ceph_inode_info *ci = ceph_inode(inode);
1522         unsigned long ev = (unsigned long)arg;
1523
1524         if (ev == RECONNECT) {
1525                 spin_lock(&ci->i_ceph_lock);
1526                 ci->i_wanted_max_size = 0;
1527                 ci->i_requested_max_size = 0;
1528                 spin_unlock(&ci->i_ceph_lock);
1529         } else if (ev == RENEWCAPS) {
1530                 if (cap->cap_gen < cap->session->s_cap_gen) {
1531                         /* mds did not re-issue stale cap */
1532                         spin_lock(&ci->i_ceph_lock);
1533                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1534                         /* make sure mds knows what we want */
1535                         if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
1536                                 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1537                         spin_unlock(&ci->i_ceph_lock);
1538                 }
1539         } else if (ev == FORCE_RO) {
1540         }
1541         wake_up_all(&ci->i_cap_wq);
1542         return 0;
1543 }
1544
1545 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1546 {
1547         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1548         ceph_iterate_session_caps(session, wake_up_session_cb,
1549                                   (void *)(unsigned long)ev);
1550 }
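
/*
 * The event id is smuggled through the iterator's void *arg by casting it
 * to and from unsigned long, avoiding any allocation.  The round trip,
 * spelled out (sketch; helper names hypothetical):
 */
static inline void *demo_pack_ev(unsigned long ev)
{
        return (void *)ev;              /* RECONNECT, RENEWCAPS, ... */
}

static inline unsigned long demo_unpack_ev(void *arg)
{
        return (unsigned long)arg;      /* recovers the original value */
}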
1551
1552 /*
1553  * Send periodic message to MDS renewing all currently held caps.  The
1554  * ack will reset the expiration for all caps from this session.
1555  *
1556  * caller holds s_mutex
1557  */
1558 static int send_renew_caps(struct ceph_mds_client *mdsc,
1559                            struct ceph_mds_session *session)
1560 {
1561         struct ceph_msg *msg;
1562         int state;
1563
1564         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1565             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1566                 pr_info("mds%d caps stale\n", session->s_mds);
1567         session->s_renew_requested = jiffies;
1568
1569         /* do not try to renew caps until a recovering mds has reconnected
1570          * with its clients. */
1571         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1572         if (state < CEPH_MDS_STATE_RECONNECT) {
1573                 dout("send_renew_caps ignoring mds%d (%s)\n",
1574                      session->s_mds, ceph_mds_state_name(state));
1575                 return 0;
1576         }
1577
1578         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1579                 ceph_mds_state_name(state));
1580         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1581                                  ++session->s_renew_seq);
1582         if (!msg)
1583                 return -ENOMEM;
1584         ceph_con_send(&session->s_con, msg);
1585         return 0;
1586 }
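
/*
 * The staleness test above uses the wrap-safe jiffies helpers: caps are
 * considered stale once "now" has passed s_cap_ttl, and the message is
 * printed at most once per renew attempt because s_renew_requested is
 * checked as well.  Simplified form of the wrap-safe comparison (the
 * real helpers in <linux/jiffies.h> also typecheck their arguments):
 */
#define demo_time_after_eq(a, b)  ((long)((a) - (b)) >= 0)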
1587
1588 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1589                              struct ceph_mds_session *session, u64 seq)
1590 {
1591         struct ceph_msg *msg;
1592
1593         dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1594              session->s_mds, ceph_session_state_name(session->s_state), seq);
1595         msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1596         if (!msg)
1597                 return -ENOMEM;
1598         ceph_con_send(&session->s_con, msg);
1599         return 0;
1600 }
1601
1602
1603 /*
1604  * Note the new cap ttl, and any transition from stale to fresh.
1605  *
1606  * Called under session->s_mutex
1607  */
1608 static void renewed_caps(struct ceph_mds_client *mdsc,
1609                          struct ceph_mds_session *session, int is_renew)
1610 {
1611         int was_stale;
1612         int wake = 0;
1613
1614         spin_lock(&session->s_cap_lock);
1615         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1616
1617         session->s_cap_ttl = session->s_renew_requested +
1618                 mdsc->mdsmap->m_session_timeout*HZ;
1619
1620         if (was_stale) {
1621                 if (time_before(jiffies, session->s_cap_ttl)) {
1622                         pr_info("mds%d caps renewed\n", session->s_mds);
1623                         wake = 1;
1624                 } else {
1625                         pr_info("mds%d caps still stale\n", session->s_mds);
1626                 }
1627         }
1628         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1629              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1630              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1631         spin_unlock(&session->s_cap_lock);
1632
1633         if (wake)
1634                 wake_up_session_caps(session, RENEWCAPS);
1635 }
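
/*
 * Worked example of the ttl arithmetic above (numbers illustrative): with
 * a 60 second m_session_timeout and HZ == 1000, a renewal requested at
 * jiffies == J sets s_cap_ttl = J + 60 * 1000 = J + 60000 jiffies, i.e.
 * the caps stay fresh for one session-timeout counted from the renew
 * request, not from the ack's arrival.
 */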
1636
1637 /*
1638  * send a session close request
1639  */
1640 static int request_close_session(struct ceph_mds_client *mdsc,
1641                                  struct ceph_mds_session *session)
1642 {
1643         struct ceph_msg *msg;
1644
1645         dout("request_close_session mds%d state %s seq %lld\n",
1646              session->s_mds, ceph_session_state_name(session->s_state),
1647              session->s_seq);
1648         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1649         if (!msg)
1650                 return -ENOMEM;
1651         ceph_con_send(&session->s_con, msg);
1652         return 1;
1653 }
1654
1655 /*
1656  * Called with s_mutex held.
1657  */
1658 static int __close_session(struct ceph_mds_client *mdsc,
1659                          struct ceph_mds_session *session)
1660 {
1661         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1662                 return 0;
1663         session->s_state = CEPH_MDS_SESSION_CLOSING;
1664         return request_close_session(mdsc, session);
1665 }
1666
1667 static bool drop_negative_children(struct dentry *dentry)
1668 {
1669         struct dentry *child;
1670         bool all_negative = true;
1671
1672         if (!d_is_dir(dentry))
1673                 goto out;
1674
1675         spin_lock(&dentry->d_lock);
1676         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1677                 if (d_really_is_positive(child)) {
1678                         all_negative = false;
1679                         break;
1680                 }
1681         }
1682         spin_unlock(&dentry->d_lock);
1683
1684         if (all_negative)
1685                 shrink_dcache_parent(dentry);
1686 out:
1687         return all_negative;
1688 }
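
/*
 * A "negative" dentry is one that caches a failed lookup, i.e. it has no
 * inode attached, which is what d_really_is_positive() tests under
 * ->d_lock above.  The check reduces to (sketch, matching the dcache
 * helper's definition):
 */
static inline bool demo_is_negative(const struct dentry *dentry)
{
        return dentry->d_inode == NULL; /* no inode: cached ENOENT */
}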
1689
1690 /*
1691  * Trim old(er) caps.
1692  *
1693  * Because we can't cache an inode without one or more caps, we do
1694  * this indirectly: if a cap is unused, we prune its aliases, at which
1695  * point the inode will hopefully get dropped too.
1696  *
1697  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1698  * memory pressure from the MDS, though, so it needn't be perfect.
1699  */
1700 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1701 {
1702         struct ceph_mds_session *session = arg;
1703         struct ceph_inode_info *ci = ceph_inode(inode);
1704         int used, wanted, oissued, mine;
1705
1706         if (session->s_trim_caps <= 0)
1707                 return -1;
1708
1709         spin_lock(&ci->i_ceph_lock);
1710         mine = cap->issued | cap->implemented;
1711         used = __ceph_caps_used(ci);
1712         wanted = __ceph_caps_file_wanted(ci);
1713         oissued = __ceph_caps_issued_other(ci, cap);
1714
1715         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1716              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1717              ceph_cap_string(used), ceph_cap_string(wanted));
1718         if (cap == ci->i_auth_cap) {
1719                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1720                     !list_empty(&ci->i_cap_snaps))
1721                         goto out;
1722                 if ((used | wanted) & CEPH_CAP_ANY_WR)
1723                         goto out;
1724                 /* Note: it's possible that i_filelock_ref becomes non-zero
1725                  * after dropping auth caps. It doesn't hurt because reply
1726                  * of lock mds request will re-add auth caps. */
1727                 if (atomic_read(&ci->i_filelock_ref) > 0)
1728                         goto out;
1729         }
1730         /* The inode has cached pages, but it's no longer used.
1731          * We can safely drop it. */
1732         if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1733             !(oissued & CEPH_CAP_FILE_CACHE)) {
1734                 used = 0;
1735                 oissued = 0;
1736         }
1737         if ((used | wanted) & ~oissued & mine)
1738                 goto out;   /* we need these caps */
1739
1740         if (oissued) {
1741                 /* we aren't the only cap.. just remove us */
1742                 __ceph_remove_cap(cap, true);
1743                 session->s_trim_caps--;
1744         } else {
1745                 struct dentry *dentry;
1746                 /* try dropping referring dentries */
1747                 spin_unlock(&ci->i_ceph_lock);
1748                 dentry = d_find_any_alias(inode);
1749                 if (dentry && drop_negative_children(dentry)) {
1750                         int count;
1751                         dput(dentry);
1752                         d_prune_aliases(inode);
1753                         count = atomic_read(&inode->i_count);
1754                         if (count == 1)
1755                                 session->s_trim_caps--;
1756                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1757                              inode, cap, count);
1758                 } else {
1759                         dput(dentry);
1760                 }
1761                 return 0;
1762         }
1763
1764 out:
1765         spin_unlock(&ci->i_ceph_lock);
1766         return 0;
1767 }
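
/*
 * The keep/trim decision above is pure bit arithmetic: keep the cap iff
 * it covers some bit that is used or wanted and not issued by another
 * cap.  A self-contained userspace sketch (bit values hypothetical, for
 * illustration only):
 */
#include <stdio.h>

#define DEMO_CAP_RD 1
#define DEMO_CAP_WR 2

int main(void)
{
        int mine = DEMO_CAP_RD | DEMO_CAP_WR;   /* issued | implemented */
        int used = DEMO_CAP_RD;                 /* currently in use     */
        int wanted = 0;                         /* nothing more wanted  */
        int oissued = DEMO_CAP_RD;              /* another cap has RD   */

        if ((used | wanted) & ~oissued & mine)
                printf("keep: a needed bit is covered only by this cap\n");
        else
                printf("trim: everything needed is issued elsewhere\n");
        return 0;
}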
1768
1769 /*
1770  * Trim session cap count down to some max number.
1771  */
1772 int ceph_trim_caps(struct ceph_mds_client *mdsc,
1773                    struct ceph_mds_session *session,
1774                    int max_caps)
1775 {
1776         int trim_caps = session->s_nr_caps - max_caps;
1777
1778         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1779              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1780         if (trim_caps > 0) {
1781                 session->s_trim_caps = trim_caps;
1782                 ceph_iterate_session_caps(session, trim_caps_cb, session);
1783                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1784                      session->s_mds, session->s_nr_caps, max_caps,
1785                         trim_caps - session->s_trim_caps);
1786                 session->s_trim_caps = 0;
1787         }
1788
1789         ceph_flush_cap_releases(mdsc, session);
1790         return 0;
1791 }
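
/*
 * Worked example (numbers illustrative): a session holding 15000 caps
 * trimmed to max_caps == 10000 sets s_trim_caps = 5000; trim_caps_cb()
 * then decrements s_trim_caps for each cap it manages to drop and aborts
 * the walk (returns -1) once the quota reaches zero.
 */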
1792
1793 static int check_caps_flush(struct ceph_mds_client *mdsc,
1794                             u64 want_flush_tid)
1795 {
1796         int ret = 1;
1797
1798         spin_lock(&mdsc->cap_dirty_lock);
1799         if (!list_empty(&mdsc->cap_flush_list)) {
1800                 struct ceph_cap_flush *cf =
1801                         list_first_entry(&mdsc->cap_flush_list,
1802                                          struct ceph_cap_flush, g_list);
1803                 if (cf->tid <= want_flush_tid) {
1804                         dout("check_caps_flush still flushing tid "
1805                              "%llu <= %llu\n", cf->tid, want_flush_tid);
1806                         ret = 0;
1807                 }
1808         }
1809         spin_unlock(&mdsc->cap_dirty_lock);
1810         return ret;
1811 }
1812
1813 /*
1814  * flush all dirty inode data to disk.
1815  *
1816  * returns once we've flushed through want_flush_tid
1817  */
1818 static void wait_caps_flush(struct ceph_mds_client *mdsc,
1819                             u64 want_flush_tid)
1820 {
1821         dout("check_caps_flush want %llu\n", want_flush_tid);
1822
1823         wait_event(mdsc->cap_flushing_wq,
1824                    check_caps_flush(mdsc, want_flush_tid));
1825
1826         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1827 }
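
/*
 * wait_event() above re-evaluates check_caps_flush() every time
 * cap_flushing_wq is woken and only returns once it is true.  A minimal
 * sketch of the producer/consumer shape (names hypothetical):
 */
static DECLARE_WAIT_QUEUE_HEAD(demo_wq);
static atomic_t demo_done = ATOMIC_INIT(0);

static void demo_waiter(void)
{
        /* sleeps until the condition is true; rechecked on each wakeup */
        wait_event(demo_wq, atomic_read(&demo_done));
}

static void demo_producer(void)
{
        atomic_set(&demo_done, 1);      /* make the condition true... */
        wake_up_all(&demo_wq);          /* ...then wake the sleepers  */
}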
1828
1829 /*
1830  * called under s_mutex
1831  */
1832 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1833                                    struct ceph_mds_session *session)
1834 {
1835         struct ceph_msg *msg = NULL;
1836         struct ceph_mds_cap_release *head;
1837         struct ceph_mds_cap_item *item;
1838         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1839         struct ceph_cap *cap;
1840         LIST_HEAD(tmp_list);
1841         int num_cap_releases;
1842         __le32  barrier, *cap_barrier;
1843
1844         down_read(&osdc->lock);
1845         barrier = cpu_to_le32(osdc->epoch_barrier);
1846         up_read(&osdc->lock);
1847
1848         spin_lock(&session->s_cap_lock);
1849 again:
1850         list_splice_init(&session->s_cap_releases, &tmp_list);
1851         num_cap_releases = session->s_num_cap_releases;
1852         session->s_num_cap_releases = 0;
1853         spin_unlock(&session->s_cap_lock);
1854
1855         while (!list_empty(&tmp_list)) {
1856                 if (!msg) {
1857                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1858                                         PAGE_SIZE, GFP_NOFS, false);
1859                         if (!msg)
1860                                 goto out_err;
1861                         head = msg->front.iov_base;
1862                         head->num = cpu_to_le32(0);
1863                         msg->front.iov_len = sizeof(*head);
1864
1865                         msg->hdr.version = cpu_to_le16(2);
1866                         msg->hdr.compat_version = cpu_to_le16(1);
1867                 }
1868
1869                 cap = list_first_entry(&tmp_list, struct ceph_cap,
1870                                         session_caps);
1871                 list_del(&cap->session_caps);
1872                 num_cap_releases--;
1873
1874                 head = msg->front.iov_base;
1875                 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
1876                                    &head->num);
1877                 item = msg->front.iov_base + msg->front.iov_len;
1878                 item->ino = cpu_to_le64(cap->cap_ino);
1879                 item->cap_id = cpu_to_le64(cap->cap_id);
1880                 item->migrate_seq = cpu_to_le32(cap->mseq);
1881                 item->seq = cpu_to_le32(cap->issue_seq);
1882                 msg->front.iov_len += sizeof(*item);
1883
1884                 ceph_put_cap(mdsc, cap);
1885
1886                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1887                         /* Append cap_barrier field */
1888                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
1889                         *cap_barrier = barrier;
1890                         msg->front.iov_len += sizeof(*cap_barrier);
1891
1892                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1893                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1894                         ceph_con_send(&session->s_con, msg);
1895                         msg = NULL;
1896                 }
1897         }
1898
1899         BUG_ON(num_cap_releases != 0);
1900
1901         spin_lock(&session->s_cap_lock);
1902         if (!list_empty(&session->s_cap_releases))
1903                 goto again;
1904         spin_unlock(&session->s_cap_lock);
1905
1906         if (msg) {
1907                 /* Append cap_barrier field */
1908                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
1909                 *cap_barrier = barrier;
1910                 msg->front.iov_len += sizeof(*cap_barrier);
1911
1912                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1913                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1914                 ceph_con_send(&session->s_con, msg);
1915         }
1916         return;
1917 out_err:
1918         pr_err("send_cap_releases mds%d, failed to allocate message\n",
1919                 session->s_mds);
1920         spin_lock(&session->s_cap_lock);
1921         list_splice(&tmp_list, &session->s_cap_releases);
1922         session->s_num_cap_releases += num_cap_releases;
1923         spin_unlock(&session->s_cap_lock);
1924 }
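
/*
 * Rough capacity of one release message above (sizes illustrative, not
 * authoritative): each encoded item carries ino (8) + cap_id (8) +
 * migrate_seq (4) + seq (4) = 24 bytes, so a 4 KiB front buffer minus
 * the num header and the 4-byte trailing barrier holds on the order of
 * 170 items; CEPH_CAPS_PER_RELEASE keeps each batch within that bound.
 */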
1925
1926 static void ceph_cap_release_work(struct work_struct *work)
1927 {
1928         struct ceph_mds_session *session =
1929                 container_of(work, struct ceph_mds_session, s_cap_release_work);
1930
1931         mutex_lock(&session->s_mutex);
1932         if (session->s_state == CEPH_MDS_SESSION_OPEN ||
1933             session->s_state == CEPH_MDS_SESSION_HUNG)
1934                 ceph_send_cap_releases(session->s_mdsc, session);
1935         mutex_unlock(&session->s_mutex);
1936         ceph_put_mds_session(session);
1937 }
1938
1939 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
1940                              struct ceph_mds_session *session)
1941 {
1942         if (mdsc->stopping)
1943                 return;
1944
1945         get_session(session);
1946         if (queue_work(mdsc->fsc->cap_wq,
1947                        &session->s_cap_release_work)) {
1948                 dout("cap release work queued\n");
1949         } else {
1950                 ceph_put_mds_session(session);
1951                 dout("failed to queue cap release work\n");
1952         }
1953 }
1954
1955 /*
1956  * caller holds session->s_cap_lock
1957  */
1958 void __ceph_queue_cap_release(struct ceph_mds_session *session,
1959                               struct ceph_cap *cap)
1960 {
1961         list_add_tail(&cap->session_caps, &session->s_cap_releases);
1962         session->s_num_cap_releases++;
1963
1964         if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
1965                 ceph_flush_cap_releases(session->s_mdsc, session);
1966 }
1967
1968 static void ceph_cap_reclaim_work(struct work_struct *work)
1969 {
1970         struct ceph_mds_client *mdsc =
1971                 container_of(work, struct ceph_mds_client, cap_reclaim_work);
1972         int ret = ceph_trim_dentries(mdsc);
1973         if (ret == -EAGAIN)
1974                 ceph_queue_cap_reclaim_work(mdsc);
1975 }
1976
1977 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
1978 {
1979         if (mdsc->stopping)
1980                 return;
1981
1982         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
1983                 dout("caps reclaim work queued\n");
1984         } else {
1985                 dout("failed to queue caps reclaim work\n");
1986         }
1987 }
1988
1989 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
1990 {
1991         int val;
1992         if (!nr)
1993                 return;
1994         val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
1995         if (!(val % CEPH_CAPS_PER_RELEASE)) {
1996                 atomic_set(&mdsc->cap_reclaim_pending, 0);
1997                 ceph_queue_cap_reclaim_work(mdsc);
1998         }
1999 }
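
/*
 * The trigger above accumulates pending reclaims in an atomic counter
 * and queues work whenever the running total lands on a batch boundary.
 * A self-contained userspace sketch of the same pattern (threshold value
 * hypothetical):
 */
#include <stdatomic.h>
#include <stdio.h>

#define DEMO_BATCH 16   /* stand-in for CEPH_CAPS_PER_RELEASE */

static atomic_int demo_pending;

static void demo_reclaim_nr(int nr)
{
        int val;

        if (!nr)
                return;
        /* atomic_fetch_add() returns the old value; add nr to mimic
         * atomic_add_return() */
        val = atomic_fetch_add(&demo_pending, nr) + nr;
        if (!(val % DEMO_BATCH)) {
                atomic_store(&demo_pending, 0);
                printf("queue reclaim work (total %d)\n", val);
        }
}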
2000
2001 /*
2002  * requests
2003  */
2004
2005 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2006                                     struct inode *dir)
2007 {
2008         struct ceph_inode_info *ci = ceph_inode(dir);
2009         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2010         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2011         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2012         int order, num_entries;
2013
2014         spin_lock(&ci->i_ceph_lock);
2015         num_entries = ci->i_files + ci->i_subdirs;
2016         spin_unlock(&ci->i_ceph_lock);
2017         num_entries = max(num_entries, 1);
2018         num_entries = min(num_entries, opt->max_readdir);
2019
2020         order = get_order(size * num_entries);
2021         while (order >= 0) {
2022                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2023                                                              __GFP_NOWARN,
2024                                                              order);
2025                 if (rinfo->dir_entries)
2026                         break;
2027                 order--;
2028         }
2029         if (!rinfo->dir_entries)
2030                 return -ENOMEM;
2031
2032         num_entries = (PAGE_SIZE << order) / size;
2033         num_entries = min(num_entries, opt->max_readdir);
2034
2035         rinfo->dir_buf_size = PAGE_SIZE << order;
2036         req->r_num_caps = num_entries + 1;
2037         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2038         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2039         return 0;
2040 }
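
/*
 * Worked example of the sizing above (entry size illustrative): with
 * 24-byte entries and num_entries == 300, size * num_entries == 7200
 * bytes, so get_order() requests order 1 (two 4 KiB pages).  If that
 * allocation fails, the loop falls back to order 0, and num_entries is
 * recomputed from whatever buffer was actually obtained before being
 * clamped to max_readdir again.
 */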
2041
2042 /*
2043  * Create an mds request.
2044  */
2045 struct ceph_mds_request *
2046 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2047 {
2048         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
2049         struct timespec64 ts;
2050
2051         if (!req)
2052                 return ERR_PTR(-ENOMEM);
2053
2054         mutex_init(&req->r_fill_mutex);
2055         req->r_mdsc = mdsc;
2056         req->r_started = jiffies;
2057         req->r_resend_mds = -1;
2058         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2059         INIT_LIST_HEAD(&req->r_unsafe_target_item);
2060         req->r_fmode = -1;
2061         kref_init(&req->r_kref);
2062         RB_CLEAR_NODE(&req->r_node);
2063         INIT_LIST_HEAD(&req->r_wait);
2064         init_completion(&req->r_completion);
2065         init_completion(&req->r_safe_completion);
2066         INIT_LIST_HEAD(&req->r_unsafe_item);
2067
2068         ktime_get_coarse_real_ts64(&ts);
2069         req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
2070
2071         req->r_op = op;
2072         req->r_direct_mode = mode;
2073         return req;
2074 }
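
/*
 * Typical caller pattern for the constructor above, modeled on
 * fs/ceph/dir.c (sketch only; the op and fields shown are one plausible
 * use, and ceph_mdsc_do_request() is defined later in this file):
 */
static int demo_lookup(struct ceph_mds_client *mdsc, struct inode *dir,
                       struct dentry *dentry)
{
        struct ceph_mds_request *req;
        int err;

        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_dentry = dget(dentry);   /* request holds a dentry ref */
        req->r_num_caps = 2;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        ceph_mdsc_put_request(req);     /* drop our kref */
        return err;
}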
2075
2076 /*
2077  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2078  *
2079  * called under mdsc->mutex.
2080  */
2081 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2082 {
2083         if (RB_EMPTY_ROOT(&mdsc->request_tree))
2084                 return NULL;
2085         return rb_entry(rb_first(&mdsc->request_tree),
2086                         struct ceph_mds_request, r_node);
2087 }
2088
2089 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2090 {
2091         return mdsc->oldest_tid;
2092 }
2093
2094 /*
2095  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2096  * on build_path_from_dentry in fs/cifs/dir.c.
2097  *
2098  * If @stop_on_nosnap, generate path relative to the first non-snapped
2099  * inode.
2100  *
2101  * Encode hidden .snap dirs as a double /, i.e.
2102  *   foo/.snap/bar -> foo//bar
2103  */
2104 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2105                            int stop_on_nosnap)
2106 {
2107         struct dentry *temp;
2108         char *path;
2109         int pos;
2110         unsigned seq;
2111         u64 base;
2112
2113         if (!dentry)
2114                 return ERR_PTR(-EINVAL);
2115
2116         path = __getname();
2117         if (!path)
2118                 return ERR_PTR(-ENOMEM);
2119 retry:
2120         pos = PATH_MAX - 1;
2121         path[pos] = '\0';
2122
2123         seq = read_seqbegin(&rename_lock);
2124         rcu_read_lock();
2125         temp = dentry;
2126         for (;;) {
2127                 struct inode *inode;
2128
2129                 spin_lock(&temp->d_lock);
2130                 inode = d_inode(temp);
2131                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2132                         dout("build_path path+%d: %p SNAPDIR\n",
2133                              pos, temp);
2134                 } else if (stop_on_nosnap && inode && dentry != temp &&
2135                            ceph_snap(inode) == CEPH_NOSNAP) {
2136                         spin_unlock(&temp->d_lock);
2137                         pos++; /* get rid of any prepended '/' */
2138                         break;
2139                 } else {
2140                         pos -= temp->d_name.len;
2141                         if (pos < 0) {
2142                                 spin_unlock(&temp->d_lock);
2143                                 break;
2144                         }
2145                         memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2146                 }
2147                 spin_unlock(&temp->d_lock);
2148                 temp = READ_ONCE(temp->d_parent);
2149
2150                 /* Are we at the root? */
2151                 if (IS_ROOT(temp))
2152                         break;
2153
2154                 /* Are we out of buffer? */
2155                 if (--pos < 0)
2156                         break;
2157
2158                 path[pos] = '/';
2159         }
2160         base = ceph_ino(d_inode(temp));
2161         rcu_read_unlock();
2162         if (pos < 0 || read_seqretry(&rename_lock, seq)) {
2163                 pr_err("build_path did not end path lookup where "
2164                        "expected, pos is %d\n", pos);
2165                 /* presumably this is only possible if racing with a
2166                  * rename of one of the parent directories (we cannot
2167                  * lock the dentries above us to prevent this, but
2168                  * retrying should be harmless) */
2169                 goto retry;
2170         }
2171
2172         *pbase = base;
2173         *plen = PATH_MAX - 1 - pos;
2174         dout("build_path on %p %d built %llx '%.*s'\n",
2175              dentry, d_count(dentry), base, *plen, path + pos);
2176         return path + pos;
2177 }
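
/*
 * The retry above is the standard read side of a seqlock: sample the
 * sequence, do the lockless d_parent walk, and redo the whole thing if a
 * writer (a rename) ran concurrently.  The bare pattern (sketch;
 * rename_lock is the global seqlock used above):
 */
static void demo_seqlock_reader(void)
{
        unsigned seq;

        do {
                seq = read_seqbegin(&rename_lock);
                /* lockless read-side work goes here */
        } while (read_seqretry(&rename_lock, seq));
}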
2178
2179 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2180                              const char **ppath, int *ppathlen, u64 *pino,
2181                              bool *pfreepath, bool parent_locked)
2182 {
2183         char *path;
2184
2185         rcu_read_lock();
2186         if (!dir)
2187                 dir = d_inode_rcu(dentry->d_parent);
2188         if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2189                 *pino = ceph_ino(dir);
2190                 rcu_read_unlock();
2191                 *ppath = dentry->d_name.name;
2192                 *ppathlen = dentry->d_name.len;
2193                 return 0;
2194         }
2195         rcu_read_unlock();
2196         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2197         if (IS_ERR(path))
2198                 return PTR_ERR(path);
2199         *ppath = path;
2200         *pfreepath = true;
2201         return 0;
2202 }
2203
2204 static int build_inode_path(struct inode *inode,
2205                             const char **ppath, int *ppathlen, u64 *pino,
2206                             bool *pfreepath)
2207 {
2208         struct dentry *dentry;
2209         char *path;
2210
2211         if (ceph_snap(inode) == CEPH_NOSNAP) {
2212                 *pino = ceph_ino(inode);
2213                 *ppathlen = 0;
2214                 return 0;
2215         }
2216         dentry = d_find_alias(inode);
2217         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2218         dput(dentry);
2219         if (IS_ERR(path))
2220                 return PTR_ERR(path);
2221         *ppath = path;
2222         *pfreepath = true;
2223         return 0;
2224 }
2225
2226 /*
2227  * request arguments may be specified via an inode *, a dentry *, or
2228  * an explicit ino+path.
2229  */
2230 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2231                                   struct inode *rdiri, const char *rpath,
2232                                   u64 rino, const char **ppath, int *pathlen,
2233                                   u64 *ino, bool *freepath, bool parent_locked)
2234 {
2235         int r = 0;
2236
2237         if (rinode) {
2238                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2239                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2240                      ceph_snap(rinode));
2241         } else if (rdentry) {
2242                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2243                                         freepath, parent_locked);
2244                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2245                      *ppath);
2246         } else if (rpath || rino) {
2247                 *ino = rino;
2248                 *ppath = rpath;
2249                 *pathlen = rpath ? strlen(rpath) : 0;
2250                 dout(" path %.*s\n", *pathlen, rpath);
2251         }
2252
2253         return r;
2254 }
2255
2256 /*
2257  * called under mdsc->mutex
2258  */
2259 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2260                                                struct ceph_mds_request *req,
2261                                                int mds, bool drop_cap_releases)
2262 {
2263         struct ceph_msg *msg;
2264         struct ceph_mds_request_head *head;
2265         const char *path1 = NULL;
2266         const char *path2 = NULL;
2267         u64 ino1 = 0, ino2 = 0;
2268         int pathlen1 = 0, pathlen2 = 0;
2269         bool freepath1 = false, freepath2 = false;
2270         int len;
2271         u16 releases;
2272         void *p, *end;
2273         int ret;
2274
2275         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2276                               req->r_parent, req->r_path1, req->r_ino1.ino,
2277                               &path1, &pathlen1, &ino1, &freepath1,
2278                               test_bit(CEPH_MDS_R_PARENT_LOCKED,
2279                                         &req->r_req_flags));
2280         if (ret < 0) {
2281                 msg = ERR_PTR(ret);
2282                 goto out;
2283         }
2284
2285         /* If r_old_dentry is set, then assume that its parent is locked */
2286         ret = set_request_path_attr(NULL, req->r_old_dentry,
2287                               req->r_old_dentry_dir,
2288                               req->r_path2, req->r_ino2.ino,
2289                               &path2, &pathlen2, &ino2, &freepath2, true);
2290         if (ret < 0) {
2291                 msg = ERR_PTR(ret);
2292                 goto out_free1;
2293         }
2294
2295         len = sizeof(*head) +
2296                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2297                 sizeof(struct ceph_timespec);
2298
2299         /* calculate (max) length for cap releases */
2300         len += sizeof(struct ceph_mds_request_release) *
2301                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2302                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2303         if (req->r_dentry_drop)
2304                 len += pathlen1;
2305         if (req->r_old_dentry_drop)
2306                 len += pathlen2;
2307
2308         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2309         if (!msg) {
2310                 msg = ERR_PTR(-ENOMEM);
2311                 goto out_free2;
2312         }
2313
2314         msg->hdr.version = cpu_to_le16(2);
2315         msg->hdr.tid = cpu_to_le64(req->r_tid);
2316
2317         head = msg->front.iov_base;
2318         p = msg->front.iov_base + sizeof(*head);
2319         end = msg->front.iov_base + msg->front.iov_len;
2320
2321         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2322         head->op = cpu_to_le32(req->r_op);
2323         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2324         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2325         head->args = req->r_args;
2326
2327         ceph_encode_filepath(&p, end, ino1, path1);
2328         ceph_encode_filepath(&p, end, ino2, path2);
2329
2330         /* make note of release offset, in case we need to replay */
2331         req->r_request_release_offset = p - msg->front.iov_base;
2332
2333         /* cap releases */
2334         releases = 0;
2335         if (req->r_inode_drop)
2336                 releases += ceph_encode_inode_release(&p,
2337                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2338                       mds, req->r_inode_drop, req->r_inode_unless, 0);
2339         if (req->r_dentry_drop)
2340                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2341                                 req->r_parent, mds, req->r_dentry_drop,
2342                                 req->r_dentry_unless);
2343         if (req->r_old_dentry_drop)
2344                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2345                                 req->r_old_dentry_dir, mds,
2346                                 req->r_old_dentry_drop,
2347                                 req->r_old_dentry_unless);
2348         if (req->r_old_inode_drop)
2349                 releases += ceph_encode_inode_release(&p,
2350                       d_inode(req->r_old_dentry),
2351                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2352
2353         if (drop_cap_releases) {
2354                 releases = 0;
2355                 p = msg->front.iov_base + req->r_request_release_offset;
2356         }
2357
2358         head->num_releases = cpu_to_le16(releases);
2359
2360         /* time stamp */
2361         {
2362                 struct ceph_timespec ts;
2363                 ceph_encode_timespec64(&ts, &req->r_stamp);
2364                 ceph_encode_copy(&p, &ts, sizeof(ts));
2365         }
2366
2367         BUG_ON(p > end);
2368         msg->front.iov_len = p - msg->front.iov_base;
2369         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2370
2371         if (req->r_pagelist) {
2372                 struct ceph_pagelist *pagelist = req->r_pagelist;
2373                 ceph_msg_data_add_pagelist(msg, pagelist);
2374                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2375         } else {
2376                 msg->hdr.data_len = 0;
2377         }
2378
2379         msg->hdr.data_off = cpu_to_le16(0);
2380
2381 out_free2:
2382         if (freepath2)
2383                 ceph_mdsc_free_path((char *)path2, pathlen2);
2384 out_free1:
2385         if (freepath1)
2386                 ceph_mdsc_free_path((char *)path1, pathlen1);
2387 out:
2388         return msg;
2389 }
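
/*
 * Resulting front-buffer layout (sketch; each filepath is encoded by
 * ceph_encode_filepath() as a 1-byte version, a 64-bit ino, a 32-bit
 * length, and the path bytes, matching the 2*(1 + u32 + u64) term in the
 * length calculation above):
 *
 *   [ceph_mds_request_head]
 *   [filepath1: ver|ino|len|path] [filepath2: ver|ino|len|path]
 *   [0..4 inode/dentry releases]      <- r_request_release_offset
 *   [ceph_timespec stamp]
 *
 * The release offset is recorded so a replay can rewind the message to
 * that point and drop the releases (see the drop_cap_releases handling).
 */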
2390
2391 /*
2392  * called under mdsc->mutex if error, under no mutex if
2393  * success.
2394  */
2395 static void complete_request(struct ceph_mds_client *mdsc,
2396                              struct ceph_mds_request *req)
2397 {
2398         if (req->r_callback)
2399                 req->r_callback(mdsc, req);
2400         complete_all(&req->r_completion);
2401 }
2402
2403 /*
2404  * called under mdsc->mutex
2405  */
2406 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2407                                   struct ceph_mds_request *req,
2408                                   int mds, bool drop_cap_releases)
2409 {
2410         struct ceph_mds_request_head *rhead;
2411         struct ceph_msg *msg;
2412         int flags = 0;
2413
2414         req->r_attempts++;
2415         if (req->r_inode) {
2416                 struct ceph_cap *cap =
2417                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2418
2419                 if (cap)
2420                         req->r_sent_on_mseq = cap->mseq;
2421                 else
2422                         req->r_sent_on_mseq = -1;
2423         }
2424         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2425              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2426
2427         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2428                 void *p;
2429                 /*
2430                  * Replay.  Do not regenerate message (and rebuild
2431                  * paths, etc.); just use the original message.
2432                  * Rebuilding paths will break for renames because
2433                  * d_move mangles the src name.
2434                  */
2435                 msg = req->r_request;
2436                 rhead = msg->front.iov_base;
2437
2438                 flags = le32_to_cpu(rhead->flags);
2439                 flags |= CEPH_MDS_FLAG_REPLAY;
2440                 rhead->flags = cpu_to_le32(flags);
2441
2442                 if (req->r_target_inode)
2443                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2444
2445                 rhead->num_retry = req->r_attempts - 1;
2446
2447                 /* remove cap/dentry releases from message */
2448                 rhead->num_releases = 0;
2449
2450                 /* time stamp */
2451                 p = msg->front.iov_base + req->r_request_release_offset;
2452                 {
2453                         struct ceph_timespec ts;
2454                         ceph_encode_timespec64(&ts, &req->r_stamp);
2455                         ceph_encode_copy(&p, &ts, sizeof(ts));
2456                 }
2457
2458                 msg->front.iov_len = p - msg->front.iov_base;
2459                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2460                 return 0;
2461         }
2462
2463         if (req->r_request) {
2464                 ceph_msg_put(req->r_request);
2465                 req->r_request = NULL;
2466         }
2467         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2468         if (IS_ERR(msg)) {
2469                 req->r_err = PTR_ERR(msg);
2470                 return PTR_ERR(msg);
2471         }
2472         req->r_request = msg;
2473
2474         rhead = msg->front.iov_base;
2475         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2476         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2477                 flags |= CEPH_MDS_FLAG_REPLAY;
2478         if (req->r_parent)
2479                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2480         rhead->flags = cpu_to_le32(flags);
2481         rhead->num_fwd = req->r_num_fwd;
2482         rhead->num_retry = req->r_attempts - 1;
2483         rhead->ino = 0;
2484
2485         dout(" r_parent = %p\n", req->r_parent);
2486         return 0;
2487 }
2488
2489 /*
2490  * send request, or put it on the appropriate wait list.
2491  */
2492 static void __do_request(struct ceph_mds_client *mdsc,
2493                         struct ceph_mds_request *req)
2494 {
2495         struct ceph_mds_session *session = NULL;
2496         int mds = -1;
2497         int err = 0;
2498
2499         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2500                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2501                         __unregister_request(mdsc, req);
2502                 return;
2503         }
2504
2505         if (req->r_timeout &&
2506             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2507                 dout("do_request timed out\n");
2508                 err = -EIO;
2509                 goto finish;
2510         }
2511         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2512                 dout("do_request forced umount\n");
2513                 err = -EIO;
2514                 goto finish;
2515         }
2516         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2517                 if (mdsc->mdsmap_err) {
2518                         err = mdsc->mdsmap_err;
2519                         dout("do_request mdsmap err %d\n", err);
2520                         goto finish;
2521                 }
2522                 if (mdsc->mdsmap->m_epoch == 0) {
2523                         dout("do_request no mdsmap, waiting for map\n");
2524                         list_add(&req->r_wait, &mdsc->waiting_for_map);
2525                         return;
2526                 }
2527                 if (!(mdsc->fsc->mount_options->flags &
2528                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
2529                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2530                         err = -ENOENT;
2531                         pr_info("probably no mds server is up\n");
2532                         goto finish;
2533                 }
2534         }
2535
2536         put_request_session(req);
2537
2538         mds = __choose_mds(mdsc, req);
2539         if (mds < 0 ||
2540             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2541                 dout("do_request no mds or not active, waiting for map\n");
2542                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2543                 return;
2544         }
2545
2546         /* get, open session */
2547         session = __ceph_lookup_mds_session(mdsc, mds);
2548         if (!session) {
2549                 session = register_session(mdsc, mds);
2550                 if (IS_ERR(session)) {
2551                         err = PTR_ERR(session);
2552                         goto finish;
2553                 }
2554         }
2555         req->r_session = get_session(session);
2556
2557         dout("do_request mds%d session %p state %s\n", mds, session,
2558              ceph_session_state_name(session->s_state));
2559         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2560             session->s_state != CEPH_MDS_SESSION_HUNG) {
2561                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2562                         err = -EACCES;
2563                         goto out_session;
2564                 }
2565                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2566                     session->s_state == CEPH_MDS_SESSION_CLOSING)
2567                         __open_session(mdsc, session);
2568                 list_add(&req->r_wait, &session->s_waiting);
2569                 goto out_session;
2570         }
2571
2572         /* send request */
2573         req->r_resend_mds = -1;   /* forget any previous mds hint */
2574
2575         if (req->r_request_started == 0)   /* note request start time */
2576                 req->r_request_started = jiffies;
2577
2578         err = __prepare_send_request(mdsc, req, mds, false);
2579         if (!err) {
2580                 ceph_msg_get(req->r_request);
2581                 ceph_con_send(&session->s_con, req->r_request);
2582         }
2583
2584 out_session:
2585         ceph_put_mds_session(session);
2586 finish:
2587         if (err) {
2588                 dout("__do_request early error %d\n", err);
2589                 req->r_err = err;
2590                 complete_request(mdsc, req);
2591                 __unregister_request(mdsc, req);
2592         }
2593         return;
2594 }
2595
2596 /*
2597  * called under mdsc->mutex
2598  */
2599 static void __wake_requests(struct ceph_mds_client *mdsc,
2600                             struct list_head *head)
2601 {
2602         struct ceph_mds_request *req;
2603         LIST_HEAD(tmp_list);
2604
2605         list_splice_init(head, &tmp_list);
2606
2607         while (!list_empty(&tmp_list)) {
2608                 req = list_entry(tmp_list.next,
2609                                  struct ceph_mds_request, r_wait);
2610                 list_del_init(&req->r_wait);
2611                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2612                 __do_request(mdsc, req);
2613         }
2614 }
2615
2616 /*
2617  * Wake up threads with requests pending for @mds, so that they can
2618  * resubmit their requests to a possibly different mds.
2619  */
2620 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2621 {
2622         struct ceph_mds_request *req;
2623         struct rb_node *p = rb_first(&mdsc->request_tree);
2624
2625         dout("kick_requests mds%d\n", mds);
2626         while (p) {
2627                 req = rb_entry(p, struct ceph_mds_request, r_node);
2628                 p = rb_next(p);
2629                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2630                         continue;
2631                 if (req->r_attempts > 0)
2632                         continue; /* only new requests */
2633                 if (req->r_session &&
2634                     req->r_session->s_mds == mds) {
2635                         dout(" kicking tid %llu\n", req->r_tid);
2636                         list_del_init(&req->r_wait);
2637                         __do_request(mdsc, req);
2638                 }
2639         }
2640 }
2641
2642 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2643                               struct ceph_mds_request *req)
2644 {
2645         int err;
2646
2647         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2648         if (req->r_inode)
2649                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2650         if (req->r_parent)
2651                 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2652         if (req->r_old_dentry_dir)
2653                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2654                                   CEPH_CAP_PIN);
2655
2656         dout("submit_request on %p for inode %p\n", req, dir);
2657         mutex_lock(&mdsc->mutex);
2658         __register_request(mdsc, req, dir);
2659         __do_request(mdsc, req);
2660         err = req->r_err;
2661         mutex_unlock(&mdsc->mutex);
2662         return err;
2663 }
2664
2665 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2666                                   struct ceph_mds_request *req)
2667 {
2668         int err;
2669
2670         /* wait */
2671         dout("do_request waiting\n");
2672         if (!req->r_timeout && req->r_wait_for_completion) {
2673                 err = req->r_wait_for_completion(mdsc, req);
2674         } else {
2675                 long timeleft = wait_for_completion_killable_timeout(
2676                                         &req->r_completion,
2677                                         ceph_timeout_jiffies(req->r_timeout));
2678                 if (timeleft > 0)
2679                         err = 0;
2680                 else if (!timeleft)
2681                         err = -EIO;  /* timed out */
2682                 else
2683                         err = timeleft;  /* killed */
2684         }
2685         dout("do_request waited, got %d\n", err);
2686         mutex_lock(&mdsc->mutex);
2687
2688         /* only abort if we didn't race with a real reply */
2689         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2690                 err = le32_to_cpu(req->r_reply_info.head->result);
2691         } else if (err < 0) {
2692                 dout("aborted request %lld with %d\n", req->r_tid, err);
2693
2694                 /*
2695                  * ensure we aren't running concurrently with
2696                  * ceph_fill_trace or ceph_readdir_prepopulate, which
2697                  * rely on locks (dir mutex) held by our caller.
2698                  */
2699                 mutex_lock(&req->r_fill_mutex);
2700                 req->r_err = err;
2701                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2702                 mutex_unlock(&req->r_fill_mutex);
2703
2704                 if (req->r_parent &&
2705                     (req->r_op & CEPH_MDS_OP_WRITE))
2706                         ceph_invalidate_dir_request(req);
2707         } else {
2708                 err = req->r_err;
2709         }
2710
2711         mutex_unlock(&mdsc->mutex);
2712         return err;
2713 }
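
/*
 * wait_for_completion_killable_timeout() folds three outcomes into one
 * return value, which the code above maps onto errors.  The mapping in
 * isolation (sketch; helper name hypothetical):
 */
static int demo_wait(struct completion *done, unsigned long timeout)
{
        long t = wait_for_completion_killable_timeout(done, timeout);

        if (t > 0)
                return 0;       /* completed, jiffies left over */
        if (t == 0)
                return -EIO;    /* timed out */
        return t;               /* fatal signal: -ERESTARTSYS */
}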
2714
2715 /*
2716  * Synchronously perform an mds request.  Take care of all of the
2717  * session setup, forwarding, and retry details.
2718  */
2719 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2720                          struct inode *dir,
2721                          struct ceph_mds_request *req)
2722 {
2723         int err;
2724
2725         dout("do_request on %p\n", req);
2726
2727         /* issue */
2728         err = ceph_mdsc_submit_request(mdsc, dir, req);
2729         if (!err)
2730                 err = ceph_mdsc_wait_request(mdsc, req);
2731         dout("do_request %p done, result %d\n", req, err);
2732         return err;
2733 }
2734
2735 /*
2736  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2737  * namespace request.
2738  */
2739 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2740 {
2741         struct inode *dir = req->r_parent;
2742         struct inode *old_dir = req->r_old_dentry_dir;
2743
2744         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2745
2746         ceph_dir_clear_complete(dir);
2747         if (old_dir)
2748                 ceph_dir_clear_complete(old_dir);
2749         if (req->r_dentry)
2750                 ceph_invalidate_dentry_lease(req->r_dentry);
2751         if (req->r_old_dentry)
2752                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2753 }
2754
2755 /*
2756  * Handle mds reply.
2757  *
2758  * We take the session mutex and parse and process the reply immediately.
2759  * This preserves the logical ordering of replies, capabilities, etc., sent
2760  * by the MDS as they are applied to our local cache.
2761  */
2762 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2763 {
2764         struct ceph_mds_client *mdsc = session->s_mdsc;
2765         struct ceph_mds_request *req;
2766         struct ceph_mds_reply_head *head = msg->front.iov_base;
2767         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2768         struct ceph_snap_realm *realm;
2769         u64 tid;
2770         int err, result;
2771         int mds = session->s_mds;
2772
2773         if (msg->front.iov_len < sizeof(*head)) {
2774                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2775                 ceph_msg_dump(msg);
2776                 return;
2777         }
2778
2779         /* get request, session */
2780         tid = le64_to_cpu(msg->hdr.tid);
2781         mutex_lock(&mdsc->mutex);
2782         req = lookup_get_request(mdsc, tid);
2783         if (!req) {
2784                 dout("handle_reply on unknown tid %llu\n", tid);
2785                 mutex_unlock(&mdsc->mutex);
2786                 return;
2787         }
2788         dout("handle_reply %p\n", req);
2789
2790         /* correct session? */
2791         if (req->r_session != session) {
2792                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2793                        " not mds%d\n", tid, session->s_mds,
2794                        req->r_session ? req->r_session->s_mds : -1);
2795                 mutex_unlock(&mdsc->mutex);
2796                 goto out;
2797         }
2798
2799         /* dup? */
2800         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2801             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2802                 pr_warn("got a dup %s reply on %llu from mds%d\n",
2803                            head->safe ? "safe" : "unsafe", tid, mds);
2804                 mutex_unlock(&mdsc->mutex);
2805                 goto out;
2806         }
2807         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2808                 pr_warn("got unsafe after safe on %llu from mds%d\n",
2809                            tid, mds);
2810                 mutex_unlock(&mdsc->mutex);
2811                 goto out;
2812         }
2813
2814         result = le32_to_cpu(head->result);
2815
2816         /*
2817          * Handle an ESTALE:
2818          * - if we're not talking to the authority, send to them
2819          * - if the authority has changed while we weren't looking,
2820          *   send to the new authority
2821          * - otherwise we just have to return an ESTALE
2822          */
2823         if (result == -ESTALE) {
2824                 dout("got ESTALE on request %llu\n", req->r_tid);
2825                 req->r_resend_mds = -1;
2826                 if (req->r_direct_mode != USE_AUTH_MDS) {
2827                         dout("not using auth, setting for that now\n");
2828                         req->r_direct_mode = USE_AUTH_MDS;
2829                         __do_request(mdsc, req);
2830                         mutex_unlock(&mdsc->mutex);
2831                         goto out;
2832                 } else  {
2833                         int mds = __choose_mds(mdsc, req);
2834                         if (mds >= 0 && mds != req->r_session->s_mds) {
2835                                 dout("but auth changed, so resending\n");
2836                                 __do_request(mdsc, req);
2837                                 mutex_unlock(&mdsc->mutex);
2838                                 goto out;
2839                         }
2840                 }
2841                 dout("have to return ESTALE on request %llu\n", req->r_tid);
2842         }
2843
2844
2845         if (head->safe) {
2846                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2847                 __unregister_request(mdsc, req);
2848
2849                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2850                         /*
2851                          * We already handled the unsafe response; now do the
2852                          * cleanup.  No need to examine the response; the MDS
2853                          * doesn't include any result info in the safe
2854                          * response.  And even if it did, there is nothing
2855                          * useful we could do with a revised return value.
2856                          */
2857                         dout("got safe reply %llu, mds%d\n", tid, mds);
2858
2859                         /* last unsafe request during umount? */
2860                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2861                                 complete_all(&mdsc->safe_umount_waiters);
2862                         mutex_unlock(&mdsc->mutex);
2863                         goto out;
2864                 }
2865         } else {
2866                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2867                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2868                 if (req->r_unsafe_dir) {
2869                         struct ceph_inode_info *ci =
2870                                         ceph_inode(req->r_unsafe_dir);
2871                         spin_lock(&ci->i_unsafe_lock);
2872                         list_add_tail(&req->r_unsafe_dir_item,
2873                                       &ci->i_unsafe_dirops);
2874                         spin_unlock(&ci->i_unsafe_lock);
2875                 }
2876         }
2877
2878         dout("handle_reply tid %lld result %d\n", tid, result);
2879         rinfo = &req->r_reply_info;
2880         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
2881                 err = parse_reply_info(msg, rinfo, (u64)-1);
2882         else
2883                 err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2884         mutex_unlock(&mdsc->mutex);
2885
2886         mutex_lock(&session->s_mutex);
2887         if (err < 0) {
2888                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2889                 ceph_msg_dump(msg);
2890                 goto out_err;
2891         }
2892
2893         /* snap trace */
2894         realm = NULL;
2895         if (rinfo->snapblob_len) {
2896                 down_write(&mdsc->snap_rwsem);
2897                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2898                                 rinfo->snapblob + rinfo->snapblob_len,
2899                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2900                                 &realm);
2901                 downgrade_write(&mdsc->snap_rwsem);
2902         } else {
2903                 down_read(&mdsc->snap_rwsem);
2904         }
2905
2906         /* insert trace into our cache */
2907         mutex_lock(&req->r_fill_mutex);
2908         current->journal_info = req;
2909         err = ceph_fill_trace(mdsc->fsc->sb, req);
2910         if (err == 0) {
2911                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2912                                     req->r_op == CEPH_MDS_OP_LSSNAP))
2913                         ceph_readdir_prepopulate(req, req->r_session);
2914         }
2915         current->journal_info = NULL;
2916         mutex_unlock(&req->r_fill_mutex);
2917
2918         up_read(&mdsc->snap_rwsem);
2919         if (realm)
2920                 ceph_put_snap_realm(mdsc, realm);
2921
2922         if (err == 0) {
2923                 if (req->r_target_inode &&
2924                     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2925                         struct ceph_inode_info *ci =
2926                                 ceph_inode(req->r_target_inode);
2927                         spin_lock(&ci->i_unsafe_lock);
2928                         list_add_tail(&req->r_unsafe_target_item,
2929                                       &ci->i_unsafe_iops);
2930                         spin_unlock(&ci->i_unsafe_lock);
2931                 }
2932
2933                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2934         }
2935 out_err:
2936         mutex_lock(&mdsc->mutex);
2937         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2938                 if (err) {
2939                         req->r_err = err;
2940                 } else {
2941                         req->r_reply = ceph_msg_get(msg);
2942                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2943                 }
2944         } else {
2945                 dout("reply arrived after request %lld was aborted\n", tid);
2946         }
2947         mutex_unlock(&mdsc->mutex);
2948
2949         mutex_unlock(&session->s_mutex);
2950
2951         /* kick calling process */
2952         complete_request(mdsc, req);
2953 out:
2954         ceph_mdsc_put_request(req);
2955         return;
2956 }
2957
2958
2959
2960 /*
2961  * handle mds notification that our request has been forwarded.
2962  */
2963 static void handle_forward(struct ceph_mds_client *mdsc,
2964                            struct ceph_mds_session *session,
2965                            struct ceph_msg *msg)
2966 {
2967         struct ceph_mds_request *req;
2968         u64 tid = le64_to_cpu(msg->hdr.tid);
2969         u32 next_mds;
2970         u32 fwd_seq;
2971         int err = -EINVAL;
2972         void *p = msg->front.iov_base;
2973         void *end = p + msg->front.iov_len;
2974
2975         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2976         next_mds = ceph_decode_32(&p);
2977         fwd_seq = ceph_decode_32(&p);
2978
2979         mutex_lock(&mdsc->mutex);
2980         req = lookup_get_request(mdsc, tid);
2981         if (!req) {
2982                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2983                 goto out;  /* dup reply? */
2984         }
2985
2986         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2987                 dout("forward tid %llu aborted, unregistering\n", tid);
2988                 __unregister_request(mdsc, req);
2989         } else if (fwd_seq <= req->r_num_fwd) {
2990                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2991                      tid, next_mds, fwd_seq, req->r_num_fwd);
2992         } else {
2993                 /* resend. forward race not possible; mds would drop */
2994                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2995                 BUG_ON(req->r_err);
2996                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
2997                 req->r_attempts = 0;
2998                 req->r_num_fwd = fwd_seq;
2999                 req->r_resend_mds = next_mds;
3000                 put_request_session(req);
3001                 __do_request(mdsc, req);
3002         }
3003         ceph_mdsc_put_request(req);
3004 out:
3005         mutex_unlock(&mdsc->mutex);
3006         return;
3007
3008 bad:
3009         pr_err("mdsc_handle_forward decode error err=%d\n", err);
3010 }
3011
3012 static int __decode_and_drop_session_metadata(void **p, void *end)
3013 {
3014         /* map<string,string> */
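         /*
          * The client metadata map (e.g. hostname, kernel version) only
          * matters to the MDS; just skip over each key/value pair.
          */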
3015         u32 n;
3016         ceph_decode_32_safe(p, end, n, bad);
3017         while (n-- > 0) {
3018                 u32 len;
3019                 ceph_decode_32_safe(p, end, len, bad);
3020                 ceph_decode_need(p, end, len, bad);
3021                 *p += len;
3022                 ceph_decode_32_safe(p, end, len, bad);
3023                 ceph_decode_need(p, end, len, bad);
3024                 *p += len;
3025         }
3026         return 0;
3027 bad:
3028         return -1;
3029 }
3030
3031 /*
3032  * handle an mds session control message
3033  */
3034 static void handle_session(struct ceph_mds_session *session,
3035                            struct ceph_msg *msg)
3036 {
3037         struct ceph_mds_client *mdsc = session->s_mdsc;
3038         int mds = session->s_mds;
3039         int msg_version = le16_to_cpu(msg->hdr.version);
3040         void *p = msg->front.iov_base;
3041         void *end = p + msg->front.iov_len;
3042         struct ceph_mds_session_head *h;
3043         u32 op;
3044         u64 seq;
3045         unsigned long features = 0;
3046         int wake = 0;
3047
3048         /* decode */
3049         ceph_decode_need(&p, end, sizeof(*h), bad);
3050         h = p;
3051         p += sizeof(*h);
3052
3053         op = le32_to_cpu(h->op);
3054         seq = le64_to_cpu(h->seq);
3055
3056         if (msg_version >= 3) {
3057                 u32 len;
3058                 /* version >= 2, metadata */
3059                 if (__decode_and_drop_session_metadata(&p, end) < 0)
3060                         goto bad;
3061                 /* version >= 3, feature bits */
3062                 ceph_decode_32_safe(&p, end, len, bad);
3063                 ceph_decode_need(&p, end, len, bad);
3064                 memcpy(&features, p, min_t(size_t, len, sizeof(features)));
3065                 p += len;
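                 /* feature bits beyond sizeof(features) are ignored */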
3066         }
3067
3068         mutex_lock(&mdsc->mutex);
3069         if (op == CEPH_SESSION_CLOSE) {
3070                 get_session(session);
3071                 __unregister_session(mdsc, session);
3072         }
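         /*
          * For CLOSE we took an extra reference above, so the session
          * stays valid until the ceph_put_mds_session() at the end of
          * this function.
          */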
3073         /* FIXME: this ttl calculation is generous */
3074         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3075         mutex_unlock(&mdsc->mutex);
3076
3077         mutex_lock(&session->s_mutex);
3078
3079         dout("handle_session mds%d %s %p state %s seq %llu\n",
3080              mds, ceph_session_op_name(op), session,
3081              ceph_session_state_name(session->s_state), seq);
3082
3083         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3084                 session->s_state = CEPH_MDS_SESSION_OPEN;
3085                 pr_info("mds%d came back\n", session->s_mds);
3086         }
3087
3088         switch (op) {
3089         case CEPH_SESSION_OPEN:
3090                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3091                         pr_info("mds%d reconnect success\n", session->s_mds);
3092                 session->s_state = CEPH_MDS_SESSION_OPEN;
3093                 session->s_features = features;
3094                 renewed_caps(mdsc, session, 0);
3095                 wake = 1;
3096                 if (mdsc->stopping)
3097                         __close_session(mdsc, session);
3098                 break;
3099
3100         case CEPH_SESSION_RENEWCAPS:
3101                 if (session->s_renew_seq == seq)
3102                         renewed_caps(mdsc, session, 1);
3103                 break;
3104
3105         case CEPH_SESSION_CLOSE:
3106                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3107                         pr_info("mds%d reconnect denied\n", session->s_mds);
3108                 cleanup_session_requests(mdsc, session);
3109                 remove_session_caps(session);
3110                 wake = 2; /* for good measure */
3111                 wake_up_all(&mdsc->session_close_wq);
3112                 break;
3113
3114         case CEPH_SESSION_STALE:
3115                 pr_info("mds%d caps went stale, renewing\n",
3116                         session->s_mds);
3117                 spin_lock(&session->s_gen_ttl_lock);
3118                 session->s_cap_gen++;
3119                 session->s_cap_ttl = jiffies - 1;
3120                 spin_unlock(&session->s_gen_ttl_lock);
3121                 send_renew_caps(mdsc, session);
3122                 break;
3123
3124         case CEPH_SESSION_RECALL_STATE:
3125                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3126                 break;
3127
3128         case CEPH_SESSION_FLUSHMSG:
3129                 send_flushmsg_ack(mdsc, session, seq);
3130                 break;
3131
3132         case CEPH_SESSION_FORCE_RO:
3133                 dout("force_session_readonly %p\n", session);
3134                 spin_lock(&session->s_cap_lock);
3135                 session->s_readonly = true;
3136                 spin_unlock(&session->s_cap_lock);
3137                 wake_up_session_caps(session, FORCE_RO);
3138                 break;
3139
3140         case CEPH_SESSION_REJECT:
3141                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3142                 pr_info("mds%d rejected session\n", session->s_mds);
3143                 session->s_state = CEPH_MDS_SESSION_REJECTED;
3144                 cleanup_session_requests(mdsc, session);
3145                 remove_session_caps(session);
3146                 wake = 2; /* for good measure */
3147                 break;
3148
3149         default:
3150                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3151                 WARN_ON(1);
3152         }
3153
3154         mutex_unlock(&session->s_mutex);
3155         if (wake) {
3156                 mutex_lock(&mdsc->mutex);
3157                 __wake_requests(mdsc, &session->s_waiting);
3158                 if (wake == 2)
3159                         kick_requests(mdsc, mds);
3160                 mutex_unlock(&mdsc->mutex);
3161         }
3162         if (op == CEPH_SESSION_CLOSE)
3163                 ceph_put_mds_session(session);
3164         return;
3165
3166 bad:
3167         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3168                (int)msg->front.iov_len);
3169         ceph_msg_dump(msg);
3170         return;
3171 }
3172
3173
3174 /*
3175  * called under session->s_mutex.
3176  */
3177 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3178                                    struct ceph_mds_session *session)
3179 {
3180         struct ceph_mds_request *req, *nreq;
3181         struct rb_node *p;
3182         int err;
3183
3184         dout("replay_unsafe_requests mds%d\n", session->s_mds);
3185
3186         mutex_lock(&mdsc->mutex);
3187         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
3188                 err = __prepare_send_request(mdsc, req, session->s_mds, true);
3189                 if (!err) {
3190                         ceph_msg_get(req->r_request);
3191                         ceph_con_send(&session->s_con, req->r_request);
3192                 }
3193         }
3194
3195         /*
3196          * Also re-send old requests when the MDS enters the reconnect stage,
3197          * so that it can process completed requests in the clientreplay stage.
3198          */
3199         p = rb_first(&mdsc->request_tree);
3200         while (p) {
3201                 req = rb_entry(p, struct ceph_mds_request, r_node);
3202                 p = rb_next(p);
3203                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3204                         continue;
3205                 if (req->r_attempts == 0)
3206                         continue; /* only old requests */
3207                 if (req->r_session &&
3208                     req->r_session->s_mds == session->s_mds) {
3209                         err = __prepare_send_request(mdsc, req,
3210                                                      session->s_mds, true);
3211                         if (!err) {
3212                                 ceph_msg_get(req->r_request);
3213                                 ceph_con_send(&session->s_con, req->r_request);
3214                         }
3215                 }
3216         }
3217         mutex_unlock(&mdsc->mutex);
3218 }
3219
3220 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3221 {
3222         struct ceph_msg *reply;
3223         struct ceph_pagelist *_pagelist;
3224         struct page *page;
3225         __le32 *addr;
3226         int err = -ENOMEM;
3227
3228         if (!recon_state->allow_multi)
3229                 return -ENOSPC;
3230
3231         /* can't handle a message that contains both caps and realms */
3232         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
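         /* (!x == !y is true iff both or neither are zero, so this fires
          * unless exactly one of nr_caps/nr_realms is non-zero) */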
3233
3234         /* pre-allocate new pagelist */
3235         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3236         if (!_pagelist)
3237                 return -ENOMEM;
3238
3239         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3240         if (!reply)
3241                 goto fail_msg;
3242
3243         /* placeholder for nr_caps */
3244         err = ceph_pagelist_encode_32(_pagelist, 0);
3245         if (err < 0)
3246                 goto fail;
3247
3248         if (recon_state->nr_caps) {
3249                 /* currently encoding caps */
3250                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3251                 if (err)
3252                         goto fail;
3253         } else {
3254                 /* placeholder for nr_realms (currently encoding realms) */
3255                 err = ceph_pagelist_encode_32(_pagelist, 0);
3256                 if (err < 0)
3257                         goto fail;
3258         }
3259
3260         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3261         if (err)
3262                 goto fail;
3263
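         /*
          * The counts were encoded as zero placeholders at the start of
          * the pagelist; map the first page and patch in the real values.
          */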
3264         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3265         addr = kmap_atomic(page);
3266         if (recon_state->nr_caps) {
3267                 /* currently encoding caps */
3268                 *addr = cpu_to_le32(recon_state->nr_caps);
3269         } else {
3270                 /* currently encoding realms */
3271                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3272         }
3273         kunmap_atomic(addr);
3274
3275         reply->hdr.version = cpu_to_le16(5);
3276         reply->hdr.compat_version = cpu_to_le16(4);
3277
3278         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3279         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3280
3281         ceph_con_send(&recon_state->session->s_con, reply);
3282         ceph_pagelist_release(recon_state->pagelist);
3283
3284         recon_state->pagelist = _pagelist;
3285         recon_state->nr_caps = 0;
3286         recon_state->nr_realms = 0;
3287         recon_state->msg_version = 5;
3288         return 0;
3289 fail:
3290         ceph_msg_put(reply);
3291 fail_msg:
3292         ceph_pagelist_release(_pagelist);
3293         return err;
3294 }
3295
3296 /*
3297  * Encode information about a cap for a reconnect with the MDS.
3298  */
3299 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3300                           void *arg)
3301 {
3302         union {
3303                 struct ceph_mds_cap_reconnect v2;
3304                 struct ceph_mds_cap_reconnect_v1 v1;
3305         } rec;
3306         struct ceph_inode_info *ci = cap->ci;
3307         struct ceph_reconnect_state *recon_state = arg;
3308         struct ceph_pagelist *pagelist = recon_state->pagelist;
3309         int err;
3310         u64 snap_follows;
3311
3312         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3313              inode, ceph_vinop(inode), cap, cap->cap_id,
3314              ceph_cap_string(cap->issued));
3315
3316         spin_lock(&ci->i_ceph_lock);
3317         cap->seq = 0;        /* reset cap seq */
3318         cap->issue_seq = 0;  /* and issue_seq */
3319         cap->mseq = 0;       /* and migrate_seq */
3320         cap->cap_gen = cap->session->s_cap_gen;
3321
3322         if (recon_state->msg_version >= 2) {
3323                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3324                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3325                 rec.v2.issued = cpu_to_le32(cap->issued);
3326                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3327                 rec.v2.pathbase = 0;
3328                 rec.v2.flock_len = (__force __le32)
3329                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3330         } else {
3331                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3332                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3333                 rec.v1.issued = cpu_to_le32(cap->issued);
3334                 rec.v1.size = cpu_to_le64(inode->i_size);
3335                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3336                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3337                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3338                 rec.v1.pathbase = 0;
3339         }
3340
3341         if (list_empty(&ci->i_cap_snaps)) {
3342                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3343         } else {
3344                 struct ceph_cap_snap *capsnap =
3345                         list_first_entry(&ci->i_cap_snaps,
3346                                          struct ceph_cap_snap, ci_item);
3347                 snap_follows = capsnap->follows;
3348         }
3349         spin_unlock(&ci->i_ceph_lock);
3350
3351         if (recon_state->msg_version >= 2) {
3352                 int num_fcntl_locks, num_flock_locks;
3353                 struct ceph_filelock *flocks = NULL;
3354                 size_t struct_len, total_len = sizeof(u64);
3355                 u8 struct_v = 0;
3356
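                 /*
                  * File locks can come and go between counting them and
                  * encoding them into the buffer, so retry from here if
                  * the count turns out to be stale (-ENOSPC below).
                  */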
3357 encode_again:
3358                 if (rec.v2.flock_len) {
3359                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3360                 } else {
3361                         num_fcntl_locks = 0;
3362                         num_flock_locks = 0;
3363                 }
3364                 if (num_fcntl_locks + num_flock_locks > 0) {
3365                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3366                                                sizeof(struct ceph_filelock),
3367                                                GFP_NOFS);
3368                         if (!flocks) {
3369                                 err = -ENOMEM;
3370                                 goto out_err;
3371                         }
3372                         err = ceph_encode_locks_to_buffer(inode, flocks,
3373                                                           num_fcntl_locks,
3374                                                           num_flock_locks);
3375                         if (err) {
3376                                 kfree(flocks);
3377                                 flocks = NULL;
3378                                 if (err == -ENOSPC)
3379                                         goto encode_again;
3380                                 goto out_err;
3381                         }
3382                 } else {
3383                         kfree(flocks);
3384                         flocks = NULL;
3385                 }
3386
3387                 if (recon_state->msg_version >= 3) {
3388                         /* version, compat_version and struct_len */
3389                         total_len += 2 * sizeof(u8) + sizeof(u32);
3390                         struct_v = 2;
3391                 }
3392                 /*
3393                  * number of encoded locks is stable, so copy to pagelist
3394                  */
3395                 struct_len = 2 * sizeof(u32) +
3396                             (num_fcntl_locks + num_flock_locks) *
3397                             sizeof(struct ceph_filelock);
3398                 rec.v2.flock_len = cpu_to_le32(struct_len);
3399
3400                 struct_len += sizeof(u32) + sizeof(rec.v2);
3401
3402                 if (struct_v >= 2)
3403                         struct_len += sizeof(u64); /* snap_follows */
3404
3405                 total_len += struct_len;
3406
3407                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3408                         err = send_reconnect_partial(recon_state);
3409                         if (err)
3410                                 goto out_freeflocks;
3411                         pagelist = recon_state->pagelist;
3412                 }
3413
3414                 err = ceph_pagelist_reserve(pagelist, total_len);
3415                 if (err)
3416                         goto out_freeflocks;
3417
3418                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3419                 if (recon_state->msg_version >= 3) {
3420                         ceph_pagelist_encode_8(pagelist, struct_v);
3421                         ceph_pagelist_encode_8(pagelist, 1);
3422                         ceph_pagelist_encode_32(pagelist, struct_len);
3423                 }
3424                 ceph_pagelist_encode_string(pagelist, NULL, 0);
3425                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3426                 ceph_locks_to_pagelist(flocks, pagelist,
3427                                        num_fcntl_locks, num_flock_locks);
3428                 if (struct_v >= 2)
3429                         ceph_pagelist_encode_64(pagelist, snap_follows);
3430 out_freeflocks:
3431                 kfree(flocks);
3432         } else {
3433                 u64 pathbase = 0;
3434                 int pathlen = 0;
3435                 char *path = NULL;
3436                 struct dentry *dentry;
3437
3438                 dentry = d_find_alias(inode);
3439                 if (dentry) {
3440                         path = ceph_mdsc_build_path(dentry,
3441                                                 &pathlen, &pathbase, 0);
3442                         dput(dentry);
3443                         if (IS_ERR(path)) {
3444                                 err = PTR_ERR(path);
3445                                 goto out_err;
3446                         }
3447                         rec.v1.pathbase = cpu_to_le64(pathbase);
3448                 }
3449
3450                 err = ceph_pagelist_reserve(pagelist,
3451                                             sizeof(u64) + sizeof(u32) +
3452                                             pathlen + sizeof(rec.v1));
3453                 if (err) {
3454                         goto out_freepath;
3455                 }
3456
3457                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3458                 ceph_pagelist_encode_string(pagelist, path, pathlen);
3459                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3460 out_freepath:
3461                 ceph_mdsc_free_path(path, pathlen);
3462         }
3463
3464 out_err:
3465         if (err >= 0)
3466                 recon_state->nr_caps++;
3467         return err;
3468 }
3469
3470 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3471                               struct ceph_reconnect_state *recon_state)
3472 {
3473         struct rb_node *p;
3474         struct ceph_pagelist *pagelist = recon_state->pagelist;
3475         int err = 0;
3476
3477         if (recon_state->msg_version >= 4) {
3478                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3479                 if (err < 0)
3480                         goto fail;
3481         }
3482
3483         /*
3484          * snaprealms.  we provide the mds with the ino, seq (version), and
3485          * parent for all of our realms.  If the mds has any newer info,
3486          * it will tell us.
3487          */
3488         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3489                 struct ceph_snap_realm *realm =
3490                        rb_entry(p, struct ceph_snap_realm, node);
3491                 struct ceph_mds_snaprealm_reconnect sr_rec;
3492
3493                 if (recon_state->msg_version >= 4) {
3494                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
3495                                       sizeof(sr_rec);
3496
3497                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3498                                 err = send_reconnect_partial(recon_state);
3499                                 if (err)
3500                                         goto fail;
3501                                 pagelist = recon_state->pagelist;
3502                         }
3503
3504                         err = ceph_pagelist_reserve(pagelist, need);
3505                         if (err)
3506                                 goto fail;
3507
3508                         ceph_pagelist_encode_8(pagelist, 1);
3509                         ceph_pagelist_encode_8(pagelist, 1);
3510                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3511                 }
3512
3513                 dout(" adding snap realm %llx seq %lld parent %llx\n",
3514                      realm->ino, realm->seq, realm->parent_ino);
3515                 sr_rec.ino = cpu_to_le64(realm->ino);
3516                 sr_rec.seq = cpu_to_le64(realm->seq);
3517                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3518
3519                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3520                 if (err)
3521                         goto fail;
3522
3523                 recon_state->nr_realms++;
3524         }
3525 fail:
3526         return err;
3527 }
3528
3529
3530 /*
3531  * If an MDS fails and recovers, clients need to reconnect in order to
3532  * reestablish shared state.  This includes all caps issued through
3533  * this session _and_ the snap_realm hierarchy.  Because it's not
3534  * clear which snap realms the mds cares about, we send everything we
3535  * know about; that ensures we'll then get any new info the
3536  * recovering MDS might have.
3537  *
3538  * This is a relatively heavyweight operation, but it's rare.
3539  *
3540  * called with mdsc->mutex held.
3541  */
3542 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3543                                struct ceph_mds_session *session)
3544 {
3545         struct ceph_msg *reply;
3546         int mds = session->s_mds;
3547         int err = -ENOMEM;
3548         struct ceph_reconnect_state recon_state = {
3549                 .session = session,
3550         };
3551         LIST_HEAD(dispose);
3552
3553         pr_info("mds%d reconnect start\n", mds);
3554
3555         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3556         if (!recon_state.pagelist)
3557                 goto fail_nopagelist;
3558
3559         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3560         if (!reply)
3561                 goto fail_nomsg;
3562
3563         mutex_lock(&session->s_mutex);
3564         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3565         session->s_seq = 0;
3566
3567         dout("session %p state %s\n", session,
3568              ceph_session_state_name(session->s_state));
3569
3570         spin_lock(&session->s_gen_ttl_lock);
3571         session->s_cap_gen++;
3572         spin_unlock(&session->s_gen_ttl_lock);
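         /* bumping s_cap_gen marks all caps issued so far as stale */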
3573
3574         spin_lock(&session->s_cap_lock);
3575         /* don't know if session is readonly */
3576         session->s_readonly = 0;
3577         /*
3578          * notify __ceph_remove_cap() that we are composing cap reconnect.
3579          * If a cap gets released before being added to the cap reconnect,
3580          * __ceph_remove_cap() should skip queuing the cap release.
3581          */
3582         session->s_cap_reconnect = 1;
3583         /* drop old cap expires; we're about to reestablish that state */
3584         detach_cap_releases(session, &dispose);
3585         spin_unlock(&session->s_cap_lock);
3586         dispose_cap_releases(mdsc, &dispose);
3587
3588         /* trim unused caps to reduce MDS's cache rejoin time */
3589         if (mdsc->fsc->sb->s_root)
3590                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3591
3592         ceph_con_close(&session->s_con);
3593         ceph_con_open(&session->s_con,
3594                       CEPH_ENTITY_TYPE_MDS, mds,
3595                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3596
3597         /* replay unsafe requests */
3598         replay_unsafe_requests(mdsc, session);
3599
3600         ceph_early_kick_flushing_caps(mdsc, session);
3601
3602         down_read(&mdsc->snap_rwsem);
3603
3604         /* placeholder for nr_caps */
3605         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3606         if (err)
3607                 goto fail;
3608
3609         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3610                 recon_state.msg_version = 3;
3611                 recon_state.allow_multi = true;
3612         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3613                 recon_state.msg_version = 3;
3614         } else {
3615                 recon_state.msg_version = 2;
3616         }
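         /*
          * Rough map of the reconnect encoding versions used here: v2
          * adds the flock blob, v3 makes cap records versioned, v4 also
          * versions snap realm records, and v5 allows the reconnect to
          * span multiple messages (see send_reconnect_partial()).
          */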
3617         /* traverse this session's caps */
3618         err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state);
3619
3620         spin_lock(&session->s_cap_lock);
3621         session->s_cap_reconnect = 0;
3622         spin_unlock(&session->s_cap_lock);
3623
3624         if (err < 0)
3625                 goto fail;
3626
3627         /* check if all realms can be encoded into current message */
3628         if (mdsc->num_snap_realms) {
3629                 size_t total_len =
3630                         recon_state.pagelist->length +
3631                         mdsc->num_snap_realms *
3632                         sizeof(struct ceph_mds_snaprealm_reconnect);
3633                 if (recon_state.msg_version >= 4) {
3634                         /* number of realms */
3635                         total_len += sizeof(u32);
3636                         /* version, compat_version and struct_len */
3637                         total_len += mdsc->num_snap_realms *
3638                                      (2 * sizeof(u8) + sizeof(u32));
3639                 }
3640                 if (total_len > RECONNECT_MAX_SIZE) {
3641                         if (!recon_state.allow_multi) {
3642                                 err = -ENOSPC;
3643                                 goto fail;
3644                         }
3645                         if (recon_state.nr_caps) {
3646                                 err = send_reconnect_partial(&recon_state);
3647                                 if (err)
3648                                         goto fail;
3649                         }
3650                         recon_state.msg_version = 5;
3651                 }
3652         }
3653
3654         err = encode_snap_realms(mdsc, &recon_state);
3655         if (err < 0)
3656                 goto fail;
3657
3658         if (recon_state.msg_version >= 5) {
3659                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3660                 if (err < 0)
3661                         goto fail;
3662         }
3663
3664         if (recon_state.nr_caps || recon_state.nr_realms) {
3665                 struct page *page =
3666                         list_first_entry(&recon_state.pagelist->head,
3667                                         struct page, lru);
3668                 __le32 *addr = kmap_atomic(page);
3669                 if (recon_state.nr_caps) {
3670                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3671                         *addr = cpu_to_le32(recon_state.nr_caps);
3672                 } else if (recon_state.msg_version >= 4) {
3673                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3674                 }
3675                 kunmap_atomic(addr);
3676         }
3677
3678         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3679         if (recon_state.msg_version >= 4)
3680                 reply->hdr.compat_version = cpu_to_le16(4);
3681
3682         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3683         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3684
3685         ceph_con_send(&session->s_con, reply);
3686
3687         mutex_unlock(&session->s_mutex);
3688
3689         mutex_lock(&mdsc->mutex);
3690         __wake_requests(mdsc, &session->s_waiting);
3691         mutex_unlock(&mdsc->mutex);
3692
3693         up_read(&mdsc->snap_rwsem);
3694         ceph_pagelist_release(recon_state.pagelist);
3695         return;
3696
3697 fail:
3698         ceph_msg_put(reply);
3699         up_read(&mdsc->snap_rwsem);
3700         mutex_unlock(&session->s_mutex);
3701 fail_nomsg:
3702         ceph_pagelist_release(recon_state.pagelist);
3703 fail_nopagelist:
3704         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3705         return;
3706 }
3707
3708
3709 /*
3710  * compare old and new mdsmaps, kicking requests
3711  * and closing out old connections as necessary
3712  *
3713  * called under mdsc->mutex.
3714  */
3715 static void check_new_map(struct ceph_mds_client *mdsc,
3716                           struct ceph_mdsmap *newmap,
3717                           struct ceph_mdsmap *oldmap)
3718 {
3719         int i;
3720         int oldstate, newstate;
3721         struct ceph_mds_session *s;
3722
3723         dout("check_new_map new %u old %u\n",
3724              newmap->m_epoch, oldmap->m_epoch);
3725
3726         for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3727                 if (!mdsc->sessions[i])
3728                         continue;
3729                 s = mdsc->sessions[i];
3730                 oldstate = ceph_mdsmap_get_state(oldmap, i);
3731                 newstate = ceph_mdsmap_get_state(newmap, i);
3732
3733                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3734                      i, ceph_mds_state_name(oldstate),
3735                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3736                      ceph_mds_state_name(newstate),
3737                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3738                      ceph_session_state_name(s->s_state));
3739
3740                 if (i >= newmap->m_num_mds ||
3741                     memcmp(ceph_mdsmap_get_addr(oldmap, i),
3742                            ceph_mdsmap_get_addr(newmap, i),
3743                            sizeof(struct ceph_entity_addr))) {
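                         /*
                          * Rank i went away or its address changed; the
                          * existing session cannot be reused.
                          */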
3744                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3745                                 /* the session never opened, just close it
3746                                  * out now */
3747                                 get_session(s);
3748                                 __unregister_session(mdsc, s);
3749                                 __wake_requests(mdsc, &s->s_waiting);
3750                                 ceph_put_mds_session(s);
3751                         } else if (i >= newmap->m_num_mds) {
3752                                 /* force close session for stopped mds */
3753                                 get_session(s);
3754                                 __unregister_session(mdsc, s);
3755                                 __wake_requests(mdsc, &s->s_waiting);
3756                                 kick_requests(mdsc, i);
3757                                 mutex_unlock(&mdsc->mutex);
3758
3759                                 mutex_lock(&s->s_mutex);
3760                                 cleanup_session_requests(mdsc, s);
3761                                 remove_session_caps(s);
3762                                 mutex_unlock(&s->s_mutex);
3763
3764                                 ceph_put_mds_session(s);
3765
3766                                 mutex_lock(&mdsc->mutex);
3767                         } else {
3768                                 /* just close it */
3769                                 mutex_unlock(&mdsc->mutex);
3770                                 mutex_lock(&s->s_mutex);
3771                                 mutex_lock(&mdsc->mutex);
3772                                 ceph_con_close(&s->s_con);
3773                                 mutex_unlock(&s->s_mutex);
3774                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
3775                         }
3776                 } else if (oldstate == newstate) {
3777                         continue;  /* nothing new with this mds */
3778                 }
3779
3780                 /*
3781                  * send reconnect?
3782                  */
3783                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3784                     newstate >= CEPH_MDS_STATE_RECONNECT) {
3785                         mutex_unlock(&mdsc->mutex);
3786                         send_mds_reconnect(mdsc, s);
3787                         mutex_lock(&mdsc->mutex);
3788                 }
3789
3790                 /*
3791                  * kick request on any mds that has gone active.
3792                  */
3793                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3794                     newstate >= CEPH_MDS_STATE_ACTIVE) {
3795                         if (oldstate != CEPH_MDS_STATE_CREATING &&
3796                             oldstate != CEPH_MDS_STATE_STARTING)
3797                                 pr_info("mds%d recovery completed\n", s->s_mds);
3798                         kick_requests(mdsc, i);
3799                         ceph_kick_flushing_caps(mdsc, s);
3800                         wake_up_session_caps(s, RECONNECT);
3801                 }
3802         }
3803
3804         for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3805                 s = mdsc->sessions[i];
3806                 if (!s)
3807                         continue;
3808                 if (!ceph_mdsmap_is_laggy(newmap, i))
3809                         continue;
3810                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3811                     s->s_state == CEPH_MDS_SESSION_HUNG ||
3812                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
3813                         dout(" connecting to export targets of laggy mds%d\n",
3814                              i);
3815                         __open_export_target_sessions(mdsc, s);
3816                 }
3817         }
3818 }
3819
3820
3821
3822 /*
3823  * leases
3824  */
3825
3826 /*
3827  * caller must hold session s_mutex, dentry->d_lock
3828  */
3829 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3830 {
3831         struct ceph_dentry_info *di = ceph_dentry(dentry);
3832
3833         ceph_put_mds_session(di->lease_session);
3834         di->lease_session = NULL;
3835 }
3836
3837 static void handle_lease(struct ceph_mds_client *mdsc,
3838                          struct ceph_mds_session *session,
3839                          struct ceph_msg *msg)
3840 {
3841         struct super_block *sb = mdsc->fsc->sb;
3842         struct inode *inode;
3843         struct dentry *parent, *dentry;
3844         struct ceph_dentry_info *di;
3845         int mds = session->s_mds;
3846         struct ceph_mds_lease *h = msg->front.iov_base;
3847         u32 seq;
3848         struct ceph_vino vino;
3849         struct qstr dname;
3850         int release = 0;
3851
3852         dout("handle_lease from mds%d\n", mds);
3853
3854         /* decode */
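         /*
          * Wire format: struct ceph_mds_lease, then a __le32 name
          * length, then the (not nul-terminated) dentry name bytes.
          */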
3855         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3856                 goto bad;
3857         vino.ino = le64_to_cpu(h->ino);
3858         vino.snap = CEPH_NOSNAP;
3859         seq = le32_to_cpu(h->seq);
3860         dname.len = get_unaligned_le32(h + 1);
3861         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
3862                 goto bad;
3863         dname.name = (void *)(h + 1) + sizeof(u32);
3864
3865         /* lookup inode */
3866         inode = ceph_find_inode(sb, vino);
3867         dout("handle_lease %s, ino %llx %p %.*s\n",
3868              ceph_lease_op_name(h->action), vino.ino, inode,
3869              dname.len, dname.name);
3870
3871         mutex_lock(&session->s_mutex);
3872         session->s_seq++;
3873
3874         if (!inode) {
3875                 dout("handle_lease no inode %llx\n", vino.ino);
3876                 goto release;
3877         }
3878
3879         /* dentry */
3880         parent = d_find_alias(inode);
3881         if (!parent) {
3882                 dout("no parent dentry on inode %p\n", inode);
3883                 WARN_ON(1);
3884                 goto release;  /* hrm... */
3885         }
3886         dname.hash = full_name_hash(parent, dname.name, dname.len);
3887         dentry = d_lookup(parent, &dname);
3888         dput(parent);
3889         if (!dentry)
3890                 goto release;
3891
3892         spin_lock(&dentry->d_lock);
3893         di = ceph_dentry(dentry);
3894         switch (h->action) {
3895         case CEPH_MDS_LEASE_REVOKE:
3896                 if (di->lease_session == session) {
3897                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3898                                 h->seq = cpu_to_le32(di->lease_seq);
3899                         __ceph_mdsc_drop_dentry_lease(dentry);
3900                 }
3901                 release = 1;
3902                 break;
3903
3904         case CEPH_MDS_LEASE_RENEW:
3905                 if (di->lease_session == session &&
3906                     di->lease_gen == session->s_cap_gen &&
3907                     di->lease_renew_from &&
3908                     di->lease_renew_after == 0) {
3909                         unsigned long duration =
3910                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3911
3912                         di->lease_seq = seq;
3913                         di->time = di->lease_renew_from + duration;
3914                         di->lease_renew_after = di->lease_renew_from +
3915                                 (duration >> 1);
3916                         di->lease_renew_from = 0;
3917                 }
3918                 break;
3919         }
3920         spin_unlock(&dentry->d_lock);
3921         dput(dentry);
3922
3923         if (!release)
3924                 goto out;
3925
3926 release:
3927         /* let's just reuse the same message */
3928         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3929         ceph_msg_get(msg);
3930         ceph_con_send(&session->s_con, msg);
3931
3932 out:
3933         mutex_unlock(&session->s_mutex);
3934         /* avoid calling iput_final() in mds dispatch threads */
3935         ceph_async_iput(inode);
3936         return;
3937
3938 bad:
3939         pr_err("corrupt lease message\n");
3940         ceph_msg_dump(msg);
3941 }
3942
3943 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3944                               struct dentry *dentry, char action,
3945                               u32 seq)
3946 {
3947         struct ceph_msg *msg;
3948         struct ceph_mds_lease *lease;
3949         struct inode *dir;
3950         int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
3951
3952         dout("lease_send_msg dentry %p %s to mds%d\n",
3953              dentry, ceph_lease_op_name(action), session->s_mds);
3954
3955         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3956         if (!msg)
3957                 return;
3958         lease = msg->front.iov_base;
3959         lease->action = action;
3960         lease->seq = cpu_to_le32(seq);
3961
3962         spin_lock(&dentry->d_lock);
3963         dir = d_inode(dentry->d_parent);
3964         lease->ino = cpu_to_le64(ceph_ino(dir));
3965         lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
3966
3967         put_unaligned_le32(dentry->d_name.len, lease + 1);
3968         memcpy((void *)(lease + 1) + 4,
3969                dentry->d_name.name, dentry->d_name.len);
3970         spin_unlock(&dentry->d_lock);
3971         /*
3972          * if this is a preemptive lease RELEASE, no need to
3973          * flush request stream, since the actual request will
3974          * soon follow.
3975          */
3976         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3977
3978         ceph_con_send(&session->s_con, msg);
3979 }
3980
3981 /*
3982  * lock and unlock each session, to wait for ongoing session activity to finish
3983  */
3984 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
3985 {
3986         int i;
3987
3988         mutex_lock(&mdsc->mutex);
3989         for (i = 0; i < mdsc->max_sessions; i++) {
3990                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3991                 if (!s)
3992                         continue;
3993                 mutex_unlock(&mdsc->mutex);
3994                 mutex_lock(&s->s_mutex);
3995                 mutex_unlock(&s->s_mutex);
3996                 ceph_put_mds_session(s);
3997                 mutex_lock(&mdsc->mutex);
3998         }
3999         mutex_unlock(&mdsc->mutex);
4000 }
4001
4002
4003
4004 /*
4005  * delayed work -- periodically trim expired leases, renew caps with mds
4006  */
4007 static void schedule_delayed(struct ceph_mds_client *mdsc)
4008 {
4009         int delay = 5;
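         /* tick roughly every 5 seconds; rounding batches nearby wakeups */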
4010         unsigned hz = round_jiffies_relative(HZ * delay);
4011         schedule_delayed_work(&mdsc->delayed_work, hz);
4012 }
4013
4014 static void delayed_work(struct work_struct *work)
4015 {
4016         int i;
4017         struct ceph_mds_client *mdsc =
4018                 container_of(work, struct ceph_mds_client, delayed_work.work);
4019         int renew_interval;
4020         int renew_caps;
4021
4022         dout("mdsc delayed_work\n");
4023
4024         mutex_lock(&mdsc->mutex);
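         /* renew caps once every quarter of the MDS session timeout */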
4025         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4026         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4027                                    mdsc->last_renew_caps);
4028         if (renew_caps)
4029                 mdsc->last_renew_caps = jiffies;
4030
4031         for (i = 0; i < mdsc->max_sessions; i++) {
4032                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4033                 if (!s)
4034                         continue;
4035                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4036                         dout("resending session close request for mds%d\n",
4037                              s->s_mds);
4038                         request_close_session(mdsc, s);
4039                         ceph_put_mds_session(s);
4040                         continue;
4041                 }
4042                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4043                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
4044                                 s->s_state = CEPH_MDS_SESSION_HUNG;
4045                                 pr_info("mds%d hung\n", s->s_mds);
4046                         }
4047                 }
4048                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
4049                         /* this mds is failed or recovering, just wait */
4050                         ceph_put_mds_session(s);
4051                         continue;
4052                 }
4053                 mutex_unlock(&mdsc->mutex);
4054
4055                 mutex_lock(&s->s_mutex);
4056                 if (renew_caps)
4057                         send_renew_caps(mdsc, s);
4058                 else
4059                         ceph_con_keepalive(&s->s_con);
4060                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4061                     s->s_state == CEPH_MDS_SESSION_HUNG)
4062                         ceph_send_cap_releases(mdsc, s);
4063                 mutex_unlock(&s->s_mutex);
4064                 ceph_put_mds_session(s);
4065
4066                 mutex_lock(&mdsc->mutex);
4067         }
4068         mutex_unlock(&mdsc->mutex);
4069
4070         ceph_check_delayed_caps(mdsc);
4071
4072         ceph_queue_cap_reclaim_work(mdsc);
4073
4074         ceph_trim_snapid_map(mdsc);
4075
4076         schedule_delayed(mdsc);
4077 }
4078
4079 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4081 {
4082         struct ceph_mds_client *mdsc;
4083
4084         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4085         if (!mdsc)
4086                 return -ENOMEM;
4087         mdsc->fsc = fsc;
4088         mutex_init(&mdsc->mutex);
4089         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4090         if (!mdsc->mdsmap) {
4091                 kfree(mdsc);
4092                 return -ENOMEM;
4093         }
4094
4095         fsc->mdsc = mdsc;
4096         init_completion(&mdsc->safe_umount_waiters);
4097         init_waitqueue_head(&mdsc->session_close_wq);
4098         INIT_LIST_HEAD(&mdsc->waiting_for_map);
4099         mdsc->sessions = NULL;
4100         atomic_set(&mdsc->num_sessions, 0);
4101         mdsc->max_sessions = 0;
4102         mdsc->stopping = 0;
4103         atomic64_set(&mdsc->quotarealms_count, 0);
4104         mdsc->quotarealms_inodes = RB_ROOT;
4105         mutex_init(&mdsc->quotarealms_inodes_mutex);
4106         mdsc->last_snap_seq = 0;
4107         init_rwsem(&mdsc->snap_rwsem);
4108         mdsc->snap_realms = RB_ROOT;
4109         INIT_LIST_HEAD(&mdsc->snap_empty);
4110         mdsc->num_snap_realms = 0;
4111         spin_lock_init(&mdsc->snap_empty_lock);
4112         mdsc->last_tid = 0;
4113         mdsc->oldest_tid = 0;
4114         mdsc->request_tree = RB_ROOT;
4115         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4116         mdsc->last_renew_caps = jiffies;
4117         INIT_LIST_HEAD(&mdsc->cap_delay_list);
4118         spin_lock_init(&mdsc->cap_delay_lock);
4119         INIT_LIST_HEAD(&mdsc->snap_flush_list);
4120         spin_lock_init(&mdsc->snap_flush_lock);
4121         mdsc->last_cap_flush_tid = 1;
4122         INIT_LIST_HEAD(&mdsc->cap_flush_list);
4123         INIT_LIST_HEAD(&mdsc->cap_dirty);
4124         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4125         mdsc->num_cap_flushing = 0;
4126         spin_lock_init(&mdsc->cap_dirty_lock);
4127         init_waitqueue_head(&mdsc->cap_flushing_wq);
4128         INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4129         atomic_set(&mdsc->cap_reclaim_pending, 0);
4130
4131         spin_lock_init(&mdsc->dentry_list_lock);
4132         INIT_LIST_HEAD(&mdsc->dentry_leases);
4133         INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4134
4135         ceph_caps_init(mdsc);
4136         ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4137
4138         spin_lock_init(&mdsc->snapid_map_lock);
4139         mdsc->snapid_map_tree = RB_ROOT;
4140         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4141
4142         init_rwsem(&mdsc->pool_perm_rwsem);
4143         mdsc->pool_perm_tree = RB_ROOT;
4144
4145         strscpy(mdsc->nodename, utsname()->nodename,
4146                 sizeof(mdsc->nodename));
4147         return 0;
4148 }
4149
4150 /*
4151  * Wait for safe replies on open mds requests.  If we time out, drop
4152  * all requests from the tree to avoid dangling dentry refs.
4153  */
4154 static void wait_requests(struct ceph_mds_client *mdsc)
4155 {
4156         struct ceph_options *opts = mdsc->fsc->client->options;
4157         struct ceph_mds_request *req;
4158
4159         mutex_lock(&mdsc->mutex);
4160         if (__get_oldest_req(mdsc)) {
4161                 mutex_unlock(&mdsc->mutex);
4162
4163                 dout("wait_requests waiting for requests\n");
4164                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4165                                     ceph_timeout_jiffies(opts->mount_timeout));
4166
4167                 /* tear down remaining requests */
4168                 mutex_lock(&mdsc->mutex);
4169                 while ((req = __get_oldest_req(mdsc))) {
4170                         dout("wait_requests timed out on tid %llu\n",
4171                              req->r_tid);
4172                         __unregister_request(mdsc, req);
4173                 }
4174         }
4175         mutex_unlock(&mdsc->mutex);
4176         dout("wait_requests done\n");
4177 }
4178
4179 /*
4180  * called before mount is ro, and before dentries are torn down.
4181  * (hmm, does this still race with new lookups?)
4182  */
4183 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4184 {
4185         dout("pre_umount\n");
4186         mdsc->stopping = 1;
4187
4188         lock_unlock_sessions(mdsc);
4189         ceph_flush_dirty_caps(mdsc);
4190         wait_requests(mdsc);
4191
4192         /*
4193          * wait for reply handlers to drop their request refs and
4194          * their inode/dcache refs
4195          */
4196         ceph_msgr_flush();
4197
4198         ceph_cleanup_quotarealms_inodes(mdsc);
4199 }
4200
4201 /*
4202  * wait for all write mds requests to flush.
4203  */
4204 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4205 {
4206         struct ceph_mds_request *req = NULL, *nextreq;
4207         struct rb_node *n;
4208
4209         mutex_lock(&mdsc->mutex);
4210         dout("wait_unsafe_requests want %lld\n", want_tid);
4211 restart:
4212         req = __get_oldest_req(mdsc);
4213         while (req && req->r_tid <= want_tid) {
4214                 /* find next request */
4215                 n = rb_next(&req->r_node);
4216                 if (n)
4217                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4218                 else
4219                         nextreq = NULL;
4220                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4221                     (req->r_op & CEPH_MDS_OP_WRITE)) {
4222                         /* write op */
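                         /*
                          * Pin this request and the next one so neither
                          * can be freed while we drop mdsc->mutex and
                          * sleep on the safe completion.
                          */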
4223                         ceph_mdsc_get_request(req);
4224                         if (nextreq)
4225                                 ceph_mdsc_get_request(nextreq);
4226                         mutex_unlock(&mdsc->mutex);
4227                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4228                              req->r_tid, want_tid);
4229                         wait_for_completion(&req->r_safe_completion);
4230                         mutex_lock(&mdsc->mutex);
4231                         ceph_mdsc_put_request(req);
4232                         if (!nextreq)
4233                                 break;  /* next dne before, so we're done! */
4234                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
4235                                 /* next request was removed from tree */
4236                                 ceph_mdsc_put_request(nextreq);
4237                                 goto restart;
4238                         }
4239                         ceph_mdsc_put_request(nextreq);  /* still in tree; drop extra ref */
4240                 }
4241                 req = nextreq;
4242         }
4243         mutex_unlock(&mdsc->mutex);
4244         dout("wait_unsafe_requests done\n");
4245 }
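
/*
 * The nextreq handling above is the standard trick for walking an
 * rbtree while repeatedly dropping the lock that protects it: take a
 * reference on the next node before sleeping, then use
 * RB_EMPTY_NODE() to detect that it was unlinked in the meantime and
 * restart the scan from the oldest request if so.
 */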
4246
4247 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4248 {
4249         u64 want_tid, want_flush;
4250
4251         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4252                 return;
4253
4254         dout("sync\n");
4255         mutex_lock(&mdsc->mutex);
4256         want_tid = mdsc->last_tid;
4257         mutex_unlock(&mdsc->mutex);
4258
4259         ceph_flush_dirty_caps(mdsc);
4260         spin_lock(&mdsc->cap_dirty_lock);
4261         want_flush = mdsc->last_cap_flush_tid;
4262         if (!list_empty(&mdsc->cap_flush_list)) {
4263                 struct ceph_cap_flush *cf =
4264                         list_last_entry(&mdsc->cap_flush_list,
4265                                         struct ceph_cap_flush, g_list);
4266                 cf->wake = true;
4267         }
4268         spin_unlock(&mdsc->cap_dirty_lock);
4269
4270         dout("sync want tid %lld flush_tid %lld\n",
4271              want_tid, want_flush);
4272
4273         wait_unsafe_requests(mdsc, want_tid);
4274         wait_caps_flush(mdsc, want_flush);
4275 }
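
/*
 * want_tid and want_flush are sampled before the waits begin, so a
 * sync (typically reached from ceph_sync_fs()) only covers metadata
 * requests and cap flushes already issued at that point; activity
 * started later is deliberately not waited for.
 */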
4276
4277 /*
4278  * true if all sessions are closed, or the unmount was forced
4279  */
4280 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4281 {
4282         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4283                 return true;
4284         return atomic_read(&mdsc->num_sessions) <= skipped;
4285 }
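
/*
 * 'skipped' counts sessions for which __close_session() did not send
 * a close request (it was already closing, or the send failed);
 * without it, done_closing_sessions() could wait forever on sessions
 * that will never generate a close ack.
 */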
4286
4287 /*
4288  * called after the superblock has been made read-only.
4289  */
4290 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4291 {
4292         struct ceph_options *opts = mdsc->fsc->client->options;
4293         struct ceph_mds_session *session;
4294         int i;
4295         int skipped = 0;
4296
4297         dout("close_sessions\n");
4298
4299         /* close sessions */
4300         mutex_lock(&mdsc->mutex);
4301         for (i = 0; i < mdsc->max_sessions; i++) {
4302                 session = __ceph_lookup_mds_session(mdsc, i);
4303                 if (!session)
4304                         continue;
4305                 mutex_unlock(&mdsc->mutex);
4306                 mutex_lock(&session->s_mutex);
4307                 if (__close_session(mdsc, session) <= 0)
4308                         skipped++;
4309                 mutex_unlock(&session->s_mutex);
4310                 ceph_put_mds_session(session);
4311                 mutex_lock(&mdsc->mutex);
4312         }
4313         mutex_unlock(&mdsc->mutex);
4314
4315         dout("waiting for sessions to close\n");
4316         wait_event_timeout(mdsc->session_close_wq,
4317                            done_closing_sessions(mdsc, skipped),
4318                            ceph_timeout_jiffies(opts->mount_timeout));
4319
4320         /* tear down remaining sessions */
4321         mutex_lock(&mdsc->mutex);
4322         for (i = 0; i < mdsc->max_sessions; i++) {
4323                 if (mdsc->sessions[i]) {
4324                         session = get_session(mdsc->sessions[i]);
4325                         __unregister_session(mdsc, session);
4326                         mutex_unlock(&mdsc->mutex);
4327                         mutex_lock(&session->s_mutex);
4328                         remove_session_caps(session);
4329                         mutex_unlock(&session->s_mutex);
4330                         ceph_put_mds_session(session);
4331                         mutex_lock(&mdsc->mutex);
4332                 }
4333         }
4334         WARN_ON(!list_empty(&mdsc->cap_delay_list));
4335         mutex_unlock(&mdsc->mutex);
4336
4337         ceph_cleanup_snapid_map(mdsc);
4338         ceph_cleanup_empty_realms(mdsc);
4339
4340         cancel_work_sync(&mdsc->cap_reclaim_work);
4341         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4342
4343         dout("stopped\n");
4344 }
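
/*
 * Both passes above drop mdsc->mutex before taking a session's
 * s_mutex, holding a session reference across the gap; the session
 * array can change while the mutex is dropped, which is why each
 * iteration looks the session up again instead of caching pointers.
 */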
4345
4346 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4347 {
4348         struct ceph_mds_session *session;
4349         int mds;
4350
4351         dout("force umount\n");
4352
4353         mutex_lock(&mdsc->mutex);
4354         for (mds = 0; mds < mdsc->max_sessions; mds++) {
4355                 session = __ceph_lookup_mds_session(mdsc, mds);
4356                 if (!session)
4357                         continue;
4358                 mutex_unlock(&mdsc->mutex);
4359                 mutex_lock(&session->s_mutex);
4360                 __close_session(mdsc, session);
4361                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4362                         cleanup_session_requests(mdsc, session);
4363                         remove_session_caps(session);
4364                 }
4365                 mutex_unlock(&session->s_mutex);
4366                 ceph_put_mds_session(session);
4367                 mutex_lock(&mdsc->mutex);
4368                 kick_requests(mdsc, mds);
4369         }
4370         __wake_requests(mdsc, &mdsc->waiting_for_map);
4371         mutex_unlock(&mdsc->mutex);
4372 }
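
/*
 * A forced unmount does not wait for the MDS to acknowledge anything:
 * sessions left in CLOSING state have their outstanding requests and
 * caps torn down locally, and kick_requests()/__wake_requests() make
 * blocked callers re-evaluate instead of sleeping on replies that
 * will never arrive.
 */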
4373
4374 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4375 {
4376         dout("stop\n");
4377         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4378         if (mdsc->mdsmap)
4379                 ceph_mdsmap_destroy(mdsc->mdsmap);
4380         kfree(mdsc->sessions);
4381         ceph_caps_finalize(mdsc);
4382         ceph_pool_perm_destroy(mdsc);
4383 }
4384
4385 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4386 {
4387         struct ceph_mds_client *mdsc = fsc->mdsc;
4388         dout("mdsc_destroy %p\n", mdsc);
4389
4390         if (!mdsc)
4391                 return;
4392
4393         /* flush out any connection work with references to us */
4394         ceph_msgr_flush();
4395
4396         ceph_mdsc_stop(mdsc);
4397
4398         fsc->mdsc = NULL;
4399         dout("mdsc_destroy %p done\n", mdsc);
4400         kfree(mdsc);
4401 }
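
/*
 * ceph_msgr_flush() must precede ceph_mdsc_stop() here: connection
 * work still queued on the messenger workqueues may hold references
 * to state that ceph_mdsc_stop() is about to free.
 */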
4402
4403 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4404 {
4405         struct ceph_fs_client *fsc = mdsc->fsc;
4406         const char *mds_namespace = fsc->mount_options->mds_namespace;
4407         void *p = msg->front.iov_base;
4408         void *end = p + msg->front.iov_len;
4409         u32 epoch;
4410         u32 map_len;
4411         u32 num_fs;
4412         u32 mount_fscid = (u32)-1;
4413         u8 struct_v, struct_cv;
4414         int err = -EINVAL;
4415
4416         ceph_decode_need(&p, end, sizeof(u32), bad);
4417         epoch = ceph_decode_32(&p);
4418
4419         dout("handle_fsmap epoch %u\n", epoch);
4420
4421         ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4422         struct_v = ceph_decode_8(&p);
4423         struct_cv = ceph_decode_8(&p);
4424         map_len = ceph_decode_32(&p);
4425
4426         ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4427         p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4428
4429         num_fs = ceph_decode_32(&p);
4430         while (num_fs-- > 0) {
4431                 void *info_p, *info_end;
4432                 u32 info_len;
4433                 u8 info_v, info_cv;
4434                 u32 fscid, namelen;
4435
4436                 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4437                 info_v = ceph_decode_8(&p);
4438                 info_cv = ceph_decode_8(&p);
4439                 info_len = ceph_decode_32(&p);
4440                 ceph_decode_need(&p, end, info_len, bad);
4441                 info_p = p;
4442                 info_end = p + info_len;
4443                 p = info_end;
4444
4445                 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4446                 fscid = ceph_decode_32(&info_p);
4447                 namelen = ceph_decode_32(&info_p);
4448                 ceph_decode_need(&info_p, info_end, namelen, bad);
4449
4450                 if (mds_namespace &&
4451                     strlen(mds_namespace) == namelen &&
4452                     !strncmp(mds_namespace, (char *)info_p, namelen)) {
4453                         mount_fscid = fscid;
4454                         break;
4455                 }
4456         }
4457
4458         ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4459         if (mount_fscid != (u32)-1) {
4460                 fsc->client->monc.fs_cluster_id = mount_fscid;
4461                 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4462                                    0, true);
4463                 ceph_monc_renew_subs(&fsc->client->monc);
4464         } else {
4465                 err = -ENOENT;
4466                 goto err_out;
4467         }
4468         return;
4469
4470 bad:
4471         pr_err("error decoding fsmap\n");
4472 err_out:
4473         mutex_lock(&mdsc->mutex);
4474         mdsc->mdsmap_err = err;
4475         __wake_requests(mdsc, &mdsc->waiting_for_map);
4476         mutex_unlock(&mdsc->mutex);
4477 }
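
/*
 * For reference, the FSMap (user) encoding consumed above, as far as
 * this decoder understands it:
 *
 *   u32 epoch
 *   u8 struct_v, u8 struct_cv, u32 map_len
 *     u32 epoch (again), u32 legacy_client_fscid    <- skipped
 *     u32 num_fs
 *     per filesystem: u8 info_v, u8 info_cv, u32 info_len,
 *                     u32 fscid, u32 namelen, <namelen name bytes>, ...
 *
 * Only fscid and the name are examined; the rest of each filesystem
 * blob is ignored by jumping straight to info_end.
 */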
4478
4479 /*
4480  * handle mds map update.
4481  */
4482 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4483 {
4484         u32 epoch;
4485         u32 maplen;
4486         void *p = msg->front.iov_base;
4487         void *end = p + msg->front.iov_len;
4488         struct ceph_mdsmap *newmap, *oldmap;
4489         struct ceph_fsid fsid;
4490         int err = -EINVAL;
4491
4492         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4493         ceph_decode_copy(&p, &fsid, sizeof(fsid));
4494         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4495                 return;
4496         epoch = ceph_decode_32(&p);
4497         maplen = ceph_decode_32(&p);
4498         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
4499
4500         /* do we need it? */
4501         mutex_lock(&mdsc->mutex);
4502         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
4503                 dout("handle_map epoch %u <= our %u\n",
4504                      epoch, mdsc->mdsmap->m_epoch);
4505                 mutex_unlock(&mdsc->mutex);
4506                 return;
4507         }
4508
4509         newmap = ceph_mdsmap_decode(&p, end);
4510         if (IS_ERR(newmap)) {
4511                 err = PTR_ERR(newmap);
4512                 goto bad_unlock;
4513         }
4514
4515         /* swap into place */
4516         if (mdsc->mdsmap) {
4517                 oldmap = mdsc->mdsmap;
4518                 mdsc->mdsmap = newmap;
4519                 check_new_map(mdsc, newmap, oldmap);
4520                 ceph_mdsmap_destroy(oldmap);
4521         } else {
4522                 mdsc->mdsmap = newmap;  /* first mds map */
4523         }
4524         mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
4525                                         MAX_LFS_FILESIZE);
4526
4527         __wake_requests(mdsc, &mdsc->waiting_for_map);
4528         ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
4529                           mdsc->mdsmap->m_epoch);
4530
4531         mutex_unlock(&mdsc->mutex);
4532         schedule_delayed(mdsc);
4533         return;
4534
4535 bad_unlock:
4536         mutex_unlock(&mdsc->mutex);
4537 bad:
4538         pr_err("error decoding mdsmap %d\n", err);
4539         return;
4540 }
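
/*
 * Map updates are strictly ordered by epoch: an incoming map whose
 * epoch is <= the one we already hold is dropped above without even
 * decoding the body.  check_new_map() is where the client actually
 * reacts to per-rank state changes between the old and new maps.
 */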
4541
4542 static struct ceph_connection *con_get(struct ceph_connection *con)
4543 {
4544         struct ceph_mds_session *s = con->private;
4545
4546         if (get_session(s)) {
4547                 dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
4548                 return con;
4549         }
4550         dout("mdsc con_get %p FAIL\n", s);
4551         return NULL;
4552 }
4553
4554 static void con_put(struct ceph_connection *con)
4555 {
4556         struct ceph_mds_session *s = con->private;
4557
4558         dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
4559         ceph_put_mds_session(s);
4560 }
4561
4562 /*
4563  * if the client is unresponsive for long enough, the mds will kill
4564  * the session entirely.
4565  */
4566 static void peer_reset(struct ceph_connection *con)
4567 {
4568         struct ceph_mds_session *s = con->private;
4569         struct ceph_mds_client *mdsc = s->s_mdsc;
4570
4571         pr_warn("mds%d closed our session\n", s->s_mds);
4572         send_mds_reconnect(mdsc, s);
4573 }
4574
4575 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4576 {
4577         struct ceph_mds_session *s = con->private;
4578         struct ceph_mds_client *mdsc = s->s_mdsc;
4579         int type = le16_to_cpu(msg->hdr.type);
4580
4581         mutex_lock(&mdsc->mutex);
4582         if (__verify_registered_session(mdsc, s) < 0) {
4583                 mutex_unlock(&mdsc->mutex);
4584                 goto out;
4585         }
4586         mutex_unlock(&mdsc->mutex);
4587
4588         switch (type) {
4589         case CEPH_MSG_MDS_MAP:
4590                 ceph_mdsc_handle_mdsmap(mdsc, msg);
4591                 break;
4592         case CEPH_MSG_FS_MAP_USER:
4593                 ceph_mdsc_handle_fsmap(mdsc, msg);
4594                 break;
4595         case CEPH_MSG_CLIENT_SESSION:
4596                 handle_session(s, msg);
4597                 break;
4598         case CEPH_MSG_CLIENT_REPLY:
4599                 handle_reply(s, msg);
4600                 break;
4601         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
4602                 handle_forward(mdsc, s, msg);
4603                 break;
4604         case CEPH_MSG_CLIENT_CAPS:
4605                 ceph_handle_caps(s, msg);
4606                 break;
4607         case CEPH_MSG_CLIENT_SNAP:
4608                 ceph_handle_snap(mdsc, s, msg);
4609                 break;
4610         case CEPH_MSG_CLIENT_LEASE:
4611                 handle_lease(mdsc, s, msg);
4612                 break;
4613         case CEPH_MSG_CLIENT_QUOTA:
4614                 ceph_handle_quota(mdsc, s, msg);
4615                 break;
4616
4617         default:
4618                 pr_err("received unknown message type %d %s\n", type,
4619                        ceph_msg_type_name(type));
4620         }
4621 out:
4622         ceph_msg_put(msg);
4623 }
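
/*
 * Every incoming message is checked against the registered-session
 * table before being dispatched, so messages racing with session
 * teardown are dropped here rather than handed to handlers that
 * assume a live session.
 */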
4624
4625 /*
4626  * authentication
4627  */
4628
4629 /*
4630  * Note: returned pointer is the address of a structure that's
4631  * managed separately.  Caller must *not* attempt to free it.
4632  */
4633 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4634                                         int *proto, int force_new)
4635 {
4636         struct ceph_mds_session *s = con->private;
4637         struct ceph_mds_client *mdsc = s->s_mdsc;
4638         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4639         struct ceph_auth_handshake *auth = &s->s_auth;
4640
4641         if (force_new && auth->authorizer) {
4642                 ceph_auth_destroy_authorizer(auth->authorizer);
4643                 auth->authorizer = NULL;
4644         }
4645         if (!auth->authorizer) {
4646                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4647                                                       auth);
4648                 if (ret)
4649                         return ERR_PTR(ret);
4650         } else {
4651                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
4652                                                       auth);
4653                 if (ret)
4654                         return ERR_PTR(ret);
4655         }
4656         *proto = ac->protocol;
4657
4658         return auth;
4659 }
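
/*
 * force_new is set when the peer rejected our previous authorizer
 * (stale tickets, for example); destroying and rebuilding it picks up
 * fresh credentials from the auth client.
 */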
4660
4661 static int add_authorizer_challenge(struct ceph_connection *con,
4662                                     void *challenge_buf, int challenge_buf_len)
4663 {
4664         struct ceph_mds_session *s = con->private;
4665         struct ceph_mds_client *mdsc = s->s_mdsc;
4666         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4667
4668         return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
4669                                             challenge_buf, challenge_buf_len);
4670 }
4671
4672 static int verify_authorizer_reply(struct ceph_connection *con)
4673 {
4674         struct ceph_mds_session *s = con->private;
4675         struct ceph_mds_client *mdsc = s->s_mdsc;
4676         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4677
4678         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
4679 }
4680
4681 static int invalidate_authorizer(struct ceph_connection *con)
4682 {
4683         struct ceph_mds_session *s = con->private;
4684         struct ceph_mds_client *mdsc = s->s_mdsc;
4685         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
4686
4687         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
4688
4689         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
4690 }
4691
4692 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
4693                                 struct ceph_msg_header *hdr, int *skip)
4694 {
4695         struct ceph_msg *msg;
4696         int type = (int) le16_to_cpu(hdr->type);
4697         int front_len = (int) le32_to_cpu(hdr->front_len);
4698
4699         if (con->in_msg)
4700                 return con->in_msg;
4701
4702         *skip = 0;
4703         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
4704         if (!msg) {
4705                 pr_err("unable to allocate msg type %d len %d\n",
4706                        type, front_len);
4707                 return NULL;
4708         }
4709
4710         return msg;
4711 }
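
/*
 * Only the front portion of the message is allocated here.  Returning
 * NULL while leaving *skip at 0 signals an allocation failure to the
 * messenger; setting *skip instead would mean "discard this message".
 */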
4712
4713 static int mds_sign_message(struct ceph_msg *msg)
4714 {
4715         struct ceph_mds_session *s = msg->con->private;
4716         struct ceph_auth_handshake *auth = &s->s_auth;
4717
4718         return ceph_auth_sign_message(auth, msg);
4719 }
4720
4721 static int mds_check_message_signature(struct ceph_msg *msg)
4722 {
4723         struct ceph_mds_session *s = msg->con->private;
4724         struct ceph_auth_handshake *auth = &s->s_auth;
4725
4726         return ceph_auth_check_message_signature(auth, msg);
4727 }
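
/*
 * Signing is only effective when the negotiated auth method supplies
 * sign_message/check_message_signature hooks (cephx with message
 * signatures enabled); otherwise these calls are no-ops that return 0.
 */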
4728
4729 static const struct ceph_connection_operations mds_con_ops = {
4730         .get = con_get,
4731         .put = con_put,
4732         .dispatch = dispatch,
4733         .get_authorizer = get_authorizer,
4734         .add_authorizer_challenge = add_authorizer_challenge,
4735         .verify_authorizer_reply = verify_authorizer_reply,
4736         .invalidate_authorizer = invalidate_authorizer,
4737         .peer_reset = peer_reset,
4738         .alloc_msg = mds_alloc_msg,
4739         .sign_message = mds_sign_message,
4740         .check_message_signature = mds_check_message_signature,
4741 };
4742
4743 /* eof */