// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}

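/*
 * Illustrative sketch only (hypothetical helper, not called anywhere):
 * most of the newer reply substructures share a "versioned encoding"
 * envelope -- a struct_v byte, a struct_compat byte and a u32 payload
 * length -- which the parsers in this file peel off before decoding the
 * payload, then jump to the recorded end so that unknown trailing
 * fields from newer MDS versions are ignored.  A minimal decoder for a
 * hypothetical payload of two u64 fields would look like this:
 */
static int __maybe_unused parse_versioned_envelope_sketch(void **p, void *end,
                                                          u64 *a, u64 *b)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;          /* clamp to this substructure */
        ceph_decode_64_safe(p, end, *a, bad);
        ceph_decode_64_safe(p, end, *b, bad);
        *p = end;                       /* skip fields we don't understand */
        return 0;
bad:
        return -EIO;
}
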
/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }

                /* btime */
                ceph_decode_need(p, end, sizeof(info->btime), bad);
                ceph_decode_copy(p, &info->btime, sizeof(info->btime));

                /* change attribute */
                ceph_decode_64_safe(p, end, info->change_attr, bad);

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                /* snapshot count, remains zero for v<=3 */
                if (struct_v >= 4) {
                        ceph_decode_64_safe(p, end, info->rsnaps, bad);
                } else {
                        info->rsnaps = 0;
                }

                if (struct_v >= 5) {
                        u32 alen;

                        ceph_decode_32_safe(p, end, alen, bad);

                        while (alen--) {
                                u32 len;

                                /* key */
                                ceph_decode_32_safe(p, end, len, bad);
                                ceph_decode_skip_n(p, end, len, bad);
                                /* value */
                                ceph_decode_32_safe(p, end, len, bad);
                                ceph_decode_skip_n(p, end, len, bad);
                        }
                }

                /* fscrypt flag -- ignore */
                if (struct_v >= 6)
                        ceph_decode_skip_8(p, end, bad);

                info->fscrypt_auth = NULL;
                info->fscrypt_auth_len = 0;
                info->fscrypt_file = NULL;
                info->fscrypt_file_len = 0;
                if (struct_v >= 7) {
                        ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
                        if (info->fscrypt_auth_len) {
                                info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
                                                             GFP_KERNEL);
                                if (!info->fscrypt_auth)
                                        return -ENOMEM;
                                ceph_decode_copy_safe(p, end, info->fscrypt_auth,
                                                      info->fscrypt_auth_len, bad);
                        }
                        ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
                        if (info->fscrypt_file_len) {
                                info->fscrypt_file = kmalloc(info->fscrypt_file_len,
                                                             GFP_KERNEL);
                                if (!info->fscrypt_file)
                                        return -ENOMEM;
                                ceph_decode_copy_safe(p, end, info->fscrypt_file,
                                                      info->fscrypt_file_len, bad);
                        }
                }
                *p = end;
        } else {
                /* legacy (unversioned) struct */
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                if (features & CEPH_FEATURE_FS_BTIME) {
                        ceph_decode_need(p, end, sizeof(info->btime), bad);
                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
                        ceph_decode_64_safe(p, end, info->change_attr, bad);
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime and info->rsnaps remain zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features, u32 *altname_len, u8 **altname)
{
        u8 struct_v;
        u32 struct_len;
        void *lend;

        if (features == (u64)-1) {
                u8 struct_compat;

                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);

                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;

                ceph_decode_32_safe(p, end, struct_len, bad);
        } else {
                struct_len = sizeof(**lease);
                *altname_len = 0;
                *altname = NULL;
        }

        lend = *p + struct_len;
        ceph_decode_need(p, end, struct_len, bad);
        *lease = *p;
        *p += sizeof(**lease);

        if (features == (u64)-1) {
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, *altname_len, bad);
                        ceph_decode_need(p, end, *altname_len, bad);
                        *altname = *p;
                        *p += *altname_len;
                } else {
                        *altname = NULL;
                        *altname_len = 0;
                }
        }
        *p = lend;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features,
                                             &info->altname_len, &info->altname);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                    struct ceph_mds_request *req,
                                    u64 features)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct inode *inode = d_inode(req->r_dentry);
                struct ceph_inode_info *ci = ceph_inode(inode);
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                struct fscrypt_str tname = FSTR_INIT(NULL, 0);
                struct fscrypt_str oname = FSTR_INIT(NULL, 0);
                struct ceph_fname fname;
                u32 altname_len, _name_len;
                u8 *altname, *_name;

                /* dentry */
                ceph_decode_32_safe(p, end, _name_len, bad);
                ceph_decode_need(p, end, _name_len, bad);
                _name = *p;
                *p += _name_len;
                dout("parsed dir dname '%.*s'\n", _name_len, _name);

                if (info->hash_order)
                        rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
                                                      _name, _name_len);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features,
                                             &altname_len, &altname);
                if (err)
                        goto out_bad;

                /*
                 * Try to decrypt the dentry names and update them
                 * in the ceph_mds_reply_dir_entry struct.
                 */
                fname.dir = inode;
                fname.name = _name;
                fname.name_len = _name_len;
                fname.ctext = altname;
                fname.ctext_len = altname_len;
                /*
                 * _name_len may be larger than altname_len, such as
                 * when the human-readable name length is in the range
                 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE);
                 * then the copy in ceph_fname_to_usr would corrupt the
                 * data if there is no encryption key.
                 *
                 * Just set the no_copy flag; then, if there is no
                 * encryption key, oname.name will always be assigned
                 * to _name.
                 */
                fname.no_copy = true;
                if (altname_len == 0) {
                        /*
                         * Set tname to _name, and it will be used
                         * to do the base64_decode in-place. This is
                         * safe because the decoded string is always
                         * shorter -- 3/4 the length of the original
                         * string.
                         */
                        tname.name = _name;

                        /*
                         * Set oname to _name too, and it will be
                         * used to do the decryption in-place.
                         */
                        oname.name = _name;
                        oname.len = _name_len;
                } else {
                        /*
                         * This will do the decryption in-place,
                         * directly from the altname ciphertext.
                         */
                        oname.name = altname;
                        oname.len = altname_len;
                }
                rde->is_nokey = false;
                err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
                if (err) {
                        pr_err("%s unable to decode %.*s, got %d\n", __func__,
                               _name_len, _name, err);
                        goto out_bad;
                }
                rde->name = oname.name;
                rde->name_len = oname.len;

                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE         xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        dout("got %u sets of delegated inodes\n", sets);
        while (sets--) {
                u64 start, len;

                ceph_decode_64_safe(p, end, start, bad);
                ceph_decode_64_safe(p, end, len, bad);

                /* Don't accept a delegation of system inodes */
                if (start < CEPH_INO_SYSTEM_BASE) {
                        pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
                                        start, len);
                        continue;
                }
                while (len--) {
                        int err = xa_insert(&s->s_delegated_inos, start++,
                                            DELEGATED_INO_AVAILABLE,
                                            GFP_KERNEL);
                        if (!err) {
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
                                pr_warn("MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
                        }
                }
        }
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        unsigned long ino;
        void *val;

        xa_for_each(&s->s_delegated_inos, ino, val) {
                val = xa_erase(&s->s_delegated_inos, ino);
                if (val == DELEGATED_INO_AVAILABLE)
                        return ino;
        }
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
                         GFP_KERNEL);
}
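
/*
 * Usage sketch (hypothetical caller, not the real async-create path):
 * a delegated inode number is claimed with ceph_get_deleg_ino() and,
 * if the request that would have consumed it fails before reaching
 * the MDS, it is handed back with ceph_restore_deleg_ino() so it can
 * be reused later.
 */
static int __maybe_unused deleg_ino_usage_sketch(struct ceph_mds_session *s,
                                                 int req_err)
{
        u64 ino = ceph_get_deleg_ino(s);

        if (!ino)
                return 0;       /* none delegated; let the MDS pick one */
        if (req_err)            /* request never made it to the MDS */
                return ceph_restore_deleg_ino(s, ino);
        return 0;
}
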
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        if (sets)
                ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        int ret;

        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        /* Malformed reply? */
                        info->has_create_ino = false;
                } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
                        info->has_create_ino = true;
                        /* struct_v, struct_compat, and len */
                        ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        ret = ceph_parse_deleg_inos(p, end, s);
                        if (ret)
                                return ret;
                } else {
                        /* legacy */
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        info->has_create_ino = true;
                }
        } else {
                if (*p != end)
                        goto bad;
        }

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
                                      struct ceph_mds_reply_info_parsed *info,
                                      u64 features)
{
        u32 value_len;

        ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
        ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
        ceph_decode_skip_32(p, end, bad); /* skip payload length */

        ceph_decode_32_safe(p, end, value_len, bad);

        if (value_len == end - *p) {
                info->xattr_info.xattr_value = *p;
                info->xattr_info.xattr_value_len = value_len;
                *p = end;
                return value_len;
        }
bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_request *req,
                                  u64 features, struct ceph_mds_session *s)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, req, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features, s);
        else if (op == CEPH_MDS_OP_GETVXATTR)
                return parse_reply_info_getvxattr(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
                            struct ceph_mds_request *req, u64 features)
{
        struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, req, features, s);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        ceph_msg_dump(msg);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        int i;

        kfree(info->diri.fscrypt_auth);
        kfree(info->diri.fscrypt_file);
        kfree(info->targeti.fscrypt_auth);
        kfree(info->targeti.fscrypt_file);
        if (!info->dir_entries)
                return;

        for (i = 0; i < info->dir_nr; i++) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

                kfree(rde->inode.fscrypt_auth);
                kfree(rde->inode.fscrypt_file);
        }
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry and
 * then succeeds immediately.
 *
 * For any new create/link/rename, etc. requests that follow using the
 * same file names, we must wait for the first reply of the inflight
 * unlink request, or the MDS may fail these subsequent requests with
 * -EEXIST if the inflight async unlink request was delayed for some
 * reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but later the
 * previously delayed async unlink request will remove the CDentry.
 * That means the just-created file may be deleted later by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories using the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
        struct dentry *pdentry = dentry->d_parent;
        struct dentry *udentry, *found = NULL;
        struct ceph_dentry_info *di;
        struct qstr dname;
        u32 hash = dentry->d_name.hash;
        int err;

        dname.name = dentry->d_name.name;
        dname.len = dentry->d_name.len;

        rcu_read_lock();
        hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
                                   hnode, hash) {
                udentry = di->dentry;

                spin_lock(&udentry->d_lock);
                if (udentry->d_name.hash != hash)
                        goto next;
                if (unlikely(udentry->d_parent != pdentry))
                        goto next;
                if (!hash_hashed(&di->hnode))
                        goto next;

                if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
                        pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
                                __func__, dentry, dentry);

                if (!d_same_name(udentry, pdentry, &dname))
                        goto next;

                spin_unlock(&udentry->d_lock);
                found = dget(udentry);
                break;
next:
                spin_unlock(&udentry->d_lock);
        }
        rcu_read_unlock();

        if (likely(!found))
                return 0;

        dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
             dentry, dentry, found, found);

        err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
                          TASK_KILLABLE);
        dput(found);
        return err;
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_CLOSED: return "closed";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref))
                return s;
        return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        if (IS_ERR_OR_NULL(s))
                return;

        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                WARN_ON(mutex_is_locked(&s->s_mutex));
                xa_destroy(&s->s_delegated_inos);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
                return ERR_PTR(-EIO);

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        atomic_set(&s->s_cap_gen, 1);
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        INIT_LIST_HEAD(&s->s_caps);
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_dirty);
        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
                                void (*cb)(struct ceph_mds_session *),
                                bool check_state)
{
        int mds;

        mutex_lock(&mdsc->mutex);
        for (mds = 0; mds < mdsc->max_sessions; ++mds) {
                struct ceph_mds_session *s;

                s = __ceph_lookup_mds_session(mdsc, mds);
                if (!s)
                        continue;

                if (check_state && !check_session_state(s)) {
                        ceph_put_mds_session(s);
                        continue;
                }

                mutex_unlock(&mdsc->mutex);
                cb(s);
                ceph_put_mds_session(s);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        ceph_mdsc_release_dir_caps_no_check(req);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
                iput(req->r_parent);
        }
        iput(req->r_target_inode);
        iput(req->r_new_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_cred(req->r_cred);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        kfree(req->r_fscrypt_auth);
        kfree(req->r_altname);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kmem_cache_free(ceph_mds_request_cachep, req);
}

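/*
 * DEFINE_RB_FUNCS() expands to the lookup_request(), insert_request()
 * and erase_request() helpers used below, managing requests in the
 * mdsc->request_tree rbtree keyed by r_tid.
 */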
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
1154 static void __register_request(struct ceph_mds_client *mdsc,
1155                                struct ceph_mds_request *req,
1156                                struct inode *dir)
1157 {
1158         int ret = 0;
1159
1160         req->r_tid = ++mdsc->last_tid;
1161         if (req->r_num_caps) {
1162                 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
1163                                         req->r_num_caps);
1164                 if (ret < 0) {
1165                         pr_err("__register_request %p "
1166                                "failed to reserve caps: %d\n", req, ret);
1167                         /* set req->r_err to fail early from __do_request */
1168                         req->r_err = ret;
1169                         return;
1170                 }
1171         }
1172         dout("__register_request %p tid %lld\n", req, req->r_tid);
1173         ceph_mdsc_get_request(req);
1174         insert_request(&mdsc->request_tree, req);
1175
1176         req->r_cred = get_current_cred();
1177
1178         if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
1179                 mdsc->oldest_tid = req->r_tid;
1180
1181         if (dir) {
1182                 struct ceph_inode_info *ci = ceph_inode(dir);
1183
1184                 ihold(dir);
1185                 req->r_unsafe_dir = dir;
1186                 spin_lock(&ci->i_unsafe_lock);
1187                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
1188                 spin_unlock(&ci->i_unsafe_lock);
1189         }
1190 }
1191
1192 static void __unregister_request(struct ceph_mds_client *mdsc,
1193                                  struct ceph_mds_request *req)
1194 {
1195         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1196
1197         /* Never leave an unregistered request on an unsafe list! */
1198         list_del_init(&req->r_unsafe_item);
1199
1200         if (req->r_tid == mdsc->oldest_tid) {
1201                 struct rb_node *p = rb_next(&req->r_node);
1202                 mdsc->oldest_tid = 0;
1203                 while (p) {
1204                         struct ceph_mds_request *next_req =
1205                                 rb_entry(p, struct ceph_mds_request, r_node);
1206                         if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
1207                                 mdsc->oldest_tid = next_req->r_tid;
1208                                 break;
1209                         }
1210                         p = rb_next(p);
1211                 }
1212         }
1213
1214         erase_request(&mdsc->request_tree, req);
1215
1216         if (req->r_unsafe_dir) {
1217                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
1218                 spin_lock(&ci->i_unsafe_lock);
1219                 list_del_init(&req->r_unsafe_dir_item);
1220                 spin_unlock(&ci->i_unsafe_lock);
1221         }
1222         if (req->r_target_inode &&
1223             test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
1224                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
1225                 spin_lock(&ci->i_unsafe_lock);
1226                 list_del_init(&req->r_unsafe_target_item);
1227                 spin_unlock(&ci->i_unsafe_lock);
1228         }
1229
1230         if (req->r_unsafe_dir) {
1231                 iput(req->r_unsafe_dir);
1232                 req->r_unsafe_dir = NULL;
1233         }
1234
1235         complete_all(&req->r_safe_completion);
1236
1237         ceph_mdsc_put_request(req);
1238 }
1239
1240 /*
1241  * Walk back up the dentry tree until we hit a dentry representing a
1242  * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1243  * when calling this) to ensure that the objects won't disappear while we're
1244  * working with them. Once we hit a candidate dentry, we attempt to take a
1245  * reference to it, and return that as the result.
1246  */
1247 static struct inode *get_nonsnap_parent(struct dentry *dentry)
1248 {
1249         struct inode *inode = NULL;
1250
1251         while (dentry && !IS_ROOT(dentry)) {
1252                 inode = d_inode_rcu(dentry);
1253                 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1254                         break;
1255                 dentry = dentry->d_parent;
1256         }
1257         if (inode)
1258                 inode = igrab(inode);
1259         return inode;
1260 }
1261
1262 /*
1263  * Choose mds to send request to next.  If there is a hint set in the
1264  * request (e.g., due to a prior forward hint from the mds), use that.
1265  * Otherwise, consult frag tree and/or caps to identify the
1266  * appropriate mds.  If all else fails, choose randomly.
1267  *
1268  * Called under mdsc->mutex.
1269  */
1270 static int __choose_mds(struct ceph_mds_client *mdsc,
1271                         struct ceph_mds_request *req,
1272                         bool *random)
1273 {
1274         struct inode *inode;
1275         struct ceph_inode_info *ci;
1276         struct ceph_cap *cap;
1277         int mode = req->r_direct_mode;
1278         int mds = -1;
1279         u32 hash = req->r_direct_hash;
1280         bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1281
1282         if (random)
1283                 *random = false;
1284
1285         /*
1286          * is there a specific mds we should try?  ignore hint if we have
1287          * no session and the mds is not up (active or recovering).
1288          */
1289         if (req->r_resend_mds >= 0 &&
1290             (__have_session(mdsc, req->r_resend_mds) ||
1291              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1292                 dout("%s using resend_mds mds%d\n", __func__,
1293                      req->r_resend_mds);
1294                 return req->r_resend_mds;
1295         }
1296
1297         if (mode == USE_RANDOM_MDS)
1298                 goto random;
1299
1300         inode = NULL;
1301         if (req->r_inode) {
1302                 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1303                         inode = req->r_inode;
1304                         ihold(inode);
1305                 } else {
1306                         /* req->r_dentry is non-null for LSSNAP request */
1307                         rcu_read_lock();
1308                         inode = get_nonsnap_parent(req->r_dentry);
1309                         rcu_read_unlock();
1310                         dout("%s using snapdir's parent %p\n", __func__, inode);
1311                 }
1312         } else if (req->r_dentry) {
1313                 /* ignore race with rename; old or new d_parent is okay */
1314                 struct dentry *parent;
1315                 struct inode *dir;
1316
1317                 rcu_read_lock();
1318                 parent = READ_ONCE(req->r_dentry->d_parent);
1319                 dir = req->r_parent ? : d_inode_rcu(parent);
1320
1321                 if (!dir || dir->i_sb != mdsc->fsc->sb) {
1322                         /*  not this fs or parent went negative */
1323                         inode = d_inode(req->r_dentry);
1324                         if (inode)
1325                                 ihold(inode);
1326                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
1327                         /* direct snapped/virtual snapdir requests
1328                          * based on parent dir inode */
1329                         inode = get_nonsnap_parent(parent);
1330                         dout("%s using nonsnap parent %p\n", __func__, inode);
1331                 } else {
1332                         /* dentry target */
1333                         inode = d_inode(req->r_dentry);
1334                         if (!inode || mode == USE_AUTH_MDS) {
1335                                 /* dir + name */
1336                                 inode = igrab(dir);
1337                                 hash = ceph_dentry_hash(dir, req->r_dentry);
1338                                 is_hash = true;
1339                         } else {
1340                                 ihold(inode);
1341                         }
1342                 }
1343                 rcu_read_unlock();
1344         }
1345
1346         dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1347              hash, mode);
1348         if (!inode)
1349                 goto random;
1350         ci = ceph_inode(inode);
1351
1352         if (is_hash && S_ISDIR(inode->i_mode)) {
1353                 struct ceph_inode_frag frag;
1354                 int found;
1355
1356                 ceph_choose_frag(ci, hash, &frag, &found);
1357                 if (found) {
1358                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
1359                                 u8 r;
1360
1361                                 /* choose a random replica */
1362                                 get_random_bytes(&r, 1);
1363                                 r %= frag.ndist;
1364                                 mds = frag.dist[r];
1365                                 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1366                                      __func__, inode, ceph_vinop(inode),
1367                                      frag.frag, mds, (int)r, frag.ndist);
1368                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1369                                     CEPH_MDS_STATE_ACTIVE &&
1370                                     !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1371                                         goto out;
1372                         }
1373
1374                         /* since this file/dir wasn't known to be
1375                          * replicated, then we want to look for the
1376                          * authoritative mds. */
1377                         if (frag.mds >= 0) {
1378                                 /* choose auth mds */
1379                                 mds = frag.mds;
1380                                 dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1381                                      __func__, inode, ceph_vinop(inode),
1382                                      frag.frag, mds);
1383                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1384                                     CEPH_MDS_STATE_ACTIVE) {
1385                                         if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1386                                                                   mds))
1387                                                 goto out;
1388                                 }
1389                         }
1390                         mode = USE_AUTH_MDS;
1391                 }
1392         }
1393
1394         spin_lock(&ci->i_ceph_lock);
1395         cap = NULL;
1396         if (mode == USE_AUTH_MDS)
1397                 cap = ci->i_auth_cap;
1398         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1399                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1400         if (!cap) {
1401                 spin_unlock(&ci->i_ceph_lock);
1402                 iput(inode);
1403                 goto random;
1404         }
1405         mds = cap->session->s_mds;
1406         dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1407              inode, ceph_vinop(inode), mds,
1408              cap == ci->i_auth_cap ? "auth " : "", cap);
1409         spin_unlock(&ci->i_ceph_lock);
1410 out:
1411         iput(inode);
1412         return mds;
1413
1414 random:
1415         if (random)
1416                 *random = true;
1417
1418         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1419         dout("%s chose random mds%d\n", __func__, mds);
1420         return mds;
1421 }
1422
1423
1424 /*
1425  * session messages
1426  */
1427 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1428 {
1429         struct ceph_msg *msg;
1430         struct ceph_mds_session_head *h;
1431
1432         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1433                            false);
1434         if (!msg) {
1435                 pr_err("ENOMEM creating session %s msg\n",
1436                        ceph_session_op_name(op));
1437                 return NULL;
1438         }
1439         h = msg->front.iov_base;
1440         h->op = cpu_to_le32(op);
1441         h->seq = cpu_to_le64(seq);
1442
1443         return msg;
1444 }
1445
1446 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1447 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1448 static int encode_supported_features(void **p, void *end)
1449 {
1450         static const size_t count = ARRAY_SIZE(feature_bits);
1451
1452         if (count > 0) {
1453                 size_t i;
1454                 size_t size = FEATURE_BYTES(count);
1455                 unsigned long bit;
1456
1457                 if (WARN_ON_ONCE(*p + 4 + size > end))
1458                         return -ERANGE;
1459
1460                 ceph_encode_32(p, size);
1461                 memset(*p, 0, size);
1462                 for (i = 0; i < count; i++) {
1463                         bit = feature_bits[i];
1464                         ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1465                 }
1466                 *p += size;
1467         } else {
1468                 if (WARN_ON_ONCE(*p + 4 > end))
1469                         return -ERANGE;
1470
1471                 ceph_encode_32(p, 0);
1472         }
1473
1474         return 0;
1475 }
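
/*
 * Worked example of the feature bitmap encoding above (values are
 * illustrative; the real bits come from CEPHFS_FEATURES_CLIENT_SUPPORTED).
 * If feature_bits[] were {0, 5, 12}, the highest bit is 12, so
 * FEATURE_BYTES(3) = DIV_ROUND_UP(12 + 1, 64) * 8 = 8, i.e. the bitmap
 * is padded out to a whole 64-bit word.  The loop sets bit (b % 8) of
 * byte (b / 8) for each bit b:
 *
 *	bit  0 -> byte 0, mask 0x01
 *	bit  5 -> byte 0, mask 0x20
 *	bit 12 -> byte 1, mask 0x10
 *
 * so the wire encoding is the le32 length 8 followed by the bytes
 * 21 10 00 00 00 00 00 00.
 */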
1476
1477 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1478 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1479 static int encode_metric_spec(void **p, void *end)
1480 {
1481         static const size_t count = ARRAY_SIZE(metric_bits);
1482
1483         /* header */
1484         if (WARN_ON_ONCE(*p + 2 > end))
1485                 return -ERANGE;
1486
1487         ceph_encode_8(p, 1); /* version */
1488         ceph_encode_8(p, 1); /* compat */
1489
1490         if (count > 0) {
1491                 size_t i;
1492                 size_t size = METRIC_BYTES(count);
1493
1494                 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1495                         return -ERANGE;
1496
1497                 /* metric spec info length */
1498                 ceph_encode_32(p, 4 + size);
1499
1500                 /* metric spec */
1501                 ceph_encode_32(p, size);
1502                 memset(*p, 0, size);
1503                 for (i = 0; i < count; i++)
1504                         ((unsigned char *)(*p))[metric_bits[i] / 8] |= BIT(metric_bits[i] % 8);
1505                 *p += size;
1506         } else {
1507                 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1508                         return -ERANGE;
1509
1510                 /* metric spec info length */
1511                 ceph_encode_32(p, 4);
1512                 /* metric spec */
1513                 ceph_encode_32(p, 0);
1514         }
1515
1516         return 0;
1517 }
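
/*
 * On-wire layout produced by encode_metric_spec() above, for a
 * non-empty metric_bits[] (a summary of the code, not a new format):
 *
 *	u8   version (1)
 *	u8   compat (1)
 *	u32  metric spec info length (4 + size)
 *	u32  size of the metric bitmap
 *	...  metric bitmap, padded to a multiple of 8 bytes
 *
 * An empty metric_bits[] degenerates to a 4-byte info length and a
 * zero-length bitmap.
 */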
1518
1519 /*
1520  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1521  * to include additional client metadata fields.
1522  */
1523 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1524 {
1525         struct ceph_msg *msg;
1526         struct ceph_mds_session_head *h;
1527         int i;
1528         int extra_bytes = 0;
1529         int metadata_key_count = 0;
1530         struct ceph_options *opt = mdsc->fsc->client->options;
1531         struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1532         size_t size, count;
1533         void *p, *end;
1534         int ret;
1535
1536         const char *metadata[][2] = {
1537                 {"hostname", mdsc->nodename},
1538                 {"kernel_version", init_utsname()->release},
1539                 {"entity_id", opt->name ? : ""},
1540                 {"root", fsopt->server_path ? : "/"},
1541                 {NULL, NULL}
1542         };
1543
1544         /* Calculate serialized length of metadata */
1545         extra_bytes = 4;  /* map length */
1546         for (i = 0; metadata[i][0]; ++i) {
1547                 extra_bytes += 8 + strlen(metadata[i][0]) +
1548                         strlen(metadata[i][1]);
1549                 metadata_key_count++;
1550         }
1551
1552         /* supported features */
1553         size = 0;
1554         count = ARRAY_SIZE(feature_bits);
1555         if (count > 0)
1556                 size = FEATURE_BYTES(count);
1557         extra_bytes += 4 + size;
1558
1559         /* metric spec */
1560         size = 0;
1561         count = ARRAY_SIZE(metric_bits);
1562         if (count > 0)
1563                 size = METRIC_BYTES(count);
1564         extra_bytes += 2 + 4 + 4 + size;
1565
1566         /* Allocate the message */
1567         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1568                            GFP_NOFS, false);
1569         if (!msg) {
1570                 pr_err("ENOMEM creating session open msg\n");
1571                 return ERR_PTR(-ENOMEM);
1572         }
1573         p = msg->front.iov_base;
1574         end = p + msg->front.iov_len;
1575
1576         h = p;
1577         h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1578         h->seq = cpu_to_le64(seq);
1579
1580         /*
1581          * Serialize client metadata into waiting buffer space, using
1582          * the format that userspace expects for map<string, string>
1583          *
1584          * ClientSession messages with metadata are v4
1585          */
1586         msg->hdr.version = cpu_to_le16(4);
1587         msg->hdr.compat_version = cpu_to_le16(1);
1588
1589         /* The write pointer, following the session_head structure */
1590         p += sizeof(*h);
1591
1592         /* Number of entries in the map */
1593         ceph_encode_32(&p, metadata_key_count);
1594
1595         /* Two length-prefixed strings for each entry in the map */
1596         for (i = 0; metadata[i][0]; ++i) {
1597                 size_t const key_len = strlen(metadata[i][0]);
1598                 size_t const val_len = strlen(metadata[i][1]);
1599
1600                 ceph_encode_32(&p, key_len);
1601                 memcpy(p, metadata[i][0], key_len);
1602                 p += key_len;
1603                 ceph_encode_32(&p, val_len);
1604                 memcpy(p, metadata[i][1], val_len);
1605                 p += val_len;
1606         }
1607
1608         ret = encode_supported_features(&p, end);
1609         if (ret) {
1610                 pr_err("encode_supported_features failed!\n");
1611                 ceph_msg_put(msg);
1612                 return ERR_PTR(ret);
1613         }
1614
1615         ret = encode_metric_spec(&p, end);
1616         if (ret) {
1617                 pr_err("encode_metric_spec failed!\n");
1618                 ceph_msg_put(msg);
1619                 return ERR_PTR(ret);
1620         }
1621
1622         msg->front.iov_len = p - msg->front.iov_base;
1623         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1624
1625         return msg;
1626 }
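
/*
 * For reference, the client metadata above is serialized the way
 * userspace decodes a map<string, string>: a u32 entry count followed
 * by length-prefixed key and value strings.  A hypothetical two-entry
 * map {"hostname": "box", "root": "/"} would encode as
 *
 *	u32 2
 *	u32 8  "hostname"  u32 3  "box"
 *	u32 4  "root"      u32 1  "/"
 *
 * which is where the 8 bytes of per-entry overhead in the extra_bytes
 * calculation come from.
 */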
1627
1628 /*
1629  * send session open request.
1630  *
1631  * called under mdsc->mutex
1632  */
1633 static int __open_session(struct ceph_mds_client *mdsc,
1634                           struct ceph_mds_session *session)
1635 {
1636         struct ceph_msg *msg;
1637         int mstate;
1638         int mds = session->s_mds;
1639
1640         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
1641                 return -EIO;
1642
1643         /* wait for mds to go active? */
1644         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1645         dout("open_session to mds%d (%s)\n", mds,
1646              ceph_mds_state_name(mstate));
1647         session->s_state = CEPH_MDS_SESSION_OPENING;
1648         session->s_renew_requested = jiffies;
1649
1650         /* send connect message */
1651         msg = create_session_open_msg(mdsc, session->s_seq);
1652         if (IS_ERR(msg))
1653                 return PTR_ERR(msg);
1654         ceph_con_send(&session->s_con, msg);
1655         return 0;
1656 }
1657
1658 /*
1659  * open sessions for any export targets for the given mds
1660  *
1661  * called under mdsc->mutex
1662  */
1663 static struct ceph_mds_session *
1664 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1665 {
1666         struct ceph_mds_session *session;
1667         int ret;
1668
1669         session = __ceph_lookup_mds_session(mdsc, target);
1670         if (!session) {
1671                 session = register_session(mdsc, target);
1672                 if (IS_ERR(session))
1673                         return session;
1674         }
1675         if (session->s_state == CEPH_MDS_SESSION_NEW ||
1676             session->s_state == CEPH_MDS_SESSION_CLOSING) {
1677                 ret = __open_session(mdsc, session);
1678                 if (ret)
1679                         return ERR_PTR(ret);
1680         }
1681
1682         return session;
1683 }
1684
1685 struct ceph_mds_session *
1686 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1687 {
1688         struct ceph_mds_session *session;
1689
1690         dout("open_export_target_session to mds%d\n", target);
1691
1692         mutex_lock(&mdsc->mutex);
1693         session = __open_export_target_session(mdsc, target);
1694         mutex_unlock(&mdsc->mutex);
1695
1696         return session;
1697 }
1698
1699 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1700                                           struct ceph_mds_session *session)
1701 {
1702         struct ceph_mds_info *mi;
1703         struct ceph_mds_session *ts;
1704         int i, mds = session->s_mds;
1705
1706         if (mds >= mdsc->mdsmap->possible_max_rank)
1707                 return;
1708
1709         mi = &mdsc->mdsmap->m_info[mds];
1710         dout("open_export_target_sessions for mds%d (%d targets)\n",
1711              session->s_mds, mi->num_export_targets);
1712
1713         for (i = 0; i < mi->num_export_targets; i++) {
1714                 ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1715                 ceph_put_mds_session(ts);
1716         }
1717 }
1718
1719 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1720                                            struct ceph_mds_session *session)
1721 {
1722         mutex_lock(&mdsc->mutex);
1723         __open_export_target_sessions(mdsc, session);
1724         mutex_unlock(&mdsc->mutex);
1725 }
1726
1727 /*
1728  * session caps
1729  */
1730
1731 static void detach_cap_releases(struct ceph_mds_session *session,
1732                                 struct list_head *target)
1733 {
1734         lockdep_assert_held(&session->s_cap_lock);
1735
1736         list_splice_init(&session->s_cap_releases, target);
1737         session->s_num_cap_releases = 0;
1738         dout("detach_cap_releases mds%d\n", session->s_mds);
1739 }
1740
1741 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1742                                  struct list_head *dispose)
1743 {
1744         while (!list_empty(dispose)) {
1745                 struct ceph_cap *cap;
1746                 /* drop each cap that was queued for release */
1747                 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1748                 list_del(&cap->session_caps);
1749                 ceph_put_cap(mdsc, cap);
1750         }
1751 }
1752
1753 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1754                                      struct ceph_mds_session *session)
1755 {
1756         struct ceph_mds_request *req;
1757         struct rb_node *p;
1758
1759         dout("cleanup_session_requests mds%d\n", session->s_mds);
1760         mutex_lock(&mdsc->mutex);
1761         while (!list_empty(&session->s_unsafe)) {
1762                 req = list_first_entry(&session->s_unsafe,
1763                                        struct ceph_mds_request, r_unsafe_item);
1764                 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1765                                     req->r_tid);
1766                 if (req->r_target_inode)
1767                         mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1768                 if (req->r_unsafe_dir)
1769                         mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1770                 __unregister_request(mdsc, req);
1771         }
1772         /* zero r_attempts, so kick_requests() will re-send requests */
1773         p = rb_first(&mdsc->request_tree);
1774         while (p) {
1775                 req = rb_entry(p, struct ceph_mds_request, r_node);
1776                 p = rb_next(p);
1777                 if (req->r_session &&
1778                     req->r_session->s_mds == session->s_mds)
1779                         req->r_attempts = 0;
1780         }
1781         mutex_unlock(&mdsc->mutex);
1782 }
1783
1784 /*
1785  * Helper to safely iterate over all caps associated with a session, with
1786  * special care taken to handle a racing __ceph_remove_cap().
1787  *
1788  * Caller must hold session s_mutex.
1789  */
1790 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1791                               int (*cb)(struct inode *, int mds, void *),
1792                               void *arg)
1793 {
1794         struct list_head *p;
1795         struct ceph_cap *cap;
1796         struct inode *inode, *last_inode = NULL;
1797         struct ceph_cap *old_cap = NULL;
1798         int ret;
1799
1800         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1801         spin_lock(&session->s_cap_lock);
1802         p = session->s_caps.next;
1803         while (p != &session->s_caps) {
1804                 int mds;
1805
1806                 cap = list_entry(p, struct ceph_cap, session_caps);
1807                 inode = igrab(&cap->ci->netfs.inode);
1808                 if (!inode) {
1809                         p = p->next;
1810                         continue;
1811                 }
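                /*
                 * Pin our position: while s_cap_iterator points at this
                 * cap, a racing __ceph_remove_cap() will detach the cap
                 * from its inode but leave it on the session list, and
                 * we finish the removal further down.
                 */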
1812                 session->s_cap_iterator = cap;
1813                 mds = cap->mds;
1814                 spin_unlock(&session->s_cap_lock);
1815
1816                 if (last_inode) {
1817                         iput(last_inode);
1818                         last_inode = NULL;
1819                 }
1820                 if (old_cap) {
1821                         ceph_put_cap(session->s_mdsc, old_cap);
1822                         old_cap = NULL;
1823                 }
1824
1825                 ret = cb(inode, mds, arg);
1826                 last_inode = inode;
1827
1828                 spin_lock(&session->s_cap_lock);
1829                 p = p->next;
1830                 if (!cap->ci) {
1831                         dout("iterate_session_caps  finishing cap %p removal\n",
1832                              cap);
1833                         BUG_ON(cap->session != session);
1834                         cap->session = NULL;
1835                         list_del_init(&cap->session_caps);
1836                         session->s_nr_caps--;
1837                         atomic64_dec(&session->s_mdsc->metric.total_caps);
1838                         if (cap->queue_release)
1839                                 __ceph_queue_cap_release(session, cap);
1840                         else
1841                                 old_cap = cap;  /* put_cap it w/o locks held */
1842                 }
1843                 if (ret < 0)
1844                         goto out;
1845         }
1846         ret = 0;
1847 out:
1848         session->s_cap_iterator = NULL;
1849         spin_unlock(&session->s_cap_lock);
1850
1851         iput(last_inode);
1852         if (old_cap)
1853                 ceph_put_cap(session->s_mdsc, old_cap);
1854
1855         return ret;
1856 }
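
/*
 * A minimal caller sketch (hypothetical; count_caps_cb and nr are
 * illustrative names, not part of this file).  Callbacks run without
 * s_cap_lock held but with an inode reference, so they may sleep:
 *
 *	static int count_caps_cb(struct inode *inode, int mds, void *arg)
 *	{
 *		int *nr = arg;
 *
 *		(*nr)++;
 *		return 0;	// a negative return aborts the walk
 *	}
 *
 *	int nr = 0;
 *	ceph_iterate_session_caps(session, count_caps_cb, &nr);
 */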
1857
1858 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
1859 {
1860         struct ceph_inode_info *ci = ceph_inode(inode);
1861         bool invalidate = false;
1862         struct ceph_cap *cap;
1863         int iputs = 0;
1864
1865         spin_lock(&ci->i_ceph_lock);
1866         cap = __get_cap_for_mds(ci, mds);
1867         if (cap) {
1868                 dout(" removing cap %p, ci is %p, inode is %p\n",
1869                      cap, ci, &ci->netfs.inode);
1870
1871                 iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1872         }
1873         spin_unlock(&ci->i_ceph_lock);
1874
1875         if (cap)
1876                 wake_up_all(&ci->i_cap_wq);
1877         if (invalidate)
1878                 ceph_queue_invalidate(inode);
1879         while (iputs--)
1880                 iput(inode);
1881         return 0;
1882 }
1883
1884 /*
1885  * caller must hold session s_mutex
1886  */
1887 static void remove_session_caps(struct ceph_mds_session *session)
1888 {
1889         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1890         struct super_block *sb = fsc->sb;
1891         LIST_HEAD(dispose);
1892
1893         dout("remove_session_caps on %p\n", session);
1894         ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1895
1896         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1897
1898         spin_lock(&session->s_cap_lock);
1899         if (session->s_nr_caps > 0) {
1900                 struct inode *inode;
1901                 struct ceph_cap *cap, *prev = NULL;
1902                 struct ceph_vino vino;
1903                 /*
1904                  * iterate_session_caps() skips inodes that are being
1905                  * deleted, so we need to wait until deletions are
1906                  * complete. __wait_on_freeing_inode() is designed for
1907                  * the job, but it is not exported, so use the inode
1908                  * lookup function to reach it.
1909                  */
1910                 while (!list_empty(&session->s_caps)) {
1911                         cap = list_entry(session->s_caps.next,
1912                                          struct ceph_cap, session_caps);
1913                         if (cap == prev)
1914                                 break;
1915                         prev = cap;
1916                         vino = cap->ci->i_vino;
1917                         spin_unlock(&session->s_cap_lock);
1918
1919                         inode = ceph_find_inode(sb, vino);
1920                         iput(inode);
1921
1922                         spin_lock(&session->s_cap_lock);
1923                 }
1924         }
1925
1926         /* detach any remaining queued cap releases (s_cap_lock still held) */
1927         detach_cap_releases(session, &dispose);
1928
1929         BUG_ON(session->s_nr_caps > 0);
1930         BUG_ON(!list_empty(&session->s_cap_flushing));
1931         spin_unlock(&session->s_cap_lock);
1932         dispose_cap_releases(session->s_mdsc, &dispose);
1933 }
1934
1935 enum {
1936         RECONNECT,
1937         RENEWCAPS,
1938         FORCE_RO,
1939 };
1940
1941 /*
1942  * wake up any threads waiting on this session's caps.  if the cap is
1943  * old (didn't get renewed on the client reconnect), remove it now.
1944  *
1945  * caller must hold s_mutex.
1946  */
1947 static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
1948 {
1949         struct ceph_inode_info *ci = ceph_inode(inode);
1950         unsigned long ev = (unsigned long)arg;
1951
1952         if (ev == RECONNECT) {
1953                 spin_lock(&ci->i_ceph_lock);
1954                 ci->i_wanted_max_size = 0;
1955                 ci->i_requested_max_size = 0;
1956                 spin_unlock(&ci->i_ceph_lock);
1957         } else if (ev == RENEWCAPS) {
1958                 struct ceph_cap *cap;
1959
1960                 spin_lock(&ci->i_ceph_lock);
1961                 cap = __get_cap_for_mds(ci, mds);
1962                 /* mds did not re-issue stale cap */
1963                 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
1964                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1965                 spin_unlock(&ci->i_ceph_lock);
1966         } else if (ev == FORCE_RO) {
1967         }
1968         wake_up_all(&ci->i_cap_wq);
1969         return 0;
1970 }
1971
1972 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1973 {
1974         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1975         ceph_iterate_session_caps(session, wake_up_session_cb,
1976                                   (void *)(unsigned long)ev);
1977 }
1978
1979 /*
1980  * Send periodic message to MDS renewing all currently held caps.  The
1981  * ack will reset the expiration for all caps from this session.
1982  *
1983  * caller holds s_mutex
1984  */
1985 static int send_renew_caps(struct ceph_mds_client *mdsc,
1986                            struct ceph_mds_session *session)
1987 {
1988         struct ceph_msg *msg;
1989         int state;
1990
1991         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1992             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1993                 pr_info("mds%d caps stale\n", session->s_mds);
1994         session->s_renew_requested = jiffies;
1995
1996         /* do not try to renew caps until a recovering mds has reconnected
1997          * with its clients. */
1998         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1999         if (state < CEPH_MDS_STATE_RECONNECT) {
2000                 dout("send_renew_caps ignoring mds%d (%s)\n",
2001                      session->s_mds, ceph_mds_state_name(state));
2002                 return 0;
2003         }
2004
2005         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
2006                 ceph_mds_state_name(state));
2007         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
2008                                       ++session->s_renew_seq);
2009         if (!msg)
2010                 return -ENOMEM;
2011         ceph_con_send(&session->s_con, msg);
2012         return 0;
2013 }
2014
2015 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
2016                              struct ceph_mds_session *session, u64 seq)
2017 {
2018         struct ceph_msg *msg;
2019
2020         dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
2021              session->s_mds, ceph_session_state_name(session->s_state), seq);
2022         msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
2023         if (!msg)
2024                 return -ENOMEM;
2025         ceph_con_send(&session->s_con, msg);
2026         return 0;
2027 }
2028
2029
2030 /*
2031  * Note new cap ttl, and any transition from stale -> fresh.
2032  *
2033  * Called under session->s_mutex
2034  */
2035 static void renewed_caps(struct ceph_mds_client *mdsc,
2036                          struct ceph_mds_session *session, int is_renew)
2037 {
2038         int was_stale;
2039         int wake = 0;
2040
2041         spin_lock(&session->s_cap_lock);
2042         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2043
2044         session->s_cap_ttl = session->s_renew_requested +
2045                 mdsc->mdsmap->m_session_timeout*HZ;
2046
2047         if (was_stale) {
2048                 if (time_before(jiffies, session->s_cap_ttl)) {
2049                         pr_info("mds%d caps renewed\n", session->s_mds);
2050                         wake = 1;
2051                 } else {
2052                         pr_info("mds%d caps still stale\n", session->s_mds);
2053                 }
2054         }
2055         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
2056              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
2057              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
2058         spin_unlock(&session->s_cap_lock);
2059
2060         if (wake)
2061                 wake_up_session_caps(session, RENEWCAPS);
2062 }
2063
2064 /*
2065  * send a session close request
2066  */
2067 static int request_close_session(struct ceph_mds_session *session)
2068 {
2069         struct ceph_msg *msg;
2070
2071         dout("request_close_session mds%d state %s seq %lld\n",
2072              session->s_mds, ceph_session_state_name(session->s_state),
2073              session->s_seq);
2074         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2075                                       session->s_seq);
2076         if (!msg)
2077                 return -ENOMEM;
2078         ceph_con_send(&session->s_con, msg);
2079         return 1;
2080 }
2081
2082 /*
2083  * Called with s_mutex held.
2084  */
2085 static int __close_session(struct ceph_mds_client *mdsc,
2086                          struct ceph_mds_session *session)
2087 {
2088         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2089                 return 0;
2090         session->s_state = CEPH_MDS_SESSION_CLOSING;
2091         return request_close_session(session);
2092 }
2093
2094 static bool drop_negative_children(struct dentry *dentry)
2095 {
2096         struct dentry *child;
2097         bool all_negative = true;
2098
2099         if (!d_is_dir(dentry))
2100                 goto out;
2101
2102         spin_lock(&dentry->d_lock);
2103         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
2104                 if (d_really_is_positive(child)) {
2105                         all_negative = false;
2106                         break;
2107                 }
2108         }
2109         spin_unlock(&dentry->d_lock);
2110
2111         if (all_negative)
2112                 shrink_dcache_parent(dentry);
2113 out:
2114         return all_negative;
2115 }
2116
2117 /*
2118  * Trim old(er) caps.
2119  *
2120  * Because we can't cache an inode without one or more caps, we do
2121  * this indirectly: if a cap is unused, we prune its aliases, at which
2122  * point the inode will hopefully get dropped too.
2123  *
2124  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
2125  * memory pressure from the MDS, though, so it needn't be perfect.
2126  */
2127 static int trim_caps_cb(struct inode *inode, int mds, void *arg)
2128 {
2129         int *remaining = arg;
2130         struct ceph_inode_info *ci = ceph_inode(inode);
2131         int used, wanted, oissued, mine;
2132         struct ceph_cap *cap;
2133
2134         if (*remaining <= 0)
2135                 return -1;
2136
2137         spin_lock(&ci->i_ceph_lock);
2138         cap = __get_cap_for_mds(ci, mds);
2139         if (!cap) {
2140                 spin_unlock(&ci->i_ceph_lock);
2141                 return 0;
2142         }
2143         mine = cap->issued | cap->implemented;
2144         used = __ceph_caps_used(ci);
2145         wanted = __ceph_caps_file_wanted(ci);
2146         oissued = __ceph_caps_issued_other(ci, cap);
2147
2148         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2149              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2150              ceph_cap_string(used), ceph_cap_string(wanted));
2151         if (cap == ci->i_auth_cap) {
2152                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2153                     !list_empty(&ci->i_cap_snaps))
2154                         goto out;
2155                 if ((used | wanted) & CEPH_CAP_ANY_WR)
2156                         goto out;
2157                 /* Note: it's possible that i_filelock_ref becomes non-zero
2158                  * after dropping auth caps. It doesn't hurt because the
2159                  * reply to the lock mds request will re-add auth caps. */
2160                 if (atomic_read(&ci->i_filelock_ref) > 0)
2161                         goto out;
2162         }
2163         /* The inode has cached pages, but it's no longer used.
2164          * We can safely drop it. */
2165         if (S_ISREG(inode->i_mode) &&
2166             wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2167             !(oissued & CEPH_CAP_FILE_CACHE)) {
2168                 used = 0;
2169                 oissued = 0;
2170         }
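        /*
         * Keep the cap if any bit we use or want is provided by this
         * cap (mine) and not also issued by another mds's cap (oissued):
         * dropping it would lose caps we still depend on.
         */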
2171         if ((used | wanted) & ~oissued & mine)
2172                 goto out;   /* we need these caps */
2173
2174         if (oissued) {
2175                 /* we aren't the only cap.. just remove us */
2176                 ceph_remove_cap(cap, true);
2177                 (*remaining)--;
2178         } else {
2179                 struct dentry *dentry;
2180                 /* try dropping referring dentries */
2181                 spin_unlock(&ci->i_ceph_lock);
2182                 dentry = d_find_any_alias(inode);
2183                 if (dentry && drop_negative_children(dentry)) {
2184                         int count;
2185                         dput(dentry);
2186                         d_prune_aliases(inode);
2187                         count = atomic_read(&inode->i_count);
2188                         if (count == 1)
2189                                 (*remaining)--;
2190                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2191                              inode, cap, count);
2192                 } else {
2193                         dput(dentry);
2194                 }
2195                 return 0;
2196         }
2197
2198 out:
2199         spin_unlock(&ci->i_ceph_lock);
2200         return 0;
2201 }
2202
2203 /*
2204  * Trim session cap count down to some max number.
2205  */
2206 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2207                    struct ceph_mds_session *session,
2208                    int max_caps)
2209 {
2210         int trim_caps = session->s_nr_caps - max_caps;
2211
2212         dout("trim_caps mds%d start: %d / %d, trim %d\n",
2213              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2214         if (trim_caps > 0) {
2215                 int remaining = trim_caps;
2216
2217                 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2218                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2219                      session->s_mds, session->s_nr_caps, max_caps,
2220                         trim_caps - remaining);
2221         }
2222
2223         ceph_flush_cap_releases(mdsc, session);
2224         return 0;
2225 }
2226
2227 static int check_caps_flush(struct ceph_mds_client *mdsc,
2228                             u64 want_flush_tid)
2229 {
2230         int ret = 1;
2231
2232         spin_lock(&mdsc->cap_dirty_lock);
2233         if (!list_empty(&mdsc->cap_flush_list)) {
2234                 struct ceph_cap_flush *cf =
2235                         list_first_entry(&mdsc->cap_flush_list,
2236                                          struct ceph_cap_flush, g_list);
2237                 if (cf->tid <= want_flush_tid) {
2238                         dout("check_caps_flush still flushing tid "
2239                              "%llu <= %llu\n", cf->tid, want_flush_tid);
2240                         ret = 0;
2241                 }
2242         }
2243         spin_unlock(&mdsc->cap_dirty_lock);
2244         return ret;
2245 }
2246
2247 /*
2248  * wait for dirty cap flushes to complete.
2249  *
2250  * returns once we've flushed through want_flush_tid
2251  */
2252 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2253                             u64 want_flush_tid)
2254 {
2255         dout("check_caps_flush want %llu\n", want_flush_tid);
2256
2257         wait_event(mdsc->cap_flushing_wq,
2258                    check_caps_flush(mdsc, want_flush_tid));
2259
2260         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2261 }
2262
2263 /*
2264  * called under s_mutex
2265  */
2266 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2267                                    struct ceph_mds_session *session)
2268 {
2269         struct ceph_msg *msg = NULL;
2270         struct ceph_mds_cap_release *head;
2271         struct ceph_mds_cap_item *item;
2272         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2273         struct ceph_cap *cap;
2274         LIST_HEAD(tmp_list);
2275         int num_cap_releases;
2276         __le32  barrier, *cap_barrier;
2277
2278         down_read(&osdc->lock);
2279         barrier = cpu_to_le32(osdc->epoch_barrier);
2280         up_read(&osdc->lock);
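        /*
         * The epoch barrier is sampled once and appended to every
         * CAPRELEASE message built below, so the MDS can tell which
         * OSD map epoch the client had seen when releasing these caps.
         */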
2281
2282         spin_lock(&session->s_cap_lock);
2283 again:
2284         list_splice_init(&session->s_cap_releases, &tmp_list);
2285         num_cap_releases = session->s_num_cap_releases;
2286         session->s_num_cap_releases = 0;
2287         spin_unlock(&session->s_cap_lock);
2288
2289         while (!list_empty(&tmp_list)) {
2290                 if (!msg) {
2291                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2292                                         PAGE_SIZE, GFP_NOFS, false);
2293                         if (!msg)
2294                                 goto out_err;
2295                         head = msg->front.iov_base;
2296                         head->num = cpu_to_le32(0);
2297                         msg->front.iov_len = sizeof(*head);
2298
2299                         msg->hdr.version = cpu_to_le16(2);
2300                         msg->hdr.compat_version = cpu_to_le16(1);
2301                 }
2302
2303                 cap = list_first_entry(&tmp_list, struct ceph_cap,
2304                                         session_caps);
2305                 list_del(&cap->session_caps);
2306                 num_cap_releases--;
2307
2308                 head = msg->front.iov_base;
2309                 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2310                                    &head->num);
2311                 item = msg->front.iov_base + msg->front.iov_len;
2312                 item->ino = cpu_to_le64(cap->cap_ino);
2313                 item->cap_id = cpu_to_le64(cap->cap_id);
2314                 item->migrate_seq = cpu_to_le32(cap->mseq);
2315                 item->seq = cpu_to_le32(cap->issue_seq);
2316                 msg->front.iov_len += sizeof(*item);
2317
2318                 ceph_put_cap(mdsc, cap);
2319
2320                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2321                         /* append the cap_barrier field */
2322                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
2323                         *cap_barrier = barrier;
2324                         msg->front.iov_len += sizeof(*cap_barrier);
2325
2326                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2327                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2328                         ceph_con_send(&session->s_con, msg);
2329                         msg = NULL;
2330                 }
2331         }
2332
2333         BUG_ON(num_cap_releases != 0);
2334
2335         spin_lock(&session->s_cap_lock);
2336         if (!list_empty(&session->s_cap_releases))
2337                 goto again;
2338         spin_unlock(&session->s_cap_lock);
2339
2340         if (msg) {
2341                 /* append the cap_barrier field */
2342                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2343                 *cap_barrier = barrier;
2344                 msg->front.iov_len += sizeof(*cap_barrier);
2345
2346                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2347                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2348                 ceph_con_send(&session->s_con, msg);
2349         }
2350         return;
2351 out_err:
2352         pr_err("send_cap_releases mds%d, failed to allocate message\n",
2353                 session->s_mds);
2354         spin_lock(&session->s_cap_lock);
2355         list_splice(&tmp_list, &session->s_cap_releases);
2356         session->s_num_cap_releases += num_cap_releases;
2357         spin_unlock(&session->s_cap_lock);
2358 }
2359
2360 static void ceph_cap_release_work(struct work_struct *work)
2361 {
2362         struct ceph_mds_session *session =
2363                 container_of(work, struct ceph_mds_session, s_cap_release_work);
2364
2365         mutex_lock(&session->s_mutex);
2366         if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2367             session->s_state == CEPH_MDS_SESSION_HUNG)
2368                 ceph_send_cap_releases(session->s_mdsc, session);
2369         mutex_unlock(&session->s_mutex);
2370         ceph_put_mds_session(session);
2371 }
2372
2373 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2374                              struct ceph_mds_session *session)
2375 {
2376         if (mdsc->stopping)
2377                 return;
2378
2379         ceph_get_mds_session(session);
2380         if (queue_work(mdsc->fsc->cap_wq,
2381                        &session->s_cap_release_work)) {
2382                 dout("cap release work queued\n");
2383         } else {
2384                 ceph_put_mds_session(session);
2385                 dout("failed to queue cap release work\n");
2386         }
2387 }
2388
2389 /*
2390  * caller holds session->s_cap_lock
2391  */
2392 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2393                               struct ceph_cap *cap)
2394 {
2395         list_add_tail(&cap->session_caps, &session->s_cap_releases);
2396         session->s_num_cap_releases++;
2397
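        /* kick off a send once another full batch of releases has built up */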
2398         if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2399                 ceph_flush_cap_releases(session->s_mdsc, session);
2400 }
2401
2402 static void ceph_cap_reclaim_work(struct work_struct *work)
2403 {
2404         struct ceph_mds_client *mdsc =
2405                 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2406         int ret = ceph_trim_dentries(mdsc);
2407         if (ret == -EAGAIN)
2408                 ceph_queue_cap_reclaim_work(mdsc);
2409 }
2410
2411 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2412 {
2413         if (mdsc->stopping)
2414                 return;
2415
2416         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2417                 dout("caps reclaim work queued\n");
2418         } else {
2419                 dout("failed to queue caps reclaim work\n");
2420         }
2421 }
2422
2423 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2424 {
2425         int val;
2426         if (!nr)
2427                 return;
2428         val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
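        /*
         * (val % CEPH_CAPS_PER_RELEASE) < nr means this addition crossed
         * a multiple of CEPH_CAPS_PER_RELEASE, so the reclaim work is
         * queued roughly once per batch instead of on every call.
         */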
2429         if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2430                 atomic_set(&mdsc->cap_reclaim_pending, 0);
2431                 ceph_queue_cap_reclaim_work(mdsc);
2432         }
2433 }
2434
2435 /*
2436  * requests
2437  */
2438
2439 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2440                                     struct inode *dir)
2441 {
2442         struct ceph_inode_info *ci = ceph_inode(dir);
2443         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2444         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2445         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2446         unsigned int num_entries;
2447         int order;
2448
2449         spin_lock(&ci->i_ceph_lock);
2450         num_entries = ci->i_files + ci->i_subdirs;
2451         spin_unlock(&ci->i_ceph_lock);
2452         num_entries = max(num_entries, 1U);
2453         num_entries = min(num_entries, opt->max_readdir);
2454
2455         order = get_order(size * num_entries);
2456         while (order >= 0) {
2457                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2458                                                              __GFP_NOWARN |
2459                                                              __GFP_ZERO,
2460                                                              order);
2461                 if (rinfo->dir_entries)
2462                         break;
2463                 order--;
2464         }
2465         if (!rinfo->dir_entries)
2466                 return -ENOMEM;
2467
2468         num_entries = (PAGE_SIZE << order) / size;
2469         num_entries = min(num_entries, opt->max_readdir);
2470
2471         rinfo->dir_buf_size = PAGE_SIZE << order;
2472         req->r_num_caps = num_entries + 1;
2473         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2474         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2475         return 0;
2476 }
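
/*
 * Sizing sketch for the buffer above (numbers purely illustrative):
 * with ~100 entries hinted by i_files + i_subdirs and a hypothetical
 * 32-byte dir_entry, get_order(3200) = 0, i.e. a single page.  If an
 * allocation fails, the loop retries at ever smaller orders, and
 * num_entries is then recomputed from the size actually obtained.
 */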
2477
2478 /*
2479  * Create an mds request.
2480  */
2481 struct ceph_mds_request *
2482 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2483 {
2484         struct ceph_mds_request *req;
2485
2486         req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2487         if (!req)
2488                 return ERR_PTR(-ENOMEM);
2489
2490         mutex_init(&req->r_fill_mutex);
2491         req->r_mdsc = mdsc;
2492         req->r_started = jiffies;
2493         req->r_start_latency = ktime_get();
2494         req->r_resend_mds = -1;
2495         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2496         INIT_LIST_HEAD(&req->r_unsafe_target_item);
2497         req->r_fmode = -1;
2498         req->r_feature_needed = -1;
2499         kref_init(&req->r_kref);
2500         RB_CLEAR_NODE(&req->r_node);
2501         INIT_LIST_HEAD(&req->r_wait);
2502         init_completion(&req->r_completion);
2503         init_completion(&req->r_safe_completion);
2504         INIT_LIST_HEAD(&req->r_unsafe_item);
2505
2506         ktime_get_coarse_real_ts64(&req->r_stamp);
2507
2508         req->r_op = op;
2509         req->r_direct_mode = mode;
2510         return req;
2511 }
2512
2513 /*
2514  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2515  *
2516  * called under mdsc->mutex.
2517  */
2518 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2519 {
2520         if (RB_EMPTY_ROOT(&mdsc->request_tree))
2521                 return NULL;
2522         return rb_entry(rb_first(&mdsc->request_tree),
2523                         struct ceph_mds_request, r_node);
2524 }
2525
2526 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2527 {
2528         return mdsc->oldest_tid;
2529 }
2530
2531 #if IS_ENABLED(CONFIG_FS_ENCRYPTION)
2532 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2533 {
2534         struct inode *dir = req->r_parent;
2535         struct dentry *dentry = req->r_dentry;
2536         u8 *cryptbuf = NULL;
2537         u32 len = 0;
2538         int ret = 0;
2539
2540         /* only encode if we have parent and dentry */
2541         if (!dir || !dentry)
2542                 goto success;
2543
2544         /* No-op unless this is encrypted */
2545         if (!IS_ENCRYPTED(dir))
2546                 goto success;
2547
2548         ret = ceph_fscrypt_prepare_readdir(dir);
2549         if (ret < 0)
2550                 return ERR_PTR(ret);
2551
2552         /* No key? Just ignore it. */
2553         if (!fscrypt_has_encryption_key(dir))
2554                 goto success;
2555
2556         if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX,
2557                                           &len)) {
2558                 WARN_ON_ONCE(1);
2559                 return ERR_PTR(-ENAMETOOLONG);
2560         }
2561
2562         /* No need to append altname if name is short enough */
2563         if (len <= CEPH_NOHASH_NAME_MAX) {
2564                 len = 0;
2565                 goto success;
2566         }
2567
2568         cryptbuf = kmalloc(len, GFP_KERNEL);
2569         if (!cryptbuf)
2570                 return ERR_PTR(-ENOMEM);
2571
2572         ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len);
2573         if (ret) {
2574                 kfree(cryptbuf);
2575                 return ERR_PTR(ret);
2576         }
2577 success:
2578         *plen = len;
2579         return cryptbuf;
2580 }
2581 #else
2582 static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2583 {
2584         *plen = 0;
2585         return NULL;
2586 }
2587 #endif
2588
2589 /**
2590  * ceph_mdsc_build_path - build a path string to a given dentry
2591  * @dentry: dentry to which path should be built
2592  * @plen: returned length of string
2593  * @pbase: returned base inode number
2594  * @for_wire: is this path going to be sent to the MDS?
2595  *
2596  * Build a string that represents the path to the dentry. This is mostly called
2597  * for two different purposes:
2598  *
2599  * 1) we need to build a path string to send to the MDS (for_wire == true)
2600  * 2) we need a path string for local presentation (e.g. debugfs)
2601  *    (for_wire == false)
2602  *
2603  * The path is built in reverse, starting with the dentry. Walk back up toward
2604  * the root, building the path until the first non-snapped inode is reached
2605  * (for_wire) or the root inode is reached (!for_wire).
2606  *
2607  * Encode hidden .snap dirs as a double /, i.e.
2608  *   foo/.snap/bar -> foo//bar
2609  */
2610 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2611                            int for_wire)
2612 {
2613         struct dentry *cur;
2614         struct inode *inode;
2615         char *path;
2616         int pos;
2617         unsigned seq;
2618         u64 base;
2619
2620         if (!dentry)
2621                 return ERR_PTR(-EINVAL);
2622
2623         path = __getname();
2624         if (!path)
2625                 return ERR_PTR(-ENOMEM);
2626 retry:
2627         pos = PATH_MAX - 1;
2628         path[pos] = '\0';
2629
2630         seq = read_seqbegin(&rename_lock);
2631         cur = dget(dentry);
2632         for (;;) {
2633                 struct dentry *parent;
2634
2635                 spin_lock(&cur->d_lock);
2636                 inode = d_inode(cur);
2637                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2638                         dout("build_path path+%d: %p SNAPDIR\n",
2639                              pos, cur);
2640                         spin_unlock(&cur->d_lock);
2641                         parent = dget_parent(cur);
2642                 } else if (for_wire && inode && dentry != cur &&
2643                            ceph_snap(inode) == CEPH_NOSNAP) {
2644                         spin_unlock(&cur->d_lock);
2645                         pos++; /* get rid of any prepended '/' */
2646                         break;
2647                 } else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
2648                         pos -= cur->d_name.len;
2649                         if (pos < 0) {
2650                                 spin_unlock(&cur->d_lock);
2651                                 break;
2652                         }
2653                         memcpy(path + pos, cur->d_name.name, cur->d_name.len);
2654                         spin_unlock(&cur->d_lock);
2655                         parent = dget_parent(cur);
2656                 } else {
2657                         int len, ret;
2658                         char buf[NAME_MAX];
2659
2660                         /*
2661                          * Proactively copy name into buf, in case we need to
2662                          * present it as-is.
2663                          */
2664                         memcpy(buf, cur->d_name.name, cur->d_name.len);
2665                         len = cur->d_name.len;
2666                         spin_unlock(&cur->d_lock);
2667                         parent = dget_parent(cur);
2668
2669                         ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
2670                         if (ret < 0) {
2671                                 dput(parent);
2672                                 dput(cur);
2673                                 return ERR_PTR(ret);
2674                         }
2675
2676                         if (fscrypt_has_encryption_key(d_inode(parent))) {
2677                                 len = ceph_encode_encrypted_fname(d_inode(parent),
2678                                                                   cur, buf);
2679                                 if (len < 0) {
2680                                         dput(parent);
2681                                         dput(cur);
2682                                         return ERR_PTR(len);
2683                                 }
2684                         }
2685                         pos -= len;
2686                         if (pos < 0) {
2687                                 dput(parent);
2688                                 break;
2689                         }
2690                         memcpy(path + pos, buf, len);
2691                 }
2692                 dput(cur);
2693                 cur = parent;
2694
2695                 /* Are we at the root? */
2696                 if (IS_ROOT(cur))
2697                         break;
2698
2699                 /* Are we out of buffer? */
2700                 if (--pos < 0)
2701                         break;
2702
2703                 path[pos] = '/';
2704         }
2705         inode = d_inode(cur);
2706         base = inode ? ceph_ino(inode) : 0;
2707         dput(cur);
2708
2709         if (read_seqretry(&rename_lock, seq))
2710                 goto retry;
2711
2712         if (pos < 0) {
2713                 /*
2714                  * A rename didn't occur, but somehow we didn't end up where
2715                  * we thought we would. Throw a warning and try again.
2716                  */
2717                 pr_warn("build_path did not end path lookup where expected (pos = %d)\n",
2718                         pos);
2719                 goto retry;
2720         }
2721
2722         *pbase = base;
2723         *plen = PATH_MAX - 1 - pos;
2724         dout("build_path on %p %d built %llx '%.*s'\n",
2725              dentry, d_count(dentry), base, *plen, path + pos);
2726         return path + pos;
2727 }
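
/*
 * Minimal usage sketch (hypothetical caller).  The returned pointer
 * points into a __getname() buffer, so it must be released with
 * ceph_mdsc_free_path(), not kfree():
 *
 *	char *path;
 *	int pathlen;
 *	u64 pathbase;
 *
 *	path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 1);
 *	if (IS_ERR(path))
 *		return PTR_ERR(path);
 *	...
 *	ceph_mdsc_free_path(path, pathlen);
 */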
2728
2729 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2730                              const char **ppath, int *ppathlen, u64 *pino,
2731                              bool *pfreepath, bool parent_locked)
2732 {
2733         char *path;
2734
2735         rcu_read_lock();
2736         if (!dir)
2737                 dir = d_inode_rcu(dentry->d_parent);
2738         if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
2739             !IS_ENCRYPTED(dir)) {
2740                 *pino = ceph_ino(dir);
2741                 rcu_read_unlock();
2742                 *ppath = dentry->d_name.name;
2743                 *ppathlen = dentry->d_name.len;
2744                 return 0;
2745         }
2746         rcu_read_unlock();
2747         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2748         if (IS_ERR(path))
2749                 return PTR_ERR(path);
2750         *ppath = path;
2751         *pfreepath = true;
2752         return 0;
2753 }
2754
2755 static int build_inode_path(struct inode *inode,
2756                             const char **ppath, int *ppathlen, u64 *pino,
2757                             bool *pfreepath)
2758 {
2759         struct dentry *dentry;
2760         char *path;
2761
2762         if (ceph_snap(inode) == CEPH_NOSNAP) {
2763                 *pino = ceph_ino(inode);
2764                 *ppathlen = 0;
2765                 return 0;
2766         }
2767         dentry = d_find_alias(inode);
2768         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2769         dput(dentry);
2770         if (IS_ERR(path))
2771                 return PTR_ERR(path);
2772         *ppath = path;
2773         *pfreepath = true;
2774         return 0;
2775 }
2776
2777 /*
2778  * request arguments may be specified via an inode *, a dentry *, or
2779  * an explicit ino+path.
2780  */
2781 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2782                                   struct inode *rdiri, const char *rpath,
2783                                   u64 rino, const char **ppath, int *pathlen,
2784                                   u64 *ino, bool *freepath, bool parent_locked)
2785 {
2786         int r = 0;
2787
2788         if (rinode) {
2789                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2790                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2791                      ceph_snap(rinode));
2792         } else if (rdentry) {
2793                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2794                                         freepath, parent_locked);
2795                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2796                      *ppath);
2797         } else if (rpath || rino) {
2798                 *ino = rino;
2799                 *ppath = rpath;
2800                 *pathlen = rpath ? strlen(rpath) : 0;
2801                 dout(" path %.*s\n", *pathlen, rpath);
2802         }
2803
2804         return r;
2805 }
2806
2807 static void encode_mclientrequest_tail(void **p,
2808                                        const struct ceph_mds_request *req)
2809 {
2810         struct ceph_timespec ts;
2811         int i;
2812
2813         ceph_encode_timespec64(&ts, &req->r_stamp);
2814         ceph_encode_copy(p, &ts, sizeof(ts));
2815
2816         /* v4: gid_list */
2817         ceph_encode_32(p, req->r_cred->group_info->ngroups);
2818         for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2819                 ceph_encode_64(p, from_kgid(&init_user_ns,
2820                                             req->r_cred->group_info->gid[i]));
2821
2822         /* v5: altname */
2823         ceph_encode_32(p, req->r_altname_len);
2824         ceph_encode_copy(p, req->r_altname, req->r_altname_len);
2825
2826         /* v6: fscrypt_auth and fscrypt_file */
2827         if (req->r_fscrypt_auth) {
2828                 u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2829
2830                 ceph_encode_32(p, authlen);
2831                 ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
2832         } else {
2833                 ceph_encode_32(p, 0);
2834         }
2835         if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
2836                 ceph_encode_32(p, sizeof(__le64));
2837                 ceph_encode_64(p, req->r_fscrypt_file);
2838         } else {
2839                 ceph_encode_32(p, 0);
2840         }
2841 }
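
/*
 * Layout of the MClientRequest tail encoded above, matching the
 * per-field byte counts added to len in create_request_message():
 *
 *	ceph_timespec   r_stamp
 *	u32 + n * u64   gid list        (v4)
 *	u32 + bytes     alternate name  (v5)
 *	u32 + bytes     fscrypt_auth    (v6)
 *	u32 [+ le64]    fscrypt_file    (v6)
 */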
2842
2843 /*
2844  * called under mdsc->mutex
2845  */
2846 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2847                                                struct ceph_mds_request *req,
2848                                                bool drop_cap_releases)
2849 {
2850         int mds = session->s_mds;
2851         struct ceph_mds_client *mdsc = session->s_mdsc;
2852         struct ceph_msg *msg;
2853         struct ceph_mds_request_head_old *head;
2854         const char *path1 = NULL;
2855         const char *path2 = NULL;
2856         u64 ino1 = 0, ino2 = 0;
2857         int pathlen1 = 0, pathlen2 = 0;
2858         bool freepath1 = false, freepath2 = false;
2859         struct dentry *old_dentry = NULL;
2860         int len;
2861         u16 releases;
2862         void *p, *end;
2863         int ret;
2864         bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2865
2866         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2867                               req->r_parent, req->r_path1, req->r_ino1.ino,
2868                               &path1, &pathlen1, &ino1, &freepath1,
2869                               test_bit(CEPH_MDS_R_PARENT_LOCKED,
2870                                         &req->r_req_flags));
2871         if (ret < 0) {
2872                 msg = ERR_PTR(ret);
2873                 goto out;
2874         }
2875
2876         /* If r_old_dentry is set, then assume that its parent is locked */
2877         if (req->r_old_dentry &&
2878             !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
2879                 old_dentry = req->r_old_dentry;
2880         ret = set_request_path_attr(NULL, old_dentry,
2881                               req->r_old_dentry_dir,
2882                               req->r_path2, req->r_ino2.ino,
2883                               &path2, &pathlen2, &ino2, &freepath2, true);
2884         if (ret < 0) {
2885                 msg = ERR_PTR(ret);
2886                 goto out_free1;
2887         }
2888
2889         req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
2890         if (IS_ERR(req->r_altname)) {
2891                 msg = ERR_CAST(req->r_altname);
2892                 req->r_altname = NULL;
2893                 goto out_free2;
2894         }
2895
2896         len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2897
2898         /* filepaths */
2899         len += 2 * (1 + sizeof(u32) + sizeof(u64));
2900         len += pathlen1 + pathlen2;
2901
2902         /* cap releases */
2903         len += sizeof(struct ceph_mds_request_release) *
2904                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2905                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2906
2907         if (req->r_dentry_drop)
2908                 len += pathlen1;
2909         if (req->r_old_dentry_drop)
2910                 len += pathlen2;
2911
2912         /* MClientRequest tail */
2913
2914         /* req->r_stamp */
2915         len += sizeof(struct ceph_timespec);
2916
2917         /* gid list */
2918         len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2919
2920         /* alternate name */
2921         len += sizeof(u32) + req->r_altname_len;
2922
2923         /* fscrypt_auth */
2924         len += sizeof(u32); // fscrypt_auth
2925         if (req->r_fscrypt_auth)
2926                 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2927
2928         /* fscrypt_file */
2929         len += sizeof(u32);
2930         if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
2931                 len += sizeof(__le64);
2932
2933         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2934         if (!msg) {
2935                 msg = ERR_PTR(-ENOMEM);
2936                 goto out_free2;
2937         }
2938
2939         msg->hdr.tid = cpu_to_le64(req->r_tid);
2940
2941         /*
2942          * The old ceph_mds_request_head didn't contain a version field, and
2943          * one was added when we moved the message version from 3->4.
2944          */
2945         if (legacy) {
2946                 msg->hdr.version = cpu_to_le16(3);
2947                 head = msg->front.iov_base;
2948                 p = msg->front.iov_base + sizeof(*head);
2949         } else {
2950                 struct ceph_mds_request_head *new_head = msg->front.iov_base;
2951
2952                 msg->hdr.version = cpu_to_le16(6);
2953                 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2954                 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2955                 p = msg->front.iov_base + sizeof(*new_head);
2956         }
2957
2958         end = msg->front.iov_base + msg->front.iov_len;
2959
2960         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2961         head->op = cpu_to_le32(req->r_op);
2962         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2963                                                  req->r_cred->fsuid));
2964         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2965                                                  req->r_cred->fsgid));
2966         head->ino = cpu_to_le64(req->r_deleg_ino);
2967         head->args = req->r_args;
2968
2969         ceph_encode_filepath(&p, end, ino1, path1);
2970         ceph_encode_filepath(&p, end, ino2, path2);
2971
2972         /* make note of release offset, in case we need to replay */
2973         req->r_request_release_offset = p - msg->front.iov_base;
2974
2975         /* cap releases */
2976         releases = 0;
2977         if (req->r_inode_drop)
2978                 releases += ceph_encode_inode_release(&p,
2979                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2980                       mds, req->r_inode_drop, req->r_inode_unless,
2981                       req->r_op == CEPH_MDS_OP_READDIR);
2982         if (req->r_dentry_drop) {
2983                 ret = ceph_encode_dentry_release(&p, req->r_dentry,
2984                                 req->r_parent, mds, req->r_dentry_drop,
2985                                 req->r_dentry_unless);
2986                 if (ret < 0)
2987                         goto out_err;
2988                 releases += ret;
2989         }
2990         if (req->r_old_dentry_drop) {
2991                 ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
2992                                 req->r_old_dentry_dir, mds,
2993                                 req->r_old_dentry_drop,
2994                                 req->r_old_dentry_unless);
2995                 if (ret < 0)
2996                         goto out_err;
2997                 releases += ret;
2998         }
2999         if (req->r_old_inode_drop)
3000                 releases += ceph_encode_inode_release(&p,
3001                       d_inode(req->r_old_dentry),
3002                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3003
3004         if (drop_cap_releases) {
3005                 releases = 0;
3006                 p = msg->front.iov_base + req->r_request_release_offset;
3007         }
3008
3009         head->num_releases = cpu_to_le16(releases);
3010
3011         encode_mclientrequest_tail(&p, req);
3012
3013         if (WARN_ON_ONCE(p > end)) {
3014                 ceph_msg_put(msg);
3015                 msg = ERR_PTR(-ERANGE);
3016                 goto out_free2;
3017         }
3018
3019         msg->front.iov_len = p - msg->front.iov_base;
3020         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3021
3022         if (req->r_pagelist) {
3023                 struct ceph_pagelist *pagelist = req->r_pagelist;
3024                 ceph_msg_data_add_pagelist(msg, pagelist);
3025                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
3026         } else {
3027                 msg->hdr.data_len = 0;
3028         }
3029
3030         msg->hdr.data_off = cpu_to_le16(0);
3031
3032 out_free2:
3033         if (freepath2)
3034                 ceph_mdsc_free_path((char *)path2, pathlen2);
3035 out_free1:
3036         if (freepath1)
3037                 ceph_mdsc_free_path((char *)path1, pathlen1);
3038 out:
3039         return msg;
3040 out_err:
3041         ceph_msg_put(msg);
3042         msg = ERR_PTR(ret);
3043         goto out_free2;
3044 }
3045
3046 /*
3047  * called under mdsc->mutex if error, under no mutex if
3048  * success.
3049  */
3050 static void complete_request(struct ceph_mds_client *mdsc,
3051                              struct ceph_mds_request *req)
3052 {
3053         req->r_end_latency = ktime_get();
3054
3055         if (req->r_callback)
3056                 req->r_callback(mdsc, req);
3057         complete_all(&req->r_completion);
3058 }
3059
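/*
 * The newer ceph_mds_request_head is simply the old layout with a
 * version field prepended, so the old-format fields of either variant
 * begin at oldest_client_tid.
 */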
3060 static struct ceph_mds_request_head_old *
3061 find_old_request_head(void *p, u64 features)
3062 {
3063         bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
3064         struct ceph_mds_request_head *new_head;
3065
3066         if (legacy)
3067                 return (struct ceph_mds_request_head_old *)p;
3068         new_head = (struct ceph_mds_request_head *)p;
3069         return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
3070 }
3071
3072 /*
3073  * called under mdsc->mutex
3074  */
3075 static int __prepare_send_request(struct ceph_mds_session *session,
3076                                   struct ceph_mds_request *req,
3077                                   bool drop_cap_releases)
3078 {
3079         int mds = session->s_mds;
3080         struct ceph_mds_client *mdsc = session->s_mdsc;
3081         struct ceph_mds_request_head_old *rhead;
3082         struct ceph_msg *msg;
3083         int flags = 0, max_retry;
3084
3085         /*
3086          * The type of 'r_attempts' in the kernel's 'ceph_mds_request'
3087          * is 'int', while in 'ceph_mds_request_head' the type of
3088          * 'num_retry' is '__u8'. So if the request is retried more
3089          * than 256 times, the MDS will receive an incorrect retry
3090          * seq.
3091          *
3092          * In this case it's usually a bug in the MDS and continuing
3093          * to retry the request makes no sense.
3094          *
3095          * In the future this could be fixed in the ceph code, so
3096          * avoid hardcoding the limit here.
3097          */
3098         max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
3099         max_retry = 1 << (max_retry * BITS_PER_BYTE);
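        /* num_retry is a single byte, so this works out to 1 << 8 == 256 */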
3100         if (req->r_attempts >= max_retry) {
3101                 pr_warn_ratelimited("%s request tid %llu seq overflow\n",
3102                                     __func__, req->r_tid);
3103                 return -EMULTIHOP;
3104         }
3105
3106         req->r_attempts++;
3107         if (req->r_inode) {
3108                 struct ceph_cap *cap =
3109                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3110
3111                 if (cap)
3112                         req->r_sent_on_mseq = cap->mseq;
3113                 else
3114                         req->r_sent_on_mseq = -1;
3115         }
3116         dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
3117              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
3118
3119         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3120                 void *p;
3121
3122                 /*
3123                  * Replay.  Do not regenerate message (and rebuild
3124                  * paths, etc.); just use the original message.
3125                  * Rebuilding paths will break for renames because
3126                  * d_move mangles the src name.
3127                  */
3128                 msg = req->r_request;
3129                 rhead = find_old_request_head(msg->front.iov_base,
3130                                               session->s_con.peer_features);
3131
3132                 flags = le32_to_cpu(rhead->flags);
3133                 flags |= CEPH_MDS_FLAG_REPLAY;
3134                 rhead->flags = cpu_to_le32(flags);
3135
3136                 if (req->r_target_inode)
3137                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3138
3139                 rhead->num_retry = req->r_attempts - 1;
3140
3141                 /* remove cap/dentry releases from message */
3142                 rhead->num_releases = 0;
3143
3144                 p = msg->front.iov_base + req->r_request_release_offset;
3145                 encode_mclientrequest_tail(&p, req);
3146
3147                 msg->front.iov_len = p - msg->front.iov_base;
3148                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3149                 return 0;
3150         }
3151
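        /* not a replay: drop any stale message and build a fresh one */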
3152         if (req->r_request) {
3153                 ceph_msg_put(req->r_request);
3154                 req->r_request = NULL;
3155         }
3156         msg = create_request_message(session, req, drop_cap_releases);
3157         if (IS_ERR(msg)) {
3158                 req->r_err = PTR_ERR(msg);
3159                 return PTR_ERR(msg);
3160         }
3161         req->r_request = msg;
3162
3163         rhead = find_old_request_head(msg->front.iov_base,
3164                                       session->s_con.peer_features);
3165         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3166         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3167                 flags |= CEPH_MDS_FLAG_REPLAY;
3168         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3169                 flags |= CEPH_MDS_FLAG_ASYNC;
3170         if (req->r_parent)
3171                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3172         rhead->flags = cpu_to_le32(flags);
3173         rhead->num_fwd = req->r_num_fwd;
3174         rhead->num_retry = req->r_attempts - 1;
3175
3176         dout(" r_parent = %p\n", req->r_parent);
3177         return 0;
3178 }
3179
3180 /*
3181  * called under mdsc->mutex
3182  */
3183 static int __send_request(struct ceph_mds_session *session,
3184                           struct ceph_mds_request *req,
3185                           bool drop_cap_releases)
3186 {
3187         int err;
3188
3189         err = __prepare_send_request(session, req, drop_cap_releases);
3190         if (!err) {
3191                 ceph_msg_get(req->r_request);
3192                 ceph_con_send(&session->s_con, req->r_request);
3193         }
3194
3195         return err;
3196 }
3197
3198 /*
3199  * send request, or put it on the appropriate wait list.
3200  */
3201 static void __do_request(struct ceph_mds_client *mdsc,
3202                         struct ceph_mds_request *req)
3203 {
3204         struct ceph_mds_session *session = NULL;
3205         int mds = -1;
3206         int err = 0;
3207         bool random;
3208
3209         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3210                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3211                         __unregister_request(mdsc, req);
3212                 return;
3213         }
3214
3215         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3216                 dout("do_request metadata corrupted\n");
3217                 err = -EIO;
3218                 goto finish;
3219         }
3220         if (req->r_timeout &&
3221             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3222                 dout("do_request timed out\n");
3223                 err = -ETIMEDOUT;
3224                 goto finish;
3225         }
3226         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3227                 dout("do_request forced umount\n");
3228                 err = -EIO;
3229                 goto finish;
3230         }
3231         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3232                 if (mdsc->mdsmap_err) {
3233                         err = mdsc->mdsmap_err;
3234                         dout("do_request mdsmap err %d\n", err);
3235                         goto finish;
3236                 }
3237                 if (mdsc->mdsmap->m_epoch == 0) {
3238                         dout("do_request no mdsmap, waiting for map\n");
3239                         list_add(&req->r_wait, &mdsc->waiting_for_map);
3240                         return;
3241                 }
3242                 if (!(mdsc->fsc->mount_options->flags &
3243                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
3244                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3245                         err = -EHOSTUNREACH;
3246                         goto finish;
3247                 }
3248         }
3249
3250         put_request_session(req);
3251
3252         mds = __choose_mds(mdsc, req, &random);
3253         if (mds < 0 ||
3254             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3255                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3256                         err = -EJUKEBOX;
3257                         goto finish;
3258                 }
3259                 dout("do_request no mds or not active, waiting for map\n");
3260                 list_add(&req->r_wait, &mdsc->waiting_for_map);
3261                 return;
3262         }
3263
3264         /* get, open session */
3265         session = __ceph_lookup_mds_session(mdsc, mds);
3266         if (!session) {
3267                 session = register_session(mdsc, mds);
3268                 if (IS_ERR(session)) {
3269                         err = PTR_ERR(session);
3270                         goto finish;
3271                 }
3272         }
3273         req->r_session = ceph_get_mds_session(session);
3274
3275         dout("do_request mds%d session %p state %s\n", mds, session,
3276              ceph_session_state_name(session->s_state));
3277
3278         /*
3279          * Old versions of the MDS will crash if they see unknown ops.
3280          */
3281         if (req->r_feature_needed > 0 &&
3282             !test_bit(req->r_feature_needed, &session->s_features)) {
3283                 err = -EOPNOTSUPP;
3284                 goto out_session;
3285         }
3286
3287         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3288             session->s_state != CEPH_MDS_SESSION_HUNG) {
3289                 /*
3290                  * We cannot queue async requests since the caps and delegated
3291                  * inodes are bound to the session. Just return -EJUKEBOX and
3292                  * let the caller retry a sync request in that case.
3293                  */
3294                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3295                         err = -EJUKEBOX;
3296                         goto out_session;
3297                 }
3298
3299                 /*
3300                  * If the session has been REJECTED, then return a hard error,
3301                  * unless it's a CLEANRECOVER mount, in which case we'll queue
3302                  * it to the mdsc queue.
3303                  */
3304                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3305                         if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
3306                                 list_add(&req->r_wait, &mdsc->waiting_for_map);
3307                         else
3308                                 err = -EACCES;
3309                         goto out_session;
3310                 }
3311
3312                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
3313                     session->s_state == CEPH_MDS_SESSION_CLOSING) {
3314                         err = __open_session(mdsc, session);
3315                         if (err)
3316                                 goto out_session;
3317                         /* retry the same mds later */
3318                         if (random)
3319                                 req->r_resend_mds = mds;
3320                 }
3321                 list_add(&req->r_wait, &session->s_waiting);
3322                 goto out_session;
3323         }
3324
3325         /* send request */
3326         req->r_resend_mds = -1;   /* forget any previous mds hint */
3327
3328         if (req->r_request_started == 0)   /* note request start time */
3329                 req->r_request_started = jiffies;
3330
3331         /*
3332          * For async create we choose the auth MDS of the frag in the
3333          * parent directory to send the request to. Usually this works
3334          * fine, but if the directory is migrated to another MDS before
3335          * it could handle the request, the request will be forwarded.
3336          *
3337          * And then the auth cap will be changed.
3338          */
3339         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3340                 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3341                 struct ceph_inode_info *ci;
3342                 struct ceph_cap *cap;
3343
3344                 /*
3345                  * The request may be handled very quickly and the new
3346                  * inode may not have been linked to the dentry yet. We
3347                  * need to wait for ceph_finish_async_create(), which in
3348                  * theory shouldn't get stuck for long or fail, to finish
3349                  * before forwarding the request.
3350                  */
3351                 if (!d_inode(req->r_dentry)) {
3352                         err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3353                                           TASK_KILLABLE);
3354                         if (err) {
3355                                 mutex_lock(&req->r_fill_mutex);
3356                                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3357                                 mutex_unlock(&req->r_fill_mutex);
3358                                 goto out_session;
3359                         }
3360                 }
3361
3362                 ci = ceph_inode(d_inode(req->r_dentry));
3363
3364                 spin_lock(&ci->i_ceph_lock);
3365                 cap = ci->i_auth_cap;
3366                 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3367                         dout("do_request session changed for auth cap %d -> %d\n",
3368                              cap->session->s_mds, session->s_mds);
3369
3370                         /* Remove the auth cap from old session */
3371                         spin_lock(&cap->session->s_cap_lock);
3372                         cap->session->s_nr_caps--;
3373                         list_del_init(&cap->session_caps);
3374                         spin_unlock(&cap->session->s_cap_lock);
3375
3376                         /* Add the auth cap to the new session */
3377                         cap->mds = mds;
3378                         cap->session = session;
3379                         spin_lock(&session->s_cap_lock);
3380                         session->s_nr_caps++;
3381                         list_add_tail(&cap->session_caps, &session->s_caps);
3382                         spin_unlock(&session->s_cap_lock);
3383
3384                         change_auth_cap_ses(ci, session);
3385                 }
3386                 spin_unlock(&ci->i_ceph_lock);
3387         }
3388
3389         err = __send_request(session, req, false);
3390
3391 out_session:
3392         ceph_put_mds_session(session);
3393 finish:
3394         if (err) {
3395                 dout("__do_request early error %d\n", err);
3396                 req->r_err = err;
3397                 complete_request(mdsc, req);
3398                 __unregister_request(mdsc, req);
3399         }
3400         return;
3401 }
3402
3403 /*
3404  * called under mdsc->mutex
3405  */
3406 static void __wake_requests(struct ceph_mds_client *mdsc,
3407                             struct list_head *head)
3408 {
3409         struct ceph_mds_request *req;
3410         LIST_HEAD(tmp_list);
3411
3412         list_splice_init(head, &tmp_list);
3413
3414         while (!list_empty(&tmp_list)) {
3415                 req = list_entry(tmp_list.next,
3416                                  struct ceph_mds_request, r_wait);
3417                 list_del_init(&req->r_wait);
3418                 dout(" wake request %p tid %llu\n", req, req->r_tid);
3419                 __do_request(mdsc, req);
3420         }
3421 }
3422
3423 /*
3424  * Wake up threads with requests pending for @mds, so that they can
3425  * resubmit their requests to a possibly different mds.
3426  */
3427 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3428 {
3429         struct ceph_mds_request *req;
3430         struct rb_node *p = rb_first(&mdsc->request_tree);
3431
3432         dout("kick_requests mds%d\n", mds);
3433         while (p) {
3434                 req = rb_entry(p, struct ceph_mds_request, r_node);
3435                 p = rb_next(p);
3436                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3437                         continue;
3438                 if (req->r_attempts > 0)
3439                         continue; /* only new requests */
3440                 if (req->r_session &&
3441                     req->r_session->s_mds == mds) {
3442                         dout(" kicking tid %llu\n", req->r_tid);
3443                         list_del_init(&req->r_wait);
3444                         __do_request(mdsc, req);
3445                 }
3446         }
3447 }
3448
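/*
 * Take CAP_PIN references for the request's inodes, wait for any
 * pending async creates they depend on, then register the request and
 * take a first pass at sending it (under mdsc->mutex).
 */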
3449 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3450                               struct ceph_mds_request *req)
3451 {
3452         int err = 0;
3453
3454         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3455         if (req->r_inode)
3456                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3457         if (req->r_parent) {
3458                 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3459                 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3460                             CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3461                 spin_lock(&ci->i_ceph_lock);
3462                 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3463                 __ceph_touch_fmode(ci, mdsc, fmode);
3464                 spin_unlock(&ci->i_ceph_lock);
3465         }
3466         if (req->r_old_dentry_dir)
3467                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3468                                   CEPH_CAP_PIN);
3469
3470         if (req->r_inode) {
3471                 err = ceph_wait_on_async_create(req->r_inode);
3472                 if (err) {
3473                         dout("%s: wait for async create returned: %d\n",
3474                              __func__, err);
3475                         return err;
3476                 }
3477         }
3478
3479         if (!err && req->r_old_inode) {
3480                 err = ceph_wait_on_async_create(req->r_old_inode);
3481                 if (err) {
3482                         dout("%s: wait for async create returned: %d\n",
3483                              __func__, err);
3484                         return err;
3485                 }
3486         }
3487
3488         dout("submit_request on %p for inode %p\n", req, dir);
3489         mutex_lock(&mdsc->mutex);
3490         __register_request(mdsc, req, dir);
3491         __do_request(mdsc, req);
3492         err = req->r_err;
3493         mutex_unlock(&mdsc->mutex);
3494         return err;
3495 }
3496
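/*
 * Wait for a submitted request to complete (or to be aborted or time
 * out). Returns the MDS result if a real reply arrived, otherwise the
 * wait/abort error.
 */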
3497 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3498                            struct ceph_mds_request *req,
3499                            ceph_mds_request_wait_callback_t wait_func)
3500 {
3501         int err;
3502
3503         /* wait */
3504         dout("do_request waiting\n");
3505         if (wait_func) {
3506                 err = wait_func(mdsc, req);
3507         } else {
3508                 long timeleft = wait_for_completion_killable_timeout(
3509                                         &req->r_completion,
3510                                         ceph_timeout_jiffies(req->r_timeout));
3511                 if (timeleft > 0)
3512                         err = 0;
3513                 else if (!timeleft)
3514                         err = -ETIMEDOUT;  /* timed out */
3515                 else
3516                         err = timeleft;  /* killed */
3517         }
3518         dout("do_request waited, got %d\n", err);
3519         mutex_lock(&mdsc->mutex);
3520
3521         /* only abort if we didn't race with a real reply */
3522         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3523                 err = le32_to_cpu(req->r_reply_info.head->result);
3524         } else if (err < 0) {
3525                 dout("aborted request %lld with %d\n", req->r_tid, err);
3526
3527                 /*
3528                  * ensure we aren't running concurrently with
3529                  * ceph_fill_trace or ceph_readdir_prepopulate, which
3530                  * rely on locks (dir mutex) held by our caller.
3531                  */
3532                 mutex_lock(&req->r_fill_mutex);
3533                 req->r_err = err;
3534                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3535                 mutex_unlock(&req->r_fill_mutex);
3536
3537                 if (req->r_parent &&
3538                     (req->r_op & CEPH_MDS_OP_WRITE))
3539                         ceph_invalidate_dir_request(req);
3540         } else {
3541                 err = req->r_err;
3542         }
3543
3544         mutex_unlock(&mdsc->mutex);
3545         return err;
3546 }
3547
3548 /*
3549  * Synchronously perform an mds request.  Take care of all of the
3550  * session setup, forwarding, and retry details.
3551  */
3552 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3553                          struct inode *dir,
3554                          struct ceph_mds_request *req)
3555 {
3556         int err;
3557
3558         dout("do_request on %p\n", req);
3559
3560         /* issue */
3561         err = ceph_mdsc_submit_request(mdsc, dir, req);
3562         if (!err)
3563                 err = ceph_mdsc_wait_request(mdsc, req, NULL);
3564         dout("do_request %p done, result %d\n", req, err);
3565         return err;
3566 }
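
/*
 * Typical caller pattern (a sketch; see the dir/file operations for
 * real users): allocate a request with ceph_mdsc_create_request(),
 * fill in r_args and the inode/dentry fields, call
 * ceph_mdsc_do_request(), then drop the ref with ceph_mdsc_put_request().
 */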
3567
3568 /*
3569  * Invalidate dir's completeness and dentry lease state on an aborted
3570  * MDS namespace request.
3571  */
3572 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3573 {
3574         struct inode *dir = req->r_parent;
3575         struct inode *old_dir = req->r_old_dentry_dir;
3576
3577         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3578
3579         ceph_dir_clear_complete(dir);
3580         if (old_dir)
3581                 ceph_dir_clear_complete(old_dir);
3582         if (req->r_dentry)
3583                 ceph_invalidate_dentry_lease(req->r_dentry);
3584         if (req->r_old_dentry)
3585                 ceph_invalidate_dentry_lease(req->r_old_dentry);
3586 }
3587
3588 /*
3589  * Handle mds reply.
3590  *
3591  * We take the session mutex and parse and process the reply immediately.
3592  * This preserves the logical ordering of replies, capabilities, etc., sent
3593  * by the MDS as they are applied to our local cache.
3594  */
3595 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3596 {
3597         struct ceph_mds_client *mdsc = session->s_mdsc;
3598         struct ceph_mds_request *req;
3599         struct ceph_mds_reply_head *head = msg->front.iov_base;
3600         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3601         struct ceph_snap_realm *realm;
3602         u64 tid;
3603         int err, result;
3604         int mds = session->s_mds;
3605         bool close_sessions = false;
3606
3607         if (msg->front.iov_len < sizeof(*head)) {
3608                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3609                 ceph_msg_dump(msg);
3610                 return;
3611         }
3612
3613         /* get request, session */
3614         tid = le64_to_cpu(msg->hdr.tid);
3615         mutex_lock(&mdsc->mutex);
3616         req = lookup_get_request(mdsc, tid);
3617         if (!req) {
3618                 dout("handle_reply on unknown tid %llu\n", tid);
3619                 mutex_unlock(&mdsc->mutex);
3620                 return;
3621         }
3622         dout("handle_reply %p\n", req);
3623
3624         /* correct session? */
3625         if (req->r_session != session) {
3626                 pr_err("mdsc_handle_reply got %llu on session mds%d"
3627                        " not mds%d\n", tid, session->s_mds,
3628                        req->r_session ? req->r_session->s_mds : -1);
3629                 mutex_unlock(&mdsc->mutex);
3630                 goto out;
3631         }
3632
3633         /* dup? */
3634         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3635             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3636                 pr_warn("got a dup %s reply on %llu from mds%d\n",
3637                            head->safe ? "safe" : "unsafe", tid, mds);
3638                 mutex_unlock(&mdsc->mutex);
3639                 goto out;
3640         }
3641         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3642                 pr_warn("got unsafe after safe on %llu from mds%d\n",
3643                            tid, mds);
3644                 mutex_unlock(&mdsc->mutex);
3645                 goto out;
3646         }
3647
3648         result = le32_to_cpu(head->result);
3649
3650         if (head->safe) {
3651                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3652                 __unregister_request(mdsc, req);
3653
3654                 /* last request during umount? */
3655                 if (mdsc->stopping && !__get_oldest_req(mdsc))
3656                         complete_all(&mdsc->safe_umount_waiters);
3657
3658                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3659                         /*
3660                          * We already handled the unsafe response, now do the
3661                          * cleanup.  No need to examine the response; the MDS
3662                          * doesn't include any result info in the safe
3663                          * response.  And even if it did, there is nothing
3664                          * useful we could do with a revised return value.
3665                          */
3666                         dout("got safe reply %llu, mds%d\n", tid, mds);
3667
3668                         mutex_unlock(&mdsc->mutex);
3669                         goto out;
3670                 }
3671         } else {
3672                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3673                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3674         }
3675
3676         dout("handle_reply tid %lld result %d\n", tid, result);
3677         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3678                 err = parse_reply_info(session, msg, req, (u64)-1);
3679         else
3680                 err = parse_reply_info(session, msg, req,
3681                                        session->s_con.peer_features);
3682         mutex_unlock(&mdsc->mutex);
3683
3684         /* Must find target inode outside of mutexes to avoid deadlocks */
3685         rinfo = &req->r_reply_info;
3686         if ((err >= 0) && rinfo->head->is_target) {
3687                 struct inode *in = xchg(&req->r_new_inode, NULL);
3688                 struct ceph_vino tvino = {
3689                         .ino  = le64_to_cpu(rinfo->targeti.in->ino),
3690                         .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3691                 };
3692
3693                 /*
3694                  * If we ended up opening an existing inode, discard
3695                  * r_new_inode
3696                  */
3697                 if (req->r_op == CEPH_MDS_OP_CREATE &&
3698                     !req->r_reply_info.has_create_ino) {
3699                         /* This should never happen on an async create */
3700                         WARN_ON_ONCE(req->r_deleg_ino);
3701                         iput(in);
3702                         in = NULL;
3703                 }
3704
3705                 in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3706                 if (IS_ERR(in)) {
3707                         err = PTR_ERR(in);
3708                         mutex_lock(&session->s_mutex);
3709                         goto out_err;
3710                 }
3711                 req->r_target_inode = in;
3712         }
3713
3714         mutex_lock(&session->s_mutex);
3715         if (err < 0) {
3716                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3717                 ceph_msg_dump(msg);
3718                 goto out_err;
3719         }
3720
3721         /* snap trace */
3722         realm = NULL;
3723         if (rinfo->snapblob_len) {
3724                 down_write(&mdsc->snap_rwsem);
3725                 err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3726                                 rinfo->snapblob + rinfo->snapblob_len,
3727                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3728                                 &realm);
3729                 if (err) {
3730                         up_write(&mdsc->snap_rwsem);
3731                         close_sessions = true;
3732                         if (err == -EIO)
3733                                 ceph_msg_dump(msg);
3734                         goto out_err;
3735                 }
3736                 downgrade_write(&mdsc->snap_rwsem);
3737         } else {
3738                 down_read(&mdsc->snap_rwsem);
3739         }
3740
3741         /* insert trace into our cache */
3742         mutex_lock(&req->r_fill_mutex);
3743         current->journal_info = req;
3744         err = ceph_fill_trace(mdsc->fsc->sb, req);
3745         if (err == 0) {
3746                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3747                                     req->r_op == CEPH_MDS_OP_LSSNAP))
3748                         err = ceph_readdir_prepopulate(req, req->r_session);
3749         }
3750         current->journal_info = NULL;
3751         mutex_unlock(&req->r_fill_mutex);
3752
3753         up_read(&mdsc->snap_rwsem);
3754         if (realm)
3755                 ceph_put_snap_realm(mdsc, realm);
3756
3757         if (err == 0) {
3758                 if (req->r_target_inode &&
3759                     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3760                         struct ceph_inode_info *ci =
3761                                 ceph_inode(req->r_target_inode);
3762                         spin_lock(&ci->i_unsafe_lock);
3763                         list_add_tail(&req->r_unsafe_target_item,
3764                                       &ci->i_unsafe_iops);
3765                         spin_unlock(&ci->i_unsafe_lock);
3766                 }
3767
3768                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3769         }
3770 out_err:
3771         mutex_lock(&mdsc->mutex);
3772         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3773                 if (err) {
3774                         req->r_err = err;
3775                 } else {
3776                         req->r_reply = ceph_msg_get(msg);
3777                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3778                 }
3779         } else {
3780                 dout("reply arrived after request %lld was aborted\n", tid);
3781         }
3782         mutex_unlock(&mdsc->mutex);
3783
3784         mutex_unlock(&session->s_mutex);
3785
3786         /* kick calling process */
3787         complete_request(mdsc, req);
3788
3789         ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3790                                      req->r_end_latency, err);
3791 out:
3792         ceph_mdsc_put_request(req);
3793
3794         /* Defer closing the sessions after s_mutex lock being released */
3795         if (close_sessions)
3796                 ceph_mdsc_close_sessions(mdsc);
3797         return;
3798 }
3799
3800
3801
3802 /*
3803  * handle mds notification that our request has been forwarded.
3804  */
3805 static void handle_forward(struct ceph_mds_client *mdsc,
3806                            struct ceph_mds_session *session,
3807                            struct ceph_msg *msg)
3808 {
3809         struct ceph_mds_request *req;
3810         u64 tid = le64_to_cpu(msg->hdr.tid);
3811         u32 next_mds;
3812         u32 fwd_seq;
3813         int err = -EINVAL;
3814         void *p = msg->front.iov_base;
3815         void *end = p + msg->front.iov_len;
3816         bool aborted = false;
3817
3818         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3819         next_mds = ceph_decode_32(&p);
3820         fwd_seq = ceph_decode_32(&p);
3821
3822         mutex_lock(&mdsc->mutex);
3823         req = lookup_get_request(mdsc, tid);
3824         if (!req) {
3825                 mutex_unlock(&mdsc->mutex);
3826                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3827                 return;  /* dup reply? */
3828         }
3829
3830         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3831                 dout("forward tid %llu aborted, unregistering\n", tid);
3832                 __unregister_request(mdsc, req);
3833         } else if (fwd_seq <= req->r_num_fwd) {
3834                 /*
3835                  * The type of 'num_fwd' in ceph 'MClientRequestForward'
3836                  * is 'int32_t', while in 'ceph_mds_request_head' the
3837                  * type is '__u8'. So if the request bounces between
3838                  * MDSes more than 256 times, the client will get stuck.
3839                  *
3840                  * In this case it's usually a bug in the MDS and
3841                  * continuing to bounce the request makes no sense.
3842                  *
3843                  * In the future this could be fixed in the ceph code,
3844                  * so avoid hardcoding the limit here.
3845                  */
3846                 int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
3847                 max = 1 << (max * BITS_PER_BYTE);
3848                 if (req->r_num_fwd >= max) {
3849                         mutex_lock(&req->r_fill_mutex);
3850                         req->r_err = -EMULTIHOP;
3851                         set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3852                         mutex_unlock(&req->r_fill_mutex);
3853                         aborted = true;
3854                         pr_warn_ratelimited("forward tid %llu seq overflow\n",
3855                                             tid);
3856                 } else {
3857                         dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3858                              tid, next_mds, req->r_num_fwd, fwd_seq);
3859                 }
3860         } else {
3861                 /* resend. forward race not possible; mds would drop */
3862                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3863                 BUG_ON(req->r_err);
3864                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3865                 req->r_attempts = 0;
3866                 req->r_num_fwd = fwd_seq;
3867                 req->r_resend_mds = next_mds;
3868                 put_request_session(req);
3869                 __do_request(mdsc, req);
3870         }
3871         mutex_unlock(&mdsc->mutex);
3872
3873         /* kick calling process */
3874         if (aborted)
3875                 complete_request(mdsc, req);
3876         ceph_mdsc_put_request(req);
3877         return;
3878
3879 bad:
3880         pr_err("mdsc_handle_forward decode error err=%d\n", err);
3881         ceph_msg_dump(msg);
3882 }
3883
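/*
 * Decode the string->string metadata map from a session message, noting
 * whether the MDS's error_string indicates we have been blocklisted.
 */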
3884 static int __decode_session_metadata(void **p, void *end,
3885                                      bool *blocklisted)
3886 {
3887         /* map<string,string> */
3888         u32 n;
3889         bool err_str;
3890         ceph_decode_32_safe(p, end, n, bad);
3891         while (n-- > 0) {
3892                 u32 len;
3893                 ceph_decode_32_safe(p, end, len, bad);
3894                 ceph_decode_need(p, end, len, bad);
3895                 err_str = !strncmp(*p, "error_string", len);
3896                 *p += len;
3897                 ceph_decode_32_safe(p, end, len, bad);
3898                 ceph_decode_need(p, end, len, bad);
3899                 /*
3900                  * Match "blocklisted (blacklisted)" from newer MDSes,
3901                  * or "blacklisted" from older MDSes.
3902                  */
3903                 if (err_str && strnstr(*p, "blacklisted", len))
3904                         *blocklisted = true;
3905                 *p += len;
3906         }
3907         return 0;
3908 bad:
3909         return -1;
3910 }
3911
3912 /*
3913  * handle a mds session control message
3914  */
3915 static void handle_session(struct ceph_mds_session *session,
3916                            struct ceph_msg *msg)
3917 {
3918         struct ceph_mds_client *mdsc = session->s_mdsc;
3919         int mds = session->s_mds;
3920         int msg_version = le16_to_cpu(msg->hdr.version);
3921         void *p = msg->front.iov_base;
3922         void *end = p + msg->front.iov_len;
3923         struct ceph_mds_session_head *h;
3924         u32 op;
3925         u64 seq, features = 0;
3926         int wake = 0;
3927         bool blocklisted = false;
3928
3929         /* decode */
3930         ceph_decode_need(&p, end, sizeof(*h), bad);
3931         h = p;
3932         p += sizeof(*h);
3933
3934         op = le32_to_cpu(h->op);
3935         seq = le64_to_cpu(h->seq);
3936
3937         if (msg_version >= 3) {
3938                 u32 len;
3939                 /* for versions >= 2 and < 5, decode the metadata map;
3940                  * skip it otherwise, as it's conveyed via the flags.
3941                  */
3942                 if (msg_version >= 5)
3943                         ceph_decode_skip_map(&p, end, string, string, bad);
3944                 else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3945                         goto bad;
3946
3947                 /* version >= 3, feature bits */
3948                 ceph_decode_32_safe(&p, end, len, bad);
3949                 if (len) {
3950                         ceph_decode_64_safe(&p, end, features, bad);
3951                         p += len - sizeof(features);
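                        /* only the first 64 feature bits are decoded; skip the rest */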
3952                 }
3953         }
3954
3955         if (msg_version >= 5) {
3956                 u32 flags, len;
3957
3958                 /* version >= 4 */
3959                 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
3960                 ceph_decode_32_safe(&p, end, len, bad); /* len */
3961                 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
3962
3963                 /* version >= 5, flags   */
3964                 ceph_decode_32_safe(&p, end, flags, bad);
3965                 if (flags & CEPH_SESSION_BLOCKLISTED) {
3966                         pr_warn("mds%d session blocklisted\n", session->s_mds);
3967                         blocklisted = true;
3968                 }
3969         }
3970
3971         mutex_lock(&mdsc->mutex);
3972         if (op == CEPH_SESSION_CLOSE) {
3973                 ceph_get_mds_session(session);
3974                 __unregister_session(mdsc, session);
3975         }
3976         /* FIXME: this ttl calculation is generous */
3977         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3978         mutex_unlock(&mdsc->mutex);
3979
3980         mutex_lock(&session->s_mutex);
3981
3982         dout("handle_session mds%d %s %p state %s seq %llu\n",
3983              mds, ceph_session_op_name(op), session,
3984              ceph_session_state_name(session->s_state), seq);
3985
3986         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3987                 session->s_state = CEPH_MDS_SESSION_OPEN;
3988                 pr_info("mds%d came back\n", session->s_mds);
3989         }
3990
3991         switch (op) {
3992         case CEPH_SESSION_OPEN:
3993                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3994                         pr_info("mds%d reconnect success\n", session->s_mds);
3995
3996                 if (session->s_state == CEPH_MDS_SESSION_OPEN) {
3997                         pr_notice("mds%d is already opened\n", session->s_mds);
3998                 } else {
3999                         session->s_state = CEPH_MDS_SESSION_OPEN;
4000                         session->s_features = features;
4001                         renewed_caps(mdsc, session, 0);
4002                         if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4003                                      &session->s_features))
4004                                 metric_schedule_delayed(&mdsc->metric);
4005                 }
4006
4007                 /*
4008                  * The connection may have been broken and the session on
4009                  * the client side reinitialized, so we need to update the
4010                  * seq anyway.
4011                  */
4012                 if (!session->s_seq && seq)
4013                         session->s_seq = seq;
4014
4015                 wake = 1;
4016                 if (mdsc->stopping)
4017                         __close_session(mdsc, session);
4018                 break;
4019
4020         case CEPH_SESSION_RENEWCAPS:
4021                 if (session->s_renew_seq == seq)
4022                         renewed_caps(mdsc, session, 1);
4023                 break;
4024
4025         case CEPH_SESSION_CLOSE:
4026                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4027                         pr_info("mds%d reconnect denied\n", session->s_mds);
4028                 session->s_state = CEPH_MDS_SESSION_CLOSED;
4029                 cleanup_session_requests(mdsc, session);
4030                 remove_session_caps(session);
4031                 wake = 2; /* for good measure */
4032                 wake_up_all(&mdsc->session_close_wq);
4033                 break;
4034
4035         case CEPH_SESSION_STALE:
4036                 pr_info("mds%d caps went stale, renewing\n",
4037                         session->s_mds);
4038                 atomic_inc(&session->s_cap_gen);
4039                 session->s_cap_ttl = jiffies - 1;
4040                 send_renew_caps(mdsc, session);
4041                 break;
4042
4043         case CEPH_SESSION_RECALL_STATE:
4044                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
4045                 break;
4046
4047         case CEPH_SESSION_FLUSHMSG:
4048                 /* flush cap releases */
4049                 spin_lock(&session->s_cap_lock);
4050                 if (session->s_num_cap_releases)
4051                         ceph_flush_cap_releases(mdsc, session);
4052                 spin_unlock(&session->s_cap_lock);
4053
4054                 send_flushmsg_ack(mdsc, session, seq);
4055                 break;
4056
4057         case CEPH_SESSION_FORCE_RO:
4058                 dout("force_session_readonly %p\n", session);
4059                 spin_lock(&session->s_cap_lock);
4060                 session->s_readonly = true;
4061                 spin_unlock(&session->s_cap_lock);
4062                 wake_up_session_caps(session, FORCE_RO);
4063                 break;
4064
4065         case CEPH_SESSION_REJECT:
4066                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
4067                 pr_info("mds%d rejected session\n", session->s_mds);
4068                 session->s_state = CEPH_MDS_SESSION_REJECTED;
4069                 cleanup_session_requests(mdsc, session);
4070                 remove_session_caps(session);
4071                 if (blocklisted)
4072                         mdsc->fsc->blocklisted = true;
4073                 wake = 2; /* for good measure */
4074                 break;
4075
4076         default:
4077                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
4078                 WARN_ON(1);
4079         }
4080
4081         mutex_unlock(&session->s_mutex);
4082         if (wake) {
4083                 mutex_lock(&mdsc->mutex);
4084                 __wake_requests(mdsc, &session->s_waiting);
4085                 if (wake == 2)
4086                         kick_requests(mdsc, mds);
4087                 mutex_unlock(&mdsc->mutex);
4088         }
4089         if (op == CEPH_SESSION_CLOSE)
4090                 ceph_put_mds_session(session);
4091         return;
4092
4093 bad:
4094         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
4095                (int)msg->front.iov_len);
4096         ceph_msg_dump(msg);
4097         return;
4098 }
4099
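/*
 * Drop the directory cap references held on r_parent (taken e.g. for
 * async dirops). The _no_check variant below releases the refs without
 * re-checking whether caps should now be returned to the MDS.
 */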
4100 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4101 {
4102         int dcaps;
4103
4104         dcaps = xchg(&req->r_dir_caps, 0);
4105         if (dcaps) {
4106                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4107                 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4108         }
4109 }
4110
4111 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
4112 {
4113         int dcaps;
4114
4115         dcaps = xchg(&req->r_dir_caps, 0);
4116         if (dcaps) {
4117                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4118                 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
4119                                                 dcaps);
4120         }
4121 }
4122
4123 /*
4124  * called under session->mutex.
4125  */
4126 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4127                                    struct ceph_mds_session *session)
4128 {
4129         struct ceph_mds_request *req, *nreq;
4130         struct rb_node *p;
4131
4132         dout("replay_unsafe_requests mds%d\n", session->s_mds);
4133
4134         mutex_lock(&mdsc->mutex);
4135         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4136                 __send_request(session, req, true);
4137
4138         /*
4139          * Also re-send old requests when the MDS enters the reconnect stage,
4140          * so that it can process completed requests in the clientreplay stage.
4141          */
4142         p = rb_first(&mdsc->request_tree);
4143         while (p) {
4144                 req = rb_entry(p, struct ceph_mds_request, r_node);
4145                 p = rb_next(p);
4146                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4147                         continue;
4148                 if (req->r_attempts == 0)
4149                         continue; /* only old requests */
4150                 if (!req->r_session)
4151                         continue;
4152                 if (req->r_session->s_mds != session->s_mds)
4153                         continue;
4154
4155                 ceph_mdsc_release_dir_caps_no_check(req);
4156
4157                 __send_request(session, req, true);
4158         }
4159         mutex_unlock(&mdsc->mutex);
4160 }
4161
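/*
 * Flush out a partially-built reconnect message so that encoding can
 * continue into a fresh pagelist, keeping each message's size bounded
 * (see RECONNECT_MAX_SIZE). Only possible when the MDS allows multiple
 * reconnect messages.
 */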
4162 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4163 {
4164         struct ceph_msg *reply;
4165         struct ceph_pagelist *_pagelist;
4166         struct page *page;
4167         __le32 *addr;
4168         int err = -ENOMEM;
4169
4170         if (!recon_state->allow_multi)
4171                 return -ENOSPC;
4172
4173         /* can't handle a message that contains both caps and realms */
4174         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4175
4176         /* pre-allocate new pagelist */
4177         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
4178         if (!_pagelist)
4179                 return -ENOMEM;
4180
4181         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4182         if (!reply)
4183                 goto fail_msg;
4184
4185         /* placeholder for nr_caps */
4186         err = ceph_pagelist_encode_32(_pagelist, 0);
4187         if (err < 0)
4188                 goto fail;
4189
4190         if (recon_state->nr_caps) {
4191                 /* currently encoding caps */
4192                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4193                 if (err)
4194                         goto fail;
4195         } else {
4196                 /* placeholder for nr_realms (currently encoding realms) */
4197                 err = ceph_pagelist_encode_32(_pagelist, 0);
4198                 if (err < 0)
4199                         goto fail;
4200         }
4201
4202         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4203         if (err)
4204                 goto fail;
4205
4206         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4207         addr = kmap_atomic(page);
4208         if (recon_state->nr_caps) {
4209                 /* currently encoding caps */
4210                 *addr = cpu_to_le32(recon_state->nr_caps);
4211         } else {
4212                 /* currently encoding realms */
4213                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4214         }
4215         kunmap_atomic(addr);
4216
4217         reply->hdr.version = cpu_to_le16(5);
4218         reply->hdr.compat_version = cpu_to_le16(4);
4219
4220         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4221         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4222
4223         ceph_con_send(&recon_state->session->s_con, reply);
4224         ceph_pagelist_release(recon_state->pagelist);
4225
4226         recon_state->pagelist = _pagelist;
4227         recon_state->nr_caps = 0;
4228         recon_state->nr_realms = 0;
4229         recon_state->msg_version = 5;
4230         return 0;
4231 fail:
4232         ceph_msg_put(reply);
4233 fail_msg:
4234         ceph_pagelist_release(_pagelist);
4235         return err;
4236 }
4237
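/*
 * Find the primary dentry alias for @inode: for a directory, its sole
 * non-root alias; otherwise, a hashed alias carrying the
 * CEPH_DENTRY_PRIMARY_LINK flag. Returns a referenced dentry or NULL.
 */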
4238 static struct dentry* d_find_primary(struct inode *inode)
4239 {
4240         struct dentry *alias, *dn = NULL;
4241
4242         if (hlist_empty(&inode->i_dentry))
4243                 return NULL;
4244
4245         spin_lock(&inode->i_lock);
4246         if (hlist_empty(&inode->i_dentry))
4247                 goto out_unlock;
4248
4249         if (S_ISDIR(inode->i_mode)) {
4250                 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4251                 if (!IS_ROOT(alias))
4252                         dn = dget(alias);
4253                 goto out_unlock;
4254         }
4255
4256         hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4257                 spin_lock(&alias->d_lock);
4258                 if (!d_unhashed(alias) &&
4259                     (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4260                         dn = dget_dlock(alias);
4261                 }
4262                 spin_unlock(&alias->d_lock);
4263                 if (dn)
4264                         break;
4265         }
4266 out_unlock:
4267         spin_unlock(&inode->i_lock);
4268         return dn;
4269 }
4270
4271 /*
4272  * Encode information about a cap for a reconnect with the MDS.
4273  */
4274 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4275 {
4276         union {
4277                 struct ceph_mds_cap_reconnect v2;
4278                 struct ceph_mds_cap_reconnect_v1 v1;
4279         } rec;
4280         struct ceph_inode_info *ci = ceph_inode(inode);
4281         struct ceph_reconnect_state *recon_state = arg;
4282         struct ceph_pagelist *pagelist = recon_state->pagelist;
4283         struct dentry *dentry;
4284         struct ceph_cap *cap;
4285         char *path;
4286         int pathlen = 0, err;
4287         u64 pathbase;
4288         u64 snap_follows;
4289
4290         dentry = d_find_primary(inode);
4291         if (dentry) {
4292                 /* set pathbase to parent dir when msg_version >= 2 */
4293                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
4294                                             recon_state->msg_version >= 2);
4295                 dput(dentry);
4296                 if (IS_ERR(path)) {
4297                         err = PTR_ERR(path);
4298                         goto out_err;
4299                 }
4300         } else {
4301                 path = NULL;
4302                 pathbase = 0;
4303         }
4304
4305         spin_lock(&ci->i_ceph_lock);
4306         cap = __get_cap_for_mds(ci, mds);
4307         if (!cap) {
4308                 spin_unlock(&ci->i_ceph_lock);
4309                 err = 0;
4310                 goto out_err;
4311         }
4312         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
4313              inode, ceph_vinop(inode), cap, cap->cap_id,
4314              ceph_cap_string(cap->issued));
4315
4316         cap->seq = 0;        /* reset cap seq */
4317         cap->issue_seq = 0;  /* and issue_seq */
4318         cap->mseq = 0;       /* and migrate_seq */
4319         cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4320
4321         /* These are lost when the session goes away */
4322         if (S_ISDIR(inode->i_mode)) {
4323                 if (cap->issued & CEPH_CAP_DIR_CREATE) {
4324                         ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4325                         memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4326                 }
4327                 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4328         }
4329
4330         if (recon_state->msg_version >= 2) {
4331                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4332                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4333                 rec.v2.issued = cpu_to_le32(cap->issued);
4334                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4335                 rec.v2.pathbase = cpu_to_le64(pathbase);
4336                 rec.v2.flock_len = (__force __le32)
4337                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4338         } else {
4339                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4340                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4341                 rec.v1.issued = cpu_to_le32(cap->issued);
4342                 rec.v1.size = cpu_to_le64(i_size_read(inode));
4343                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
4344                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
4345                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4346                 rec.v1.pathbase = cpu_to_le64(pathbase);
4347         }
4348
4349         if (list_empty(&ci->i_cap_snaps)) {
4350                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4351         } else {
4352                 struct ceph_cap_snap *capsnap =
4353                         list_first_entry(&ci->i_cap_snaps,
4354                                          struct ceph_cap_snap, ci_item);
4355                 snap_follows = capsnap->follows;
4356         }
4357         spin_unlock(&ci->i_ceph_lock);
4358
4359         if (recon_state->msg_version >= 2) {
4360                 int num_fcntl_locks, num_flock_locks;
4361                 struct ceph_filelock *flocks = NULL;
4362                 size_t struct_len, total_len = sizeof(u64);
4363                 u8 struct_v = 0;
4364
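                 /*
                  * The set of locks can change between ceph_count_locks()
                  * and ceph_encode_locks_to_buffer(); on -ENOSPC we recount
                  * and retry.
                  */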
4365 encode_again:
4366                 if (rec.v2.flock_len) {
4367                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4368                 } else {
4369                         num_fcntl_locks = 0;
4370                         num_flock_locks = 0;
4371                 }
4372                 if (num_fcntl_locks + num_flock_locks > 0) {
4373                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4374                                                sizeof(struct ceph_filelock),
4375                                                GFP_NOFS);
4376                         if (!flocks) {
4377                                 err = -ENOMEM;
4378                                 goto out_err;
4379                         }
4380                         err = ceph_encode_locks_to_buffer(inode, flocks,
4381                                                           num_fcntl_locks,
4382                                                           num_flock_locks);
4383                         if (err) {
4384                                 kfree(flocks);
4385                                 flocks = NULL;
4386                                 if (err == -ENOSPC)
4387                                         goto encode_again;
4388                                 goto out_err;
4389                         }
4390                 } else {
4391                         kfree(flocks);
4392                         flocks = NULL;
4393                 }
4394
4395                 if (recon_state->msg_version >= 3) {
4396                         /* version, compat_version and struct_len */
4397                         total_len += 2 * sizeof(u8) + sizeof(u32);
4398                         struct_v = 2;
4399                 }
4400                 /*
4401                  * number of encoded locks is stable, so copy to pagelist
4402                  */
4403                 struct_len = 2 * sizeof(u32) +
4404                             (num_fcntl_locks + num_flock_locks) *
4405                             sizeof(struct ceph_filelock);
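                 /* flock_len is reused to carry the byte length of the locks blob */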
4406                 rec.v2.flock_len = cpu_to_le32(struct_len);
4407
4408                 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
4409
4410                 if (struct_v >= 2)
4411                         struct_len += sizeof(u64); /* snap_follows */
4412
4413                 total_len += struct_len;
4414
4415                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4416                         err = send_reconnect_partial(recon_state);
4417                         if (err)
4418                                 goto out_freeflocks;
4419                         pagelist = recon_state->pagelist;
4420                 }
4421
4422                 err = ceph_pagelist_reserve(pagelist, total_len);
4423                 if (err)
4424                         goto out_freeflocks;
4425
4426                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4427                 if (recon_state->msg_version >= 3) {
4428                         ceph_pagelist_encode_8(pagelist, struct_v);
4429                         ceph_pagelist_encode_8(pagelist, 1);
4430                         ceph_pagelist_encode_32(pagelist, struct_len);
4431                 }
4432                 ceph_pagelist_encode_string(pagelist, path, pathlen);
4433                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4434                 ceph_locks_to_pagelist(flocks, pagelist,
4435                                        num_fcntl_locks, num_flock_locks);
4436                 if (struct_v >= 2)
4437                         ceph_pagelist_encode_64(pagelist, snap_follows);
4438 out_freeflocks:
4439                 kfree(flocks);
4440         } else {
4441                 err = ceph_pagelist_reserve(pagelist,
4442                                             sizeof(u64) + sizeof(u32) +
4443                                             pathlen + sizeof(rec.v1));
4444                 if (err)
4445                         goto out_err;
4446
4447                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4448                 ceph_pagelist_encode_string(pagelist, path, pathlen);
4449                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4450         }
4451
4452 out_err:
4453         ceph_mdsc_free_path(path, pathlen);
4454         if (!err)
4455                 recon_state->nr_caps++;
4456         return err;
4457 }
4458
4459 static int encode_snap_realms(struct ceph_mds_client *mdsc,
4460                               struct ceph_reconnect_state *recon_state)
4461 {
4462         struct rb_node *p;
4463         struct ceph_pagelist *pagelist = recon_state->pagelist;
4464         int err = 0;
4465
4466         if (recon_state->msg_version >= 4) {
4467                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4468                 if (err < 0)
4469                         goto fail;
4470         }
4471
4472         /*
4473          * snaprealms.  we provide mds with the ino, seq (version), and
4474          * parent for all of our realms.  If the mds has any newer info,
4475          * it will tell us.
4476          */
4477         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4478                 struct ceph_snap_realm *realm =
4479                        rb_entry(p, struct ceph_snap_realm, node);
4480                 struct ceph_mds_snaprealm_reconnect sr_rec;
4481
4482                 if (recon_state->msg_version >= 4) {
4483                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
4484                                       sizeof(sr_rec);
4485
4486                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4487                                 err = send_reconnect_partial(recon_state);
4488                                 if (err)
4489                                         goto fail;
4490                                 pagelist = recon_state->pagelist;
4491                         }
4492
4493                         err = ceph_pagelist_reserve(pagelist, need);
4494                         if (err)
4495                                 goto fail;
4496
4497                         ceph_pagelist_encode_8(pagelist, 1);
4498                         ceph_pagelist_encode_8(pagelist, 1);
4499                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4500                 }
4501
4502                 dout(" adding snap realm %llx seq %lld parent %llx\n",
4503                      realm->ino, realm->seq, realm->parent_ino);
4504                 sr_rec.ino = cpu_to_le64(realm->ino);
4505                 sr_rec.seq = cpu_to_le64(realm->seq);
4506                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
4507
4508                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4509                 if (err)
4510                         goto fail;
4511
4512                 recon_state->nr_realms++;
4513         }
4514 fail:
4515         return err;
4516 }
4517
4518
4519 /*
4520  * If an MDS fails and recovers, clients need to reconnect in order to
4521  * reestablish shared state.  This includes all caps issued through
4522  * this session _and_ the snap_realm hierarchy.  Because it's not
4523  * clear which snap realms the mds cares about, we send everything we
4524  * know about.. that ensures we'll then get any new info the
4525  * recovering MDS might have.
4526  *
4527  * This is a relatively heavyweight operation, but it's rare.
4528  */
4529 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4530                                struct ceph_mds_session *session)
4531 {
4532         struct ceph_msg *reply;
4533         int mds = session->s_mds;
4534         int err = -ENOMEM;
4535         struct ceph_reconnect_state recon_state = {
4536                 .session = session,
4537         };
4538         LIST_HEAD(dispose);
4539
4540         pr_info("mds%d reconnect start\n", mds);
4541
4542         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4543         if (!recon_state.pagelist)
4544                 goto fail_nopagelist;
4545
4546         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4547         if (!reply)
4548                 goto fail_nomsg;
4549
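             /* forget inode numbers the MDS delegated to us for async creates */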
4550         xa_destroy(&session->s_delegated_inos);
4551
4552         mutex_lock(&session->s_mutex);
4553         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4554         session->s_seq = 0;
4555
4556         dout("session %p state %s\n", session,
4557              ceph_session_state_name(session->s_state));
4558
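             /* bump cap_gen so caps and leases from before the reconnect go stale */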
4559         atomic_inc(&session->s_cap_gen);
4560
4561         spin_lock(&session->s_cap_lock);
4562         /* don't know if session is readonly */
4563         session->s_readonly = 0;
4564         /*
4565          * notify __ceph_remove_cap() that we are composing cap reconnect.
4566          * If a cap gets released before being added to the cap reconnect,
4567          * __ceph_remove_cap() should skip queuing cap release.
4568          */
4569         session->s_cap_reconnect = 1;
4570         /* drop old cap expires; we're about to reestablish that state */
4571         detach_cap_releases(session, &dispose);
4572         spin_unlock(&session->s_cap_lock);
4573         dispose_cap_releases(mdsc, &dispose);
4574
4575         /* trim unused caps to reduce MDS's cache rejoin time */
4576         if (mdsc->fsc->sb->s_root)
4577                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4578
4579         ceph_con_close(&session->s_con);
4580         ceph_con_open(&session->s_con,
4581                       CEPH_ENTITY_TYPE_MDS, mds,
4582                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4583
4584         /* replay unsafe requests */
4585         replay_unsafe_requests(mdsc, session);
4586
4587         ceph_early_kick_flushing_caps(mdsc, session);
4588
4589         down_read(&mdsc->snap_rwsem);
4590
4591         /* placeholder for nr_caps */
4592         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4593         if (err)
4594                 goto fail;
4595
4596         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4597                 recon_state.msg_version = 3;
4598                 recon_state.allow_multi = true;
4599         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4600                 recon_state.msg_version = 3;
4601         } else {
4602                 recon_state.msg_version = 2;
4603         }
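             /*
              * msg_version 3 adds per-cap struct versioning; the
              * MULTI_RECONNECT feature additionally allows splitting the
              * reconnect into multiple messages.
              */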
4604         /* traverse this session's caps */
4605         err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4606
4607         spin_lock(&session->s_cap_lock);
4608         session->s_cap_reconnect = 0;
4609         spin_unlock(&session->s_cap_lock);
4610
4611         if (err < 0)
4612                 goto fail;
4613
4614         /* check if all realms can be encoded into current message */
4615         if (mdsc->num_snap_realms) {
4616                 size_t total_len =
4617                         recon_state.pagelist->length +
4618                         mdsc->num_snap_realms *
4619                         sizeof(struct ceph_mds_snaprealm_reconnect);
4620                 if (recon_state.msg_version >= 4) {
4621                         /* number of realms */
4622                         total_len += sizeof(u32);
4623                         /* version, compat_version and struct_len */
4624                         total_len += mdsc->num_snap_realms *
4625                                      (2 * sizeof(u8) + sizeof(u32));
4626                 }
4627                 if (total_len > RECONNECT_MAX_SIZE) {
4628                         if (!recon_state.allow_multi) {
4629                                 err = -ENOSPC;
4630                                 goto fail;
4631                         }
4632                         if (recon_state.nr_caps) {
4633                                 err = send_reconnect_partial(&recon_state);
4634                                 if (err)
4635                                         goto fail;
4636                         }
4637                         recon_state.msg_version = 5;
4638                 }
4639         }
4640
4641         err = encode_snap_realms(mdsc, &recon_state);
4642         if (err < 0)
4643                 goto fail;
4644
4645         if (recon_state.msg_version >= 5) {
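                 /* v5 continuation flag: 0 means this is the final message */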
4646                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4647                 if (err < 0)
4648                         goto fail;
4649         }
4650
4651         if (recon_state.nr_caps || recon_state.nr_realms) {
4652                 struct page *page =
4653                         list_first_entry(&recon_state.pagelist->head,
4654                                         struct page, lru);
4655                 __le32 *addr = kmap_atomic(page);
4656                 if (recon_state.nr_caps) {
4657                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4658                         *addr = cpu_to_le32(recon_state.nr_caps);
4659                 } else if (recon_state.msg_version >= 4) {
4660                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4661                 }
4662                 kunmap_atomic(addr);
4663         }
4664
4665         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4666         if (recon_state.msg_version >= 4)
4667                 reply->hdr.compat_version = cpu_to_le16(4);
4668
4669         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4670         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4671
4672         ceph_con_send(&session->s_con, reply);
4673
4674         mutex_unlock(&session->s_mutex);
4675
4676         mutex_lock(&mdsc->mutex);
4677         __wake_requests(mdsc, &session->s_waiting);
4678         mutex_unlock(&mdsc->mutex);
4679
4680         up_read(&mdsc->snap_rwsem);
4681         ceph_pagelist_release(recon_state.pagelist);
4682         return;
4683
4684 fail:
4685         ceph_msg_put(reply);
4686         up_read(&mdsc->snap_rwsem);
4687         mutex_unlock(&session->s_mutex);
4688 fail_nomsg:
4689         ceph_pagelist_release(recon_state.pagelist);
4690 fail_nopagelist:
4691         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4692         return;
4693 }
4694
4695
4696 /*
4697  * compare old and new mdsmaps, kicking requests
4698  * and closing out old connections as necessary
4699  *
4700  * called under mdsc->mutex.
4701  */
4702 static void check_new_map(struct ceph_mds_client *mdsc,
4703                           struct ceph_mdsmap *newmap,
4704                           struct ceph_mdsmap *oldmap)
4705 {
4706         int i, j, err;
4707         int oldstate, newstate;
4708         struct ceph_mds_session *s;
4709         unsigned long targets[BITS_TO_LONGS(CEPH_MAX_MDS)] = {0};
4710
4711         dout("check_new_map new %u old %u\n",
4712              newmap->m_epoch, oldmap->m_epoch);
4713
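             /* gather the export targets of every rank in the new map */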
4714         if (newmap->m_info) {
4715                 for (i = 0; i < newmap->possible_max_rank; i++) {
4716                         for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4717                                 set_bit(newmap->m_info[i].export_targets[j], targets);
4718                 }
4719         }
4720
4721         for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4722                 if (!mdsc->sessions[i])
4723                         continue;
4724                 s = mdsc->sessions[i];
4725                 oldstate = ceph_mdsmap_get_state(oldmap, i);
4726                 newstate = ceph_mdsmap_get_state(newmap, i);
4727
4728                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4729                      i, ceph_mds_state_name(oldstate),
4730                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4731                      ceph_mds_state_name(newstate),
4732                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4733                      ceph_session_state_name(s->s_state));
4734
4735                 if (i >= newmap->possible_max_rank) {
4736                         /* force close session for stopped mds */
4737                         ceph_get_mds_session(s);
4738                         __unregister_session(mdsc, s);
4739                         __wake_requests(mdsc, &s->s_waiting);
4740                         mutex_unlock(&mdsc->mutex);
4741
4742                         mutex_lock(&s->s_mutex);
4743                         cleanup_session_requests(mdsc, s);
4744                         remove_session_caps(s);
4745                         mutex_unlock(&s->s_mutex);
4746
4747                         ceph_put_mds_session(s);
4748
4749                         mutex_lock(&mdsc->mutex);
4750                         kick_requests(mdsc, i);
4751                         continue;
4752                 }
4753
4754                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4755                            ceph_mdsmap_get_addr(newmap, i),
4756                            sizeof(struct ceph_entity_addr))) {
4757                         /* just close it */
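                             /* lock order: s_mutex must be taken before mdsc->mutex */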
4758                         mutex_unlock(&mdsc->mutex);
4759                         mutex_lock(&s->s_mutex);
4760                         mutex_lock(&mdsc->mutex);
4761                         ceph_con_close(&s->s_con);
4762                         mutex_unlock(&s->s_mutex);
4763                         s->s_state = CEPH_MDS_SESSION_RESTARTING;
4764                 } else if (oldstate == newstate) {
4765                         continue;  /* nothing new with this mds */
4766                 }
4767
4768                 /*
4769                  * send reconnect?
4770                  */
4771                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4772                     newstate >= CEPH_MDS_STATE_RECONNECT) {
4773                         mutex_unlock(&mdsc->mutex);
4774                         clear_bit(i, targets);
4775                         send_mds_reconnect(mdsc, s);
4776                         mutex_lock(&mdsc->mutex);
4777                 }
4778
4779                 /*
4780                  * kick requests on any mds that has gone active.
4781                  */
4782                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4783                     newstate >= CEPH_MDS_STATE_ACTIVE) {
4784                         if (oldstate != CEPH_MDS_STATE_CREATING &&
4785                             oldstate != CEPH_MDS_STATE_STARTING)
4786                                 pr_info("mds%d recovery completed\n", s->s_mds);
4787                         kick_requests(mdsc, i);
4788                         mutex_unlock(&mdsc->mutex);
4789                         mutex_lock(&s->s_mutex);
4790                         mutex_lock(&mdsc->mutex);
4791                         ceph_kick_flushing_caps(mdsc, s);
4792                         mutex_unlock(&s->s_mutex);
4793                         wake_up_session_caps(s, RECONNECT);
4794                 }
4795         }
4796
4797         /*
4798          * Only open and reconnect sessions that don't exist yet.
4799          */
4800         for (i = 0; i < newmap->possible_max_rank; i++) {
4801                 /*
4802                  * If the importing MDS crashed just after flushing the
4803                  * EImportStart journal entry, then when a standby MDS
4804                  * takes over and replays that journal, the new MDS
4805                  * daemon will wait for the client to reconnect, but
4806                  * the client may never have registered/opened a
4807                  * session with it.
4808                  *
4809                  * Try to reconnect to such an MDS daemon if its rank
4810                  * number is in the export targets array and it is in
4811                  * the up:reconnect state.
4812                  */
4813                 newstate = ceph_mdsmap_get_state(newmap, i);
4814                 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4815                         continue;
4816
4817                 /*
4818                  * In rare cases the session may already have been
4819                  * registered and opened by requests that picked a
4820                  * random MDS during the mdsc->mutex unlock/lock gap
4821                  * below.  The MDS daemon will just queue those requests
4822                  * and keep waiting for the client's reconnect request
4823                  * while in the up:reconnect state.
4824                  */
4825                 s = __ceph_lookup_mds_session(mdsc, i);
4826                 if (likely(!s)) {
4827                         s = __open_export_target_session(mdsc, i);
4828                         if (IS_ERR(s)) {
4829                                 err = PTR_ERR(s);
4830                                 pr_err("failed to open export target session, err %d\n",
4831                                        err);
4832                                 continue;
4833                         }
4834                 }
4835                 dout("send reconnect to export target mds.%d\n", i);
4836                 mutex_unlock(&mdsc->mutex);
4837                 send_mds_reconnect(mdsc, s);
4838                 ceph_put_mds_session(s);
4839                 mutex_lock(&mdsc->mutex);
4840         }
4841
4842         for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4843                 s = mdsc->sessions[i];
4844                 if (!s)
4845                         continue;
4846                 if (!ceph_mdsmap_is_laggy(newmap, i))
4847                         continue;
4848                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4849                     s->s_state == CEPH_MDS_SESSION_HUNG ||
4850                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
4851                         dout(" connecting to export targets of laggy mds%d\n",
4852                              i);
4853                         __open_export_target_sessions(mdsc, s);
4854                 }
4855         }
4856 }
4857
4858
4859
4860 /*
4861  * leases
4862  */
4863
4864 /*
4865  * caller must hold session s_mutex, dentry->d_lock
4866  */
4867 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4868 {
4869         struct ceph_dentry_info *di = ceph_dentry(dentry);
4870
4871         ceph_put_mds_session(di->lease_session);
4872         di->lease_session = NULL;
4873 }
4874
4875 static void handle_lease(struct ceph_mds_client *mdsc,
4876                          struct ceph_mds_session *session,
4877                          struct ceph_msg *msg)
4878 {
4879         struct super_block *sb = mdsc->fsc->sb;
4880         struct inode *inode;
4881         struct dentry *parent, *dentry;
4882         struct ceph_dentry_info *di;
4883         int mds = session->s_mds;
4884         struct ceph_mds_lease *h = msg->front.iov_base;
4885         u32 seq;
4886         struct ceph_vino vino;
4887         struct qstr dname;
4888         int release = 0;
4889
4890         dout("handle_lease from mds%d\n", mds);
4891
4892         if (!ceph_inc_mds_stopping_blocker(mdsc, session))
4893                 return;
4894
4895         /* decode */
4896         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4897                 goto bad;
4898         vino.ino = le64_to_cpu(h->ino);
4899         vino.snap = CEPH_NOSNAP;
4900         seq = le32_to_cpu(h->seq);
4901         dname.len = get_unaligned_le32(h + 1);
4902         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4903                 goto bad;
4904         dname.name = (void *)(h + 1) + sizeof(u32);
4905
4906         /* lookup inode */
4907         inode = ceph_find_inode(sb, vino);
4908         dout("handle_lease %s, ino %llx %p %.*s\n",
4909              ceph_lease_op_name(h->action), vino.ino, inode,
4910              dname.len, dname.name);
4911
4912         mutex_lock(&session->s_mutex);
4913         if (!inode) {
4914                 dout("handle_lease no inode %llx\n", vino.ino);
4915                 goto release;
4916         }
4917
4918         /* dentry */
4919         parent = d_find_alias(inode);
4920         if (!parent) {
4921                 dout("no parent dentry on inode %p\n", inode);
4922                 WARN_ON(1);
4923                 goto release;  /* hrm... */
4924         }
4925         dname.hash = full_name_hash(parent, dname.name, dname.len);
4926         dentry = d_lookup(parent, &dname);
4927         dput(parent);
4928         if (!dentry)
4929                 goto release;
4930
4931         spin_lock(&dentry->d_lock);
4932         di = ceph_dentry(dentry);
4933         switch (h->action) {
4934         case CEPH_MDS_LEASE_REVOKE:
4935                 if (di->lease_session == session) {
4936                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4937                                 h->seq = cpu_to_le32(di->lease_seq);
4938                         __ceph_mdsc_drop_dentry_lease(dentry);
4939                 }
4940                 release = 1;
4941                 break;
4942
4943         case CEPH_MDS_LEASE_RENEW:
4944                 if (di->lease_session == session &&
4945                     di->lease_gen == atomic_read(&session->s_cap_gen) &&
4946                     di->lease_renew_from &&
4947                     di->lease_renew_after == 0) {
4948                         unsigned long duration =
4949                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4950
4951                         di->lease_seq = seq;
4952                         di->time = di->lease_renew_from + duration;
4953                         di->lease_renew_after = di->lease_renew_from +
4954                                 (duration >> 1);
4955                         di->lease_renew_from = 0;
4956                 }
4957                 break;
4958         }
4959         spin_unlock(&dentry->d_lock);
4960         dput(dentry);
4961
4962         if (!release)
4963                 goto out;
4964
4965 release:
4966         /* let's just reuse the same message */
4967         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4968         ceph_msg_get(msg);
4969         ceph_con_send(&session->s_con, msg);
4970
4971 out:
4972         mutex_unlock(&session->s_mutex);
4973         iput(inode);
4974
4975         ceph_dec_mds_stopping_blocker(mdsc);
4976         return;
4977
4978 bad:
4979         ceph_dec_mds_stopping_blocker(mdsc);
4980
4981         pr_err("corrupt lease message\n");
4982         ceph_msg_dump(msg);
4983 }
4984
4985 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4986                               struct dentry *dentry, char action,
4987                               u32 seq)
4988 {
4989         struct ceph_msg *msg;
4990         struct ceph_mds_lease *lease;
4991         struct inode *dir;
4992         int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4993
4994         dout("lease_send_msg dentry %p %s to mds%d\n",
4995              dentry, ceph_lease_op_name(action), session->s_mds);
4996
4997         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4998         if (!msg)
4999                 return;
5000         lease = msg->front.iov_base;
5001         lease->action = action;
5002         lease->seq = cpu_to_le32(seq);
5003
5004         spin_lock(&dentry->d_lock);
5005         dir = d_inode(dentry->d_parent);
5006         lease->ino = cpu_to_le64(ceph_ino(dir));
5007         lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
5008
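             /* the dentry name follows the lease struct as a length-prefixed string */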
5009         put_unaligned_le32(dentry->d_name.len, lease + 1);
5010         memcpy((void *)(lease + 1) + 4,
5011                dentry->d_name.name, dentry->d_name.len);
5012         spin_unlock(&dentry->d_lock);
5013
5014         ceph_con_send(&session->s_con, msg);
5015 }
5016
5017 /*
5018  * lock, then unlock, the session to wait for ongoing session activity to drain
5019  */
5020 static void lock_unlock_session(struct ceph_mds_session *s)
5021 {
5022         mutex_lock(&s->s_mutex);
5023         mutex_unlock(&s->s_mutex);
5024 }
5025
5026 static void maybe_recover_session(struct ceph_mds_client *mdsc)
5027 {
5028         struct ceph_fs_client *fsc = mdsc->fsc;
5029
5030         if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5031                 return;
5032
5033         if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5034                 return;
5035
5036         if (!READ_ONCE(fsc->blocklisted))
5037                 return;
5038
5039         pr_info("auto reconnect after blocklisted\n");
5040         ceph_force_reconnect(fsc->sb);
5041 }
5042
5043 bool check_session_state(struct ceph_mds_session *s)
5044 {
5045         switch (s->s_state) {
5046         case CEPH_MDS_SESSION_OPEN:
5047                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
5048                         s->s_state = CEPH_MDS_SESSION_HUNG;
5049                         pr_info("mds%d hung\n", s->s_mds);
5050                 }
5051                 break;
5052         case CEPH_MDS_SESSION_CLOSING:
5053         case CEPH_MDS_SESSION_NEW:
5054         case CEPH_MDS_SESSION_RESTARTING:
5055         case CEPH_MDS_SESSION_CLOSED:
5056         case CEPH_MDS_SESSION_REJECTED:
5057                 return false;
5058         }
5059
5060         return true;
5061 }
5062
5063 /*
5064  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
5065  * then we need to retransmit that request.
5066  */
5067 void inc_session_sequence(struct ceph_mds_session *s)
5068 {
5069         lockdep_assert_held(&s->s_mutex);
5070
5071         s->s_seq++;
5072
5073         if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
5074                 int ret;
5075
5076                 dout("resending session close request for mds%d\n", s->s_mds);
5077                 ret = request_close_session(s);
5078                 if (ret < 0)
5079                         pr_err("unable to close session to mds%d: %d\n",
5080                                s->s_mds, ret);
5081         }
5082 }
5083
5084 /*
5085  * delayed work -- periodically trim expired leases, renew caps with mds.  If
5086  * the @delay parameter is set to 0 or if it's more than 5 secs, the default
5087  * workqueue delay value of 5 secs will be used.
5088  */
5089 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
5090 {
5091         unsigned long max_delay = HZ * 5;
5092
5093         /* 5 secs default delay */
5094         if (!delay || (delay > max_delay))
5095                 delay = max_delay;
5096         schedule_delayed_work(&mdsc->delayed_work,
5097                               round_jiffies_relative(delay));
5098 }
5099
5100 static void delayed_work(struct work_struct *work)
5101 {
5102         struct ceph_mds_client *mdsc =
5103                 container_of(work, struct ceph_mds_client, delayed_work.work);
5104         unsigned long delay;
5105         int renew_interval;
5106         int renew_caps;
5107         int i;
5108
5109         dout("mdsc delayed_work\n");
5110
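             /* bail out early once the client is shutting down; this also stops re-arming */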
5111         if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
5112                 return;
5113
5114         mutex_lock(&mdsc->mutex);
5115         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
5116         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
5117                                    mdsc->last_renew_caps);
5118         if (renew_caps)
5119                 mdsc->last_renew_caps = jiffies;
5120
5121         for (i = 0; i < mdsc->max_sessions; i++) {
5122                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
5123                 if (!s)
5124                         continue;
5125
5126                 if (!check_session_state(s)) {
5127                         ceph_put_mds_session(s);
5128                         continue;
5129                 }
5130                 mutex_unlock(&mdsc->mutex);
5131
5132                 mutex_lock(&s->s_mutex);
5133                 if (renew_caps)
5134                         send_renew_caps(mdsc, s);
5135                 else
5136                         ceph_con_keepalive(&s->s_con);
5137                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5138                     s->s_state == CEPH_MDS_SESSION_HUNG)
5139                         ceph_send_cap_releases(mdsc, s);
5140                 mutex_unlock(&s->s_mutex);
5141                 ceph_put_mds_session(s);
5142
5143                 mutex_lock(&mdsc->mutex);
5144         }
5145         mutex_unlock(&mdsc->mutex);
5146
5147         delay = ceph_check_delayed_caps(mdsc);
5148
5149         ceph_queue_cap_reclaim_work(mdsc);
5150
5151         ceph_trim_snapid_map(mdsc);
5152
5153         maybe_recover_session(mdsc);
5154
5155         schedule_delayed(mdsc, delay);
5156 }
5157
5158 int ceph_mdsc_init(struct ceph_fs_client *fsc)
5160 {
5161         struct ceph_mds_client *mdsc;
5162         int err;
5163
5164         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
5165         if (!mdsc)
5166                 return -ENOMEM;
5167         mdsc->fsc = fsc;
5168         mutex_init(&mdsc->mutex);
5169         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
5170         if (!mdsc->mdsmap) {
5171                 err = -ENOMEM;
5172                 goto err_mdsc;
5173         }
5174
5175         init_completion(&mdsc->safe_umount_waiters);
5176         spin_lock_init(&mdsc->stopping_lock);
5177         atomic_set(&mdsc->stopping_blockers, 0);
5178         init_completion(&mdsc->stopping_waiter);
5179         init_waitqueue_head(&mdsc->session_close_wq);
5180         INIT_LIST_HEAD(&mdsc->waiting_for_map);
5181         mdsc->quotarealms_inodes = RB_ROOT;
5182         mutex_init(&mdsc->quotarealms_inodes_mutex);
5183         init_rwsem(&mdsc->snap_rwsem);
5184         mdsc->snap_realms = RB_ROOT;
5185         INIT_LIST_HEAD(&mdsc->snap_empty);
5186         spin_lock_init(&mdsc->snap_empty_lock);
5187         mdsc->request_tree = RB_ROOT;
5188         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
5189         mdsc->last_renew_caps = jiffies;
5190         INIT_LIST_HEAD(&mdsc->cap_delay_list);
5191         INIT_LIST_HEAD(&mdsc->cap_wait_list);
5192         spin_lock_init(&mdsc->cap_delay_lock);
5193         INIT_LIST_HEAD(&mdsc->snap_flush_list);
5194         spin_lock_init(&mdsc->snap_flush_lock);
5195         mdsc->last_cap_flush_tid = 1;
5196         INIT_LIST_HEAD(&mdsc->cap_flush_list);
5197         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
5198         spin_lock_init(&mdsc->cap_dirty_lock);
5199         init_waitqueue_head(&mdsc->cap_flushing_wq);
5200         INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
5201         err = ceph_metric_init(&mdsc->metric);
5202         if (err)
5203                 goto err_mdsmap;
5204
5205         spin_lock_init(&mdsc->dentry_list_lock);
5206         INIT_LIST_HEAD(&mdsc->dentry_leases);
5207         INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
5208
5209         ceph_caps_init(mdsc);
5210         ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
5211
5212         spin_lock_init(&mdsc->snapid_map_lock);
5213         mdsc->snapid_map_tree = RB_ROOT;
5214         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
5215
5216         init_rwsem(&mdsc->pool_perm_rwsem);
5217         mdsc->pool_perm_tree = RB_ROOT;
5218
5219         strscpy(mdsc->nodename, utsname()->nodename,
5220                 sizeof(mdsc->nodename));
5221
5222         fsc->mdsc = mdsc;
5223         return 0;
5224
5225 err_mdsmap:
5226         kfree(mdsc->mdsmap);
5227 err_mdsc:
5228         kfree(mdsc);
5229         return err;
5230 }
5231
5232 /*
5233  * Wait for safe replies on open mds requests.  If we time out, drop
5234  * all requests from the tree to avoid dangling dentry refs.
5235  */
5236 static void wait_requests(struct ceph_mds_client *mdsc)
5237 {
5238         struct ceph_options *opts = mdsc->fsc->client->options;
5239         struct ceph_mds_request *req;
5240
5241         mutex_lock(&mdsc->mutex);
5242         if (__get_oldest_req(mdsc)) {
5243                 mutex_unlock(&mdsc->mutex);
5244
5245                 dout("wait_requests waiting for requests\n");
5246                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
5247                                     ceph_timeout_jiffies(opts->mount_timeout));
5248
5249                 /* tear down remaining requests */
5250                 mutex_lock(&mdsc->mutex);
5251                 while ((req = __get_oldest_req(mdsc))) {
5252                         dout("wait_requests timed out on tid %llu\n",
5253                              req->r_tid);
5254                         list_del_init(&req->r_wait);
5255                         __unregister_request(mdsc, req);
5256                 }
5257         }
5258         mutex_unlock(&mdsc->mutex);
5259         dout("wait_requests done\n");
5260 }
5261
5262 void send_flush_mdlog(struct ceph_mds_session *s)
5263 {
5264         struct ceph_msg *msg;
5265
5266         /*
5267          * Pre-luminous MDS crashes when it sees an unknown session request
5268          */
5269         if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5270                 return;
5271
5272         mutex_lock(&s->s_mutex);
5273         dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
5274              ceph_session_state_name(s->s_state), s->s_seq);
5275         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5276                                       s->s_seq);
5277         if (!msg) {
5278                 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
5279                        s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5280         } else {
5281                 ceph_con_send(&s->s_con, msg);
5282         }
5283         mutex_unlock(&s->s_mutex);
5284 }
5285
5286 /*
5287  * called before mount is ro, and before dentries are torn down.
5288  * (hmm, does this still race with new lookups?)
5289  */
5290 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
5291 {
5292         dout("pre_umount\n");
5293         mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
5294
5295         ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
5296         ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
5297         ceph_flush_dirty_caps(mdsc);
5298         wait_requests(mdsc);
5299
5300         /*
5301          * wait for reply handlers to drop their request refs and
5302          * their inode/dcache refs
5303          */
5304         ceph_msgr_flush();
5305
5306         ceph_cleanup_quotarealms_inodes(mdsc);
5307 }
5308
5309 /*
5310  * flush the mdlog and wait for all write mds requests to flush.
5311  */
5312 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
5313                                                  u64 want_tid)
5314 {
5315         struct ceph_mds_request *req = NULL, *nextreq;
5316         struct ceph_mds_session *last_session = NULL;
5317         struct rb_node *n;
5318
5319         mutex_lock(&mdsc->mutex);
5320         dout("%s want %lld\n", __func__, want_tid);
5321 restart:
5322         req = __get_oldest_req(mdsc);
5323         while (req && req->r_tid <= want_tid) {
5324                 /* find next request */
5325                 n = rb_next(&req->r_node);
5326                 if (n)
5327                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
5328                 else
5329                         nextreq = NULL;
5330                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
5331                     (req->r_op & CEPH_MDS_OP_WRITE)) {
5332                         struct ceph_mds_session *s = req->r_session;
5333
5334                         if (!s) {
5335                                 req = nextreq;
5336                                 continue;
5337                         }
5338
5339                         /* write op */
5340                         ceph_mdsc_get_request(req);
5341                         if (nextreq)
5342                                 ceph_mdsc_get_request(nextreq);
5343                         s = ceph_get_mds_session(s);
5344                         mutex_unlock(&mdsc->mutex);
5345
5346                         /* send flush mdlog request to MDS */
5347                         if (last_session != s) {
5348                                 send_flush_mdlog(s);
5349                                 ceph_put_mds_session(last_session);
5350                                 last_session = s;
5351                         } else {
5352                                 ceph_put_mds_session(s);
5353                         }
5354                         dout("%s wait on %llu (want %llu)\n", __func__,
5355                              req->r_tid, want_tid);
5356                         wait_for_completion(&req->r_safe_completion);
5357
5358                         mutex_lock(&mdsc->mutex);
5359                         ceph_mdsc_put_request(req);
5360                         if (!nextreq)
5361                                 break;  /* no next request exists, so we're done! */
5362                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
5363                                 /* next request was removed from tree */
5364                                 ceph_mdsc_put_request(nextreq);
5365                                 goto restart;
5366                         }
5367                         ceph_mdsc_put_request(nextreq);  /* won't go away */
5368                 }
5369                 req = nextreq;
5370         }
5371         mutex_unlock(&mdsc->mutex);
5372         ceph_put_mds_session(last_session);
5373         dout("%s done\n", __func__);
5374 }
5375
5376 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
5377 {
5378         u64 want_tid, want_flush;
5379
5380         if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
5381                 return;
5382
5383         dout("sync\n");
5384         mutex_lock(&mdsc->mutex);
5385         want_tid = mdsc->last_tid;
5386         mutex_unlock(&mdsc->mutex);
5387
5388         ceph_flush_dirty_caps(mdsc);
5389         spin_lock(&mdsc->cap_dirty_lock);
5390         want_flush = mdsc->last_cap_flush_tid;
5391         if (!list_empty(&mdsc->cap_flush_list)) {
5392                 struct ceph_cap_flush *cf =
5393                         list_last_entry(&mdsc->cap_flush_list,
5394                                         struct ceph_cap_flush, g_list);
5395                 cf->wake = true;
5396         }
5397         spin_unlock(&mdsc->cap_dirty_lock);
5398
5399         dout("sync want tid %lld flush_seq %lld\n",
5400              want_tid, want_flush);
5401
5402         flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
5403         wait_caps_flush(mdsc, want_flush);
5404 }
5405
5406 /*
5407  * true if all sessions are closed, or we force unmount
5408  */
5409 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
5410 {
5411         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
5412                 return true;
5413         return atomic_read(&mdsc->num_sessions) <= skipped;
5414 }
5415
5416 /*
5417  * called after the sb is ro, or when the metadata is corrupted.
5418  */
5419 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
5420 {
5421         struct ceph_options *opts = mdsc->fsc->client->options;
5422         struct ceph_mds_session *session;
5423         int i;
5424         int skipped = 0;
5425
5426         dout("close_sessions\n");
5427
5428         /* close sessions */
5429         mutex_lock(&mdsc->mutex);
5430         for (i = 0; i < mdsc->max_sessions; i++) {
5431                 session = __ceph_lookup_mds_session(mdsc, i);
5432                 if (!session)
5433                         continue;
5434                 mutex_unlock(&mdsc->mutex);
5435                 mutex_lock(&session->s_mutex);
5436                 if (__close_session(mdsc, session) <= 0)
5437                         skipped++;
5438                 mutex_unlock(&session->s_mutex);
5439                 ceph_put_mds_session(session);
5440                 mutex_lock(&mdsc->mutex);
5441         }
5442         mutex_unlock(&mdsc->mutex);
5443
5444         dout("waiting for sessions to close\n");
5445         wait_event_timeout(mdsc->session_close_wq,
5446                            done_closing_sessions(mdsc, skipped),
5447                            ceph_timeout_jiffies(opts->mount_timeout));
5448
5449         /* tear down remaining sessions */
5450         mutex_lock(&mdsc->mutex);
5451         for (i = 0; i < mdsc->max_sessions; i++) {
5452                 if (mdsc->sessions[i]) {
5453                         session = ceph_get_mds_session(mdsc->sessions[i]);
5454                         __unregister_session(mdsc, session);
5455                         mutex_unlock(&mdsc->mutex);
5456                         mutex_lock(&session->s_mutex);
5457                         remove_session_caps(session);
5458                         mutex_unlock(&session->s_mutex);
5459                         ceph_put_mds_session(session);
5460                         mutex_lock(&mdsc->mutex);
5461                 }
5462         }
5463         WARN_ON(!list_empty(&mdsc->cap_delay_list));
5464         mutex_unlock(&mdsc->mutex);
5465
5466         ceph_cleanup_snapid_map(mdsc);
5467         ceph_cleanup_global_and_empty_realms(mdsc);
5468
5469         cancel_work_sync(&mdsc->cap_reclaim_work);
5470         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
5471
5472         dout("stopped\n");
5473 }
5474
5475 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
5476 {
5477         struct ceph_mds_session *session;
5478         int mds;
5479
5480         dout("force umount\n");
5481
5482         mutex_lock(&mdsc->mutex);
5483         for (mds = 0; mds < mdsc->max_sessions; mds++) {
5484                 session = __ceph_lookup_mds_session(mdsc, mds);
5485                 if (!session)
5486                         continue;
5487
5488                 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
5489                         __unregister_session(mdsc, session);
5490                 __wake_requests(mdsc, &session->s_waiting);
5491                 mutex_unlock(&mdsc->mutex);
5492
5493                 mutex_lock(&session->s_mutex);
5494                 __close_session(mdsc, session);
5495                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
5496                         cleanup_session_requests(mdsc, session);
5497                         remove_session_caps(session);
5498                 }
5499                 mutex_unlock(&session->s_mutex);
5500                 ceph_put_mds_session(session);
5501
5502                 mutex_lock(&mdsc->mutex);
5503                 kick_requests(mdsc, mds);
5504         }
5505         __wake_requests(mdsc, &mdsc->waiting_for_map);
5506         mutex_unlock(&mdsc->mutex);
5507 }
5508
5509 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
5510 {
5511         dout("stop\n");
5512         /*
5513          * Make sure the delayed work has stopped before releasing
5514          * the resources.
5515          *
5516          * cancel_delayed_work_sync() only guarantees that a running
5517          * work item finishes executing; the delayed work could still
5518          * re-arm itself after that, so flush it instead.
5519          */
5520         flush_delayed_work(&mdsc->delayed_work);
5521
5522         if (mdsc->mdsmap)
5523                 ceph_mdsmap_destroy(mdsc->mdsmap);
5524         kfree(mdsc->sessions);
5525         ceph_caps_finalize(mdsc);
5526         ceph_pool_perm_destroy(mdsc);
5527 }
5528
5529 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
5530 {
5531         struct ceph_mds_client *mdsc = fsc->mdsc;
5532         dout("mdsc_destroy %p\n", mdsc);
5533
5534         if (!mdsc)
5535                 return;
5536
5537         /* flush out any connection work with references to us */
5538         ceph_msgr_flush();
5539
5540         ceph_mdsc_stop(mdsc);
5541
5542         ceph_metric_destroy(&mdsc->metric);
5543
5544         fsc->mdsc = NULL;
5545         kfree(mdsc);
5546         dout("mdsc_destroy %p done\n", mdsc);
5547 }
5548
5549 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5550 {
5551         struct ceph_fs_client *fsc = mdsc->fsc;
5552         const char *mds_namespace = fsc->mount_options->mds_namespace;
5553         void *p = msg->front.iov_base;
5554         void *end = p + msg->front.iov_len;
5555         u32 epoch;
5556         u32 num_fs;
5557         u32 mount_fscid = (u32)-1;
5558         int err = -EINVAL;
5559
5560         ceph_decode_need(&p, end, sizeof(u32), bad);
5561         epoch = ceph_decode_32(&p);
5562
5563         dout("handle_fsmap epoch %u\n", epoch);
5564
5565         /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
5566         ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
5567
5568         ceph_decode_32_safe(&p, end, num_fs, bad);
5569         while (num_fs-- > 0) {
5570                 void *info_p, *info_end;
5571                 u32 info_len;
5572                 u32 fscid, namelen;
5573
5574                 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
5575                 p += 2;         /* info_v, info_cv */
5576                 info_len = ceph_decode_32(&p);
5577                 ceph_decode_need(&p, end, info_len, bad);
5578                 info_p = p;
5579                 info_end = p + info_len;
5580                 p = info_end;
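                 /* p now points at the next fs entry; parse this one via info_p */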
5581
5582                 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
5583                 fscid = ceph_decode_32(&info_p);
5584                 namelen = ceph_decode_32(&info_p);
5585                 ceph_decode_need(&info_p, info_end, namelen, bad);
5586
5587                 if (mds_namespace &&
5588                     strlen(mds_namespace) == namelen &&
5589                     !strncmp(mds_namespace, (char *)info_p, namelen)) {
5590                         mount_fscid = fscid;
5591                         break;
5592                 }
5593         }
5594
5595         ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
5596         if (mount_fscid != (u32)-1) {
5597                 fsc->client->monc.fs_cluster_id = mount_fscid;
5598                 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
5599                                    0, true);
5600                 ceph_monc_renew_subs(&fsc->client->monc);
5601         } else {
5602                 err = -ENOENT;
5603                 goto err_out;
5604         }
5605         return;
5606
5607 bad:
5608         pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
5609         ceph_umount_begin(mdsc->fsc->sb);
5610         ceph_msg_dump(msg);
5611 err_out:
5612         mutex_lock(&mdsc->mutex);
5613         mdsc->mdsmap_err = err;
5614         __wake_requests(mdsc, &mdsc->waiting_for_map);
5615         mutex_unlock(&mdsc->mutex);
5616 }
5617
5618 /*
5619  * handle mds map update.
5620  */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        u32 epoch;
        u32 maplen;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
        struct ceph_fsid fsid;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

        /* do we need it? */
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout("handle_map epoch %u <= our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
                mutex_unlock(&mdsc->mutex);
                return;
        }

        newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
        }

        /* swap into place */
        if (mdsc->mdsmap) {
                oldmap = mdsc->mdsmap;
                mdsc->mdsmap = newmap;
                check_new_map(mdsc, newmap, oldmap);
                ceph_mdsmap_destroy(oldmap);
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
        mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
                                       MAX_LFS_FILESIZE);

        __wake_requests(mdsc, &mdsc->waiting_for_map);
        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
                          mdsc->mdsmap->m_epoch);

        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc, 0);
        return;

bad_unlock:
        mutex_unlock(&mdsc->mutex);
bad:
        pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
        ceph_umount_begin(mdsc->fsc->sb);
        ceph_msg_dump(msg);
        return;
}

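/*
 * reference-counting hooks for the per-session connection.  the
 * connection borrows the session's refcount: taking a reference on
 * the connection succeeds only while the session is still alive.
 */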
static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        if (ceph_get_mds_session(s))
                return con;
        return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.  try to reconnect and reestablish our state,
 * unless I/O on the mount has already been fenced.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;

        pr_warn("mds%d closed our session\n", s->s_mds);
        if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO)
                send_mds_reconnect(mdsc, s);
}

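/*
 * dispatch an incoming message to the appropriate handler, after
 * verifying that the session is still registered.  consumes the
 * caller's reference on @msg.
 */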
static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);

        mutex_lock(&mdsc->mutex);
        if (__verify_registered_session(mdsc, s) < 0) {
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        mutex_unlock(&mdsc->mutex);

        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_mdsmap(mdsc, msg);
                break;
        case CEPH_MSG_FS_MAP_USER:
                ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_QUOTA:
                ceph_handle_quota(mdsc, s, msg);
                break;
        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        ceph_msg_put(msg);
}


/*
 * authentication.  the get_authorizer/verify_authorizer_reply hooks
 * below serve the original (v1) messenger; the mds_get_auth_request()
 * family further down serves the msgr2 handshake.
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
                                         force_new, proto, NULL, NULL);
        if (ret)
                return ERR_PTR(ret);

        return auth;
}

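/*
 * the server may send back a challenge to prove that we, and not a
 * replayed authorizer, are on the other end of the connection.
 */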
static int mds_add_authorizer_challenge(struct ceph_connection *con,
                                        void *challenge_buf,
                                        int challenge_buf_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
                                                  challenge_buf,
                                                  challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
                auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
                NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

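/*
 * msgr2 authentication exchange: build the initial auth request,
 * answer any "auth reply more" continuations, and pick up the
 * session key and connection secret once the server is satisfied.
 */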
static int mds_get_auth_request(struct ceph_connection *con,
                                void *buf, int *buf_len,
                                void **authorizer, int *authorizer_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
                                       buf, buf_len);
        if (ret)
                return ret;

        *authorizer = auth->authorizer_buf;
        *authorizer_len = auth->authorizer_buf_len;
        return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
                                      void *reply, int reply_len,
                                      void *buf, int *buf_len,
                                      void **authorizer, int *authorizer_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;
        int ret;

        ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
                                              buf, buf_len);
        if (ret)
                return ret;

        *authorizer = auth->authorizer_buf;
        *authorizer_len = auth->authorizer_buf_len;
        return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
                                u64 global_id, void *reply, int reply_len,
                                u8 *session_key, int *session_key_len,
                                u8 *con_secret, int *con_secret_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
                                               session_key, session_key_len,
                                               con_secret, con_secret_len);
}

static int mds_handle_auth_bad_method(struct ceph_connection *con,
                                      int used_proto, int result,
                                      const int *allowed_protos, int proto_cnt,
                                      const int *allowed_modes, int mode_cnt)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
        int ret;

        if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
                                            used_proto, result,
                                            allowed_protos, proto_cnt,
                                            allowed_modes, mode_cnt)) {
                ret = ceph_monc_validate_auth(monc);
                if (ret)
                        return ret;
        }

        return -EACCES;
}

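/*
 * allocate a buffer for an incoming message on this connection.
 * only the front portion is sized here; returning NULL without
 * setting *skip signals an allocation failure to the messenger.
 */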
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr, int *skip)
{
        struct ceph_msg *msg;
        int type = (int) le16_to_cpu(hdr->type);
        int front_len = (int) le32_to_cpu(hdr->front_len);

        if (con->in_msg)
                return con->in_msg;

        *skip = 0;
        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
        if (!msg) {
                pr_err("unable to allocate msg type %d len %d\n",
                       type, front_len);
                return NULL;
        }

        return msg;
}

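/*
 * sign outgoing messages and verify the signatures on incoming ones,
 * using the keys negotiated in the session's auth handshake.
 */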
static int mds_sign_message(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_check_message_signature(auth, msg);
}

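/*
 * dispatch table for per-session MDS connections.  each session's
 * embedded ceph_connection points back at the session via ->private
 * and is wired to these ops when the session is registered, roughly
 * like so (a sketch; see register_session()):
 *
 *      ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 */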
static const struct ceph_connection_operations mds_con_ops = {
        .get = mds_get_con,
        .put = mds_put_con,
        .alloc_msg = mds_alloc_msg,
        .dispatch = mds_dispatch,
        .peer_reset = mds_peer_reset,
        .get_authorizer = mds_get_authorizer,
        .add_authorizer_challenge = mds_add_authorizer_challenge,
        .verify_authorizer_reply = mds_verify_authorizer_reply,
        .invalidate_authorizer = mds_invalidate_authorizer,
        .sign_message = mds_sign_message,
        .check_message_signature = mds_check_message_signature,
        .get_auth_request = mds_get_auth_request,
        .handle_auth_reply_more = mds_handle_auth_reply_more,
        .handle_auth_done = mds_handle_auth_done,
        .handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */