// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

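/*
 * Layout of the quota blob as implied by the decoder below (a sketch
 * inferred from the decode calls, not from protocol documentation):
 *
 *      u8  struct_v        version, expected to be >= 1
 *      u8  struct_compat   oldest compatible version, must be 1
 *      u32 struct_len      length of the remaining payload
 *      u64 max_bytes       quota limit in bytes (0 = no limit)
 *      u64 max_files       quota limit in number of files (0 = no limit)
 *
 * Anything past struct_len is skipped, which lets newer servers append
 * fields without breaking older decoders.
 */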
static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}

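/*
 * Throughout these parsers, features == (u64)-1 appears to mark the
 * newer self-describing reply encoding, where every section carries
 * its own struct_v/struct_compat/struct_len header; otherwise the
 * individual CEPH_FEATURE_* bits gate each optional field.
 */
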
/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }
                /* btime, change_attr */
                {
                        struct ceph_timespec btime;
                        u64 change_attr;
                        ceph_decode_need(p, end, sizeof(btime), bad);
                        ceph_decode_copy(p, &btime, sizeof(btime));
                        ceph_decode_64_safe(p, end, change_attr, bad);
                }

                *p = end;
        } else {
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**lease), bad);
        *lease = *p;
        *p += sizeof(**lease);
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
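/*
 * Sketch of the trace section handled below (inferred from this
 * decoder): when head->is_dentry is set it carries the parent dir
 * inode (diri), a dirfrag, a length-prefixed dentry name and a dentry
 * lease; when head->is_target is set it ends with the target inode.
 */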
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
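/*
 * Wire layout sketch, as implied by the decoder below: a dirfrag for
 * the directory itself, then a u32 entry count and a u16 flags word
 * (end/complete/hash-order/offset-hash), then for each entry a
 * length-prefixed name, a dentry lease and the entry's inode record.
 */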
static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
                ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features);
                if (err)
                        goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;
        *p += sizeof(*info->filelock_reply);

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse create results
 */
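/*
 * The create result is either empty (an older MDS that does not send
 * the created inode number back) or a single u64 ino; this is inferred
 * from the decoder below, not from protocol documentation.
 */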
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
                        info->has_create_ino = true;
                        info->ino = ceph_decode_64(p);
                }
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
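/*
 * Overall reply layout handled here (sketch inferred from the code
 * below): a fixed ceph_mds_reply_head, then three blobs in order --
 * the trace (dentry/inode metadata), the op-specific "extra" section
 * and the snap blob -- each introduced by a u32 length.
 */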
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_entries)
                return;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        struct ceph_mds_session *session;

        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        session = mdsc->sessions[mds];
        dout("lookup_mds_session %p %d\n", session,
             refcount_read(&session->s_ref));
        get_session(session);
        return session;
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

        if (mds >= mdsc->max_sessions) {
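                /* grow to the next power of two, e.g. mds 5 -> newmax 8 */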
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_parent)
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
        iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}

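/*
 * Generates the rbtree helpers used below -- lookup_request(),
 * insert_request() and erase_request() -- keyed on r_tid over r_node.
 */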
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                ihold(dir);
                req->r_unsafe_dir = dir;
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir  &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("__choose_mds using snapdir's parent %p\n", inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = req->r_dentry->d_parent;
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /*  not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        goto out;
                        }
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        iput(inode);
        return mds;

random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

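/*
 * The feature bitmap is encoded as a u32 byte count followed by that
 * many bitmap bytes, sized in 8-byte units large enough to cover the
 * highest supported feature bit; bit b lands in byte b / 8, bit b % 8.
 */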
static void encode_supported_features(void **p, void *end)
{
        static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
        static const size_t count = ARRAY_SIZE(bits);

        if (count > 0) {
                size_t i;
                size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;

                BUG_ON(*p + 4 + size > end);
                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++)
                        ((unsigned char*)(*p))[bits[i] / 8] |= 1 << (bits[i] % 8);
                *p += size;
        } else {
                BUG_ON(*p + 4 > end);
                ceph_encode_32(p, 0);
        }
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
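/*
 * Payload sketch for the v3 open message built below: the fixed
 * session head (op, seq), then the metadata map -- a u32 entry count
 * followed by length-prefixed key/value string pairs -- and finally
 * the supported-feature bitmap from encode_supported_features().
 */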
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
        void *p, *end;

        const char* metadata[][2] = {
                {"hostname", mdsc->nodename},
                {"kernel_version", init_utsname()->release},
                {"entity_id", opt->name ? : ""},
                {"root", fsopt->server_path ? : "/"},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        extra_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0]; ++i) {
                extra_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }
        /* supported feature: u32 size + an 8-byte feature bitmap */
        extra_bytes += 4 + 8;

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
                return NULL;
        }
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        h = p;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v2; with the
         * supported feature bitmap as well they are v3.
         */
        msg->hdr.version = cpu_to_le16(3);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p += sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0]; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        encode_supported_features(&p, end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING)
                __open_session(mdsc, session);

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->m_num_mds)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                if (!IS_ERR(ts))
                        ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
                                struct list_head *target)
{
        lockdep_assert_held(&session->s_cap_lock);

        list_splice_init(&session->s_cap_releases, target);
        session->s_num_cap_releases = 0;
        dout("detach_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
                                 struct list_head *dispose)
{
        while (!list_empty(dispose)) {
                struct ceph_cap *cap;
                /* free each cap that was detached from the session */
                cap = list_first_entry(dispose, struct ceph_cap, session_caps);
                list_del(&cap->session_caps);
                ceph_put_cap(mdsc, cap);
        }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                                     struct ceph_mds_session *session)
{
        struct ceph_mds_request *req;
        struct rb_node *p;

        dout("cleanup_session_requests mds%d\n", session->s_mds);
        mutex_lock(&mdsc->mutex);
        while (!list_empty(&session->s_unsafe)) {
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                pr_warn_ratelimited(" dropping unsafe request %llu\n",
                                    req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
        p = rb_first(&mdsc->request_tree);
        while (p) {
                req = rb_entry(p, struct ceph_mds_request, r_node);
                p = rb_next(p);
                if (req->r_session &&
                    req->r_session->s_mds == session->s_mds)
                        req->r_attempts = 0;
        }
        mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
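/*
 * Hypothetical usage sketch (callback name invented for illustration):
 *
 *      static int count_cb(struct inode *inode, struct ceph_cap *cap,
 *                          void *arg)
 *      {
 *              (*(int *)arg)++;
 *              return 0;       (a negative return stops the walk early)
 *      }
 *
 *      int n = 0;
 *      mutex_lock(&session->s_mutex);
 *      iterate_session_caps(session, count_cb, &n);
 *      mutex_unlock(&session->s_mutex);
 */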
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        p = p->next;
                        continue;
                }
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                p = p->next;
                if (!cap->ci) {
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        if (cap->queue_release) {
                                list_add_tail(&cap->session_caps,
                                              &session->s_cap_releases);
                                session->s_num_cap_releases++;
                        } else {
                                old_cap = cap;  /* put_cap it w/o locks held */
                        }
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        bool drop = false;
        bool invalidate = false;

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        if (cap->mds_wanted | cap->issued)
                ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc = fsc->mdsc;

                if (ci->i_wrbuffer_ref > 0 &&
                    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                        invalidate = true;

                while (!list_empty(&ci->i_cap_flush_list)) {
                        cf = list_first_entry(&ci->i_cap_flush_list,
                                              struct ceph_cap_flush, i_list);
                        list_move(&cf->i_list, &to_remove);
                }

                spin_lock(&mdsc->cap_dirty_lock);

                list_for_each_entry(cf, &to_remove, i_list)
                        list_del(&cf->g_list);

                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = true;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_warn_ratelimited(
                                " dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = true;
                }
                spin_unlock(&mdsc->cap_dirty_lock);

                if (atomic_read(&ci->i_filelock_ref) > 0) {
                        /* make further file lock syscalls return -EIO */
                        ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
                        pr_warn_ratelimited(" dropping file locks for %p %lld\n",
                                            inode, ceph_ino(inode));
                }

                if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
                struct ceph_cap_flush *cf;
                cf = list_first_entry(&to_remove,
                                      struct ceph_cap_flush, i_list);
                list_del(&cf->i_list);
                ceph_free_cap_flush(cf);
        }

        wake_up_all(&ci->i_cap_wq);
        if (invalidate)
                ceph_queue_invalidate(inode);
        if (drop)
                iput(inode);
        return 0;
}
1423
1424 /*
1425  * caller must hold session s_mutex
1426  */
1427 static void remove_session_caps(struct ceph_mds_session *session)
1428 {
1429         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1430         struct super_block *sb = fsc->sb;
1431         LIST_HEAD(dispose);
1432
1433         dout("remove_session_caps on %p\n", session);
1434         iterate_session_caps(session, remove_session_caps_cb, fsc);
1435
1436         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1437
1438         spin_lock(&session->s_cap_lock);
1439         if (session->s_nr_caps > 0) {
1440                 struct inode *inode;
1441                 struct ceph_cap *cap, *prev = NULL;
1442                 struct ceph_vino vino;
1443                 /*
1444                  * iterate_session_caps() skips inodes that are being
1445                  * deleted, we need to wait until deletions are complete.
1446                  * __wait_on_freeing_inode() is designed for the job,
1447                  * but it is not exported, so use lookup inode function
1448                  * to access it.
1449                  */
1450                 while (!list_empty(&session->s_caps)) {
1451                         cap = list_entry(session->s_caps.next,
1452                                          struct ceph_cap, session_caps);
1453                         if (cap == prev)
1454                                 break;
1455                         prev = cap;
1456                         vino = cap->ci->i_vino;
1457                         spin_unlock(&session->s_cap_lock);
1458
1459                         inode = ceph_find_inode(sb, vino);
1460                         iput(inode);
1461
1462                         spin_lock(&session->s_cap_lock);
1463                 }
1464         }
1465
1466         // drop cap expires and unlock s_cap_lock
1467         detach_cap_releases(session, &dispose);
1468
1469         BUG_ON(session->s_nr_caps > 0);
1470         BUG_ON(!list_empty(&session->s_cap_flushing));
1471         spin_unlock(&session->s_cap_lock);
1472         dispose_cap_releases(session->s_mdsc, &dispose);
1473 }
1474
1475 enum {
1476         RECONNECT,
1477         RENEWCAPS,
1478         FORCE_RO,
1479 };
1480
1481 /*
1482  * wake up any threads waiting on this session's caps.  if a cap is
1483  * stale (didn't get renewed on the client reconnect), invalidate it now.
1484  *
1485  * caller must hold s_mutex.
1486  */
1487 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1488                               void *arg)
1489 {
1490         struct ceph_inode_info *ci = ceph_inode(inode);
1491         unsigned long ev = (unsigned long)arg;
1492
1493         if (ev == RECONNECT) {
1494                 spin_lock(&ci->i_ceph_lock);
1495                 ci->i_wanted_max_size = 0;
1496                 ci->i_requested_max_size = 0;
1497                 spin_unlock(&ci->i_ceph_lock);
1498         } else if (ev == RENEWCAPS) {
1499                 if (cap->cap_gen < cap->session->s_cap_gen) {
1500                         /* mds did not re-issue stale cap */
1501                         spin_lock(&ci->i_ceph_lock);
1502                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1503                         /* make sure mds knows what we want */
1504                         if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
1505                                 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
1506                         spin_unlock(&ci->i_ceph_lock);
1507                 }
1508         } else if (ev == FORCE_RO) {
1509         }
1510         wake_up_all(&ci->i_cap_wq);
1511         return 0;
1512 }
1513
1514 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1515 {
1516         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1517         iterate_session_caps(session, wake_up_session_cb,
1518                              (void *)(unsigned long)ev);
1519 }
1520
1521 /*
1522  * Send periodic message to MDS renewing all currently held caps.  The
1523  * ack will reset the expiration for all caps from this session.
1524  *
1525  * caller holds s_mutex
1526  */
1527 static int send_renew_caps(struct ceph_mds_client *mdsc,
1528                            struct ceph_mds_session *session)
1529 {
1530         struct ceph_msg *msg;
1531         int state;
1532
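        /*
         * Print "caps stale" at most once per renew cycle: only when the
         * ttl has expired and it was last set at or after the previous
         * renew request (i.e. that request has been acked).
         */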
1533         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1534             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1535                 pr_info("mds%d caps stale\n", session->s_mds);
1536         session->s_renew_requested = jiffies;
1537
1538         /* do not try to renew caps until a recovering mds has reconnected
1539          * with its clients. */
1540         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1541         if (state < CEPH_MDS_STATE_RECONNECT) {
1542                 dout("send_renew_caps ignoring mds%d (%s)\n",
1543                      session->s_mds, ceph_mds_state_name(state));
1544                 return 0;
1545         }
1546
1547         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1548                 ceph_mds_state_name(state));
1549         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1550                                  ++session->s_renew_seq);
1551         if (!msg)
1552                 return -ENOMEM;
1553         ceph_con_send(&session->s_con, msg);
1554         return 0;
1555 }
1556
1557 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1558                              struct ceph_mds_session *session, u64 seq)
1559 {
1560         struct ceph_msg *msg;
1561
1562         dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1563              session->s_mds, ceph_session_state_name(session->s_state), seq);
1564         msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1565         if (!msg)
1566                 return -ENOMEM;
1567         ceph_con_send(&session->s_con, msg);
1568         return 0;
1569 }
1570
1571
1572 /*
1573  * Note the new cap ttl, and any transition from stale -> fresh.
1574  *
1575  * Called under session->s_mutex
1576  */
1577 static void renewed_caps(struct ceph_mds_client *mdsc,
1578                          struct ceph_mds_session *session, int is_renew)
1579 {
1580         int was_stale;
1581         int wake = 0;
1582
1583         spin_lock(&session->s_cap_lock);
1584         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1585
1586         session->s_cap_ttl = session->s_renew_requested +
1587                 mdsc->mdsmap->m_session_timeout*HZ;
1588
1589         if (was_stale) {
1590                 if (time_before(jiffies, session->s_cap_ttl)) {
1591                         pr_info("mds%d caps renewed\n", session->s_mds);
1592                         wake = 1;
1593                 } else {
1594                         pr_info("mds%d caps still stale\n", session->s_mds);
1595                 }
1596         }
1597         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1598              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1599              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1600         spin_unlock(&session->s_cap_lock);
1601
1602         if (wake)
1603                 wake_up_session_caps(session, RENEWCAPS);
1604 }
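
/*
 * Illustration only (hypothetical helper, not used by this file): the cap
 * ttl arithmetic from renewed_caps() in one place.  The ttl is anchored
 * at the time the renew request was *sent*, not when the ack arrived, so
 * a slow ack cannot extend a lease beyond what the mds actually granted.
 */
static inline bool example_caps_fresh(unsigned long renew_requested,
                                      u32 session_timeout)
{
        unsigned long ttl = renew_requested + session_timeout * HZ;

        /* fresh iff the ttl still lies in the future */
        return time_before(jiffies, ttl);
}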
1605
1606 /*
1607  * send a session close request
1608  */
1609 static int request_close_session(struct ceph_mds_client *mdsc,
1610                                  struct ceph_mds_session *session)
1611 {
1612         struct ceph_msg *msg;
1613
1614         dout("request_close_session mds%d state %s seq %lld\n",
1615              session->s_mds, ceph_session_state_name(session->s_state),
1616              session->s_seq);
1617         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1618         if (!msg)
1619                 return -ENOMEM;
1620         ceph_con_send(&session->s_con, msg);
1621         return 1;
1622 }
1623
1624 /*
1625  * Called with s_mutex held.
1626  */
1627 static int __close_session(struct ceph_mds_client *mdsc,
1628                          struct ceph_mds_session *session)
1629 {
1630         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1631                 return 0;
1632         session->s_state = CEPH_MDS_SESSION_CLOSING;
1633         return request_close_session(mdsc, session);
1634 }
1635
1636 static bool drop_negative_children(struct dentry *dentry)
1637 {
1638         struct dentry *child;
1639         bool all_negative = true;
1640
1641         if (!d_is_dir(dentry))
1642                 goto out;
1643
1644         spin_lock(&dentry->d_lock);
1645         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1646                 if (d_really_is_positive(child)) {
1647                         all_negative = false;
1648                         break;
1649                 }
1650         }
1651         spin_unlock(&dentry->d_lock);
1652
1653         if (all_negative)
1654                 shrink_dcache_parent(dentry);
1655 out:
1656         return all_negative;
1657 }
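
/*
 * A directory populated only by negative dentries still pins its own
 * dentry (and with it the inode and its cap) through those child
 * references.  Shedding them first lets d_prune_aliases() in
 * trim_caps_cb() below actually release an otherwise unused inode.
 */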
1658
1659 /*
1660  * Trim old(er) caps.
1661  *
1662  * Because we can't cache an inode without one or more caps, we do
1663  * this indirectly: if a cap is unused, we prune its aliases, at which
1664  * point the inode will hopefully get dropped to.
1665  * point the inode will hopefully get dropped too.
1666  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1667  * memory pressure from the MDS, though, so it needn't be perfect.
1668  */
1669 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1670 {
1671         struct ceph_mds_session *session = arg;
1672         struct ceph_inode_info *ci = ceph_inode(inode);
1673         int used, wanted, oissued, mine;
1674
1675         if (session->s_trim_caps <= 0)
1676                 return -1;
1677
1678         spin_lock(&ci->i_ceph_lock);
1679         mine = cap->issued | cap->implemented;
1680         used = __ceph_caps_used(ci);
1681         wanted = __ceph_caps_file_wanted(ci);
1682         oissued = __ceph_caps_issued_other(ci, cap);
1683
1684         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1685              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1686              ceph_cap_string(used), ceph_cap_string(wanted));
1687         if (cap == ci->i_auth_cap) {
1688                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
1689                     !list_empty(&ci->i_cap_snaps))
1690                         goto out;
1691                 if ((used | wanted) & CEPH_CAP_ANY_WR)
1692                         goto out;
1693                 /* Note: it's possible that i_filelock_ref becomes non-zero
1694                  * after dropping auth caps. It doesn't hurt because reply
1695                  * of lock mds request will re-add auth caps. */
1696                 if (atomic_read(&ci->i_filelock_ref) > 0)
1697                         goto out;
1698         }
1699         /* The inode has cached pages, but it's no longer used.
1700          * We can safely drop it. */
1701         if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1702             !(oissued & CEPH_CAP_FILE_CACHE)) {
1703                 used = 0;
1704                 oissued = 0;
1705         }
1706         if ((used | wanted) & ~oissued & mine)
1707                 goto out;   /* we need these caps */
1708
1709         if (oissued) {
1710                 /* we aren't the only cap.. just remove us */
1711                 __ceph_remove_cap(cap, true);
1712                 session->s_trim_caps--;
1713         } else {
1714                 struct dentry *dentry;
1715                 /* try dropping referring dentries */
1716                 spin_unlock(&ci->i_ceph_lock);
1717                 dentry = d_find_any_alias(inode);
1718                 if (dentry && drop_negative_children(dentry)) {
1719                         int count;
1720                         dput(dentry);
1721                         d_prune_aliases(inode);
1722                         count = atomic_read(&inode->i_count);
1723                         if (count == 1)
1724                                 session->s_trim_caps--;
1725                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1726                              inode, cap, count);
1727                 } else {
1728                         dput(dentry);
1729                 }
1730                 return 0;
1731         }
1732
1733 out:
1734         spin_unlock(&ci->i_ceph_lock);
1735         return 0;
1736 }
1737
1738 /*
1739  * Trim session cap count down to some max number.
1740  */
1741 int ceph_trim_caps(struct ceph_mds_client *mdsc,
1742                    struct ceph_mds_session *session,
1743                    int max_caps)
1744 {
1745         int trim_caps = session->s_nr_caps - max_caps;
1746
1747         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1748              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1749         if (trim_caps > 0) {
1750                 session->s_trim_caps = trim_caps;
1751                 iterate_session_caps(session, trim_caps_cb, session);
1752                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1753                      session->s_mds, session->s_nr_caps, max_caps,
1754                         trim_caps - session->s_trim_caps);
1755                 session->s_trim_caps = 0;
1756         }
1757
1758         ceph_send_cap_releases(mdsc, session);
1759         return 0;
1760 }
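
/*
 * Illustration only (hypothetical caller): ceph_trim_caps() is invoked
 * when the mds asks the client to shed capabilities, e.g. on a
 * CEPH_SESSION_RECALL_STATE message.  The target of 1024 below is an
 * arbitrary example; the real limit comes from the mds message.
 */
static void example_recall_state(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
{
        /* like the real call site, this assumes session->s_mutex is held */
        ceph_trim_caps(mdsc, session, 1024);
}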
1761
1762 static int check_caps_flush(struct ceph_mds_client *mdsc,
1763                             u64 want_flush_tid)
1764 {
1765         int ret = 1;
1766
1767         spin_lock(&mdsc->cap_dirty_lock);
1768         if (!list_empty(&mdsc->cap_flush_list)) {
1769                 struct ceph_cap_flush *cf =
1770                         list_first_entry(&mdsc->cap_flush_list,
1771                                          struct ceph_cap_flush, g_list);
1772                 if (cf->tid <= want_flush_tid) {
1773                         dout("check_caps_flush still flushing tid "
1774                              "%llu <= %llu\n", cf->tid, want_flush_tid);
1775                         ret = 0;
1776                 }
1777         }
1778         spin_unlock(&mdsc->cap_dirty_lock);
1779         return ret;
1780 }
1781
1782 /*
1783  * flush all dirty inode data to disk.
1784  *
1785  * blocks until we've flushed through want_flush_tid
1786  */
1787 static void wait_caps_flush(struct ceph_mds_client *mdsc,
1788                             u64 want_flush_tid)
1789 {
1790         dout("check_caps_flush want %llu\n", want_flush_tid);
1791
1792         wait_event(mdsc->cap_flushing_wq,
1793                    check_caps_flush(mdsc, want_flush_tid));
1794
1795         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
1796 }
1797
1798 /*
1799  * called under s_mutex
1800  */
1801 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1802                             struct ceph_mds_session *session)
1803 {
1804         struct ceph_msg *msg = NULL;
1805         struct ceph_mds_cap_release *head;
1806         struct ceph_mds_cap_item *item;
1807         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
1808         struct ceph_cap *cap;
1809         LIST_HEAD(tmp_list);
1810         int num_cap_releases;
1811         __le32  barrier, *cap_barrier;
1812
1813         down_read(&osdc->lock);
1814         barrier = cpu_to_le32(osdc->epoch_barrier);
1815         up_read(&osdc->lock);
1816
1817         spin_lock(&session->s_cap_lock);
1818 again:
1819         list_splice_init(&session->s_cap_releases, &tmp_list);
1820         num_cap_releases = session->s_num_cap_releases;
1821         session->s_num_cap_releases = 0;
1822         spin_unlock(&session->s_cap_lock);
1823
1824         while (!list_empty(&tmp_list)) {
1825                 if (!msg) {
1826                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
1827                                         PAGE_SIZE, GFP_NOFS, false);
1828                         if (!msg)
1829                                 goto out_err;
1830                         head = msg->front.iov_base;
1831                         head->num = cpu_to_le32(0);
1832                         msg->front.iov_len = sizeof(*head);
1833
1834                         msg->hdr.version = cpu_to_le16(2);
1835                         msg->hdr.compat_version = cpu_to_le16(1);
1836                 }
1837
1838                 cap = list_first_entry(&tmp_list, struct ceph_cap,
1839                                         session_caps);
1840                 list_del(&cap->session_caps);
1841                 num_cap_releases--;
1842
1843                 head = msg->front.iov_base;
1844                 le32_add_cpu(&head->num, 1);
1845                 item = msg->front.iov_base + msg->front.iov_len;
1846                 item->ino = cpu_to_le64(cap->cap_ino);
1847                 item->cap_id = cpu_to_le64(cap->cap_id);
1848                 item->migrate_seq = cpu_to_le32(cap->mseq);
1849                 item->seq = cpu_to_le32(cap->issue_seq);
1850                 msg->front.iov_len += sizeof(*item);
1851
1852                 ceph_put_cap(mdsc, cap);
1853
1854                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
1855                         // Append cap_barrier field
1856                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
1857                         *cap_barrier = barrier;
1858                         msg->front.iov_len += sizeof(*cap_barrier);
1859
1860                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1861                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1862                         ceph_con_send(&session->s_con, msg);
1863                         msg = NULL;
1864                 }
1865         }
1866
1867         BUG_ON(num_cap_releases != 0);
1868
1869         spin_lock(&session->s_cap_lock);
1870         if (!list_empty(&session->s_cap_releases))
1871                 goto again;
1872         spin_unlock(&session->s_cap_lock);
1873
1874         if (msg) {
1875                 // Append cap_barrier field
1876                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
1877                 *cap_barrier = barrier;
1878                 msg->front.iov_len += sizeof(*cap_barrier);
1879
1880                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1881                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1882                 ceph_con_send(&session->s_con, msg);
1883         }
1884         return;
1885 out_err:
1886         pr_err("send_cap_releases mds%d, failed to allocate message\n",
1887                 session->s_mds);
1888         spin_lock(&session->s_cap_lock);
1889         list_splice(&tmp_list, &session->s_cap_releases);
1890         session->s_num_cap_releases += num_cap_releases;
1891         spin_unlock(&session->s_cap_lock);
1892 }
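
/*
 * Illustration only: the message built above must fit one header, up to
 * CEPH_CAPS_PER_RELEASE items, and the trailing cap_barrier into the
 * PAGE_SIZE front buffer.  A rough capacity check under that assumption:
 */
static inline bool example_release_msg_fits(u32 num_items)
{
        size_t need = sizeof(struct ceph_mds_cap_release) +
                      num_items * sizeof(struct ceph_mds_cap_item) +
                      sizeof(__le32);           /* cap_barrier */

        return need <= PAGE_SIZE;
}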
1893
1894 /*
1895  * requests
1896  */
1897
1898 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1899                                     struct inode *dir)
1900 {
1901         struct ceph_inode_info *ci = ceph_inode(dir);
1902         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1903         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1904         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
1905         int order, num_entries;
1906
1907         spin_lock(&ci->i_ceph_lock);
1908         num_entries = ci->i_files + ci->i_subdirs;
1909         spin_unlock(&ci->i_ceph_lock);
1910         num_entries = max(num_entries, 1);
1911         num_entries = min(num_entries, opt->max_readdir);
1912
1913         order = get_order(size * num_entries);
1914         while (order >= 0) {
1915                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
1916                                                              __GFP_NOWARN,
1917                                                              order);
1918                 if (rinfo->dir_entries)
1919                         break;
1920                 order--;
1921         }
1922         if (!rinfo->dir_entries)
1923                 return -ENOMEM;
1924
1925         num_entries = (PAGE_SIZE << order) / size;
1926         num_entries = min(num_entries, opt->max_readdir);
1927
1928         rinfo->dir_buf_size = PAGE_SIZE << order;
1929         req->r_num_caps = num_entries + 1;
1930         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1931         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1932         return 0;
1933 }
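
/*
 * Illustration only (generic form of the loop above, hypothetical name):
 * try the largest contiguous allocation first and fall back order by
 * order, with __GFP_NOWARN suppressing failure warnings along the way.
 */
static void *example_alloc_largest(int max_order, int *order_out)
{
        int order;

        for (order = max_order; order >= 0; order--) {
                void *buf = (void *)__get_free_pages(GFP_KERNEL |
                                                     __GFP_NOWARN, order);
                if (buf) {
                        *order_out = order;
                        return buf;
                }
        }
        return NULL;
}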
1934
1935 /*
1936  * Create an mds request.
1937  */
1938 struct ceph_mds_request *
1939 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1940 {
1941         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1942         struct timespec64 ts;
1943
1944         if (!req)
1945                 return ERR_PTR(-ENOMEM);
1946
1947         mutex_init(&req->r_fill_mutex);
1948         req->r_mdsc = mdsc;
1949         req->r_started = jiffies;
1950         req->r_resend_mds = -1;
1951         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1952         INIT_LIST_HEAD(&req->r_unsafe_target_item);
1953         req->r_fmode = -1;
1954         kref_init(&req->r_kref);
1955         RB_CLEAR_NODE(&req->r_node);
1956         INIT_LIST_HEAD(&req->r_wait);
1957         init_completion(&req->r_completion);
1958         init_completion(&req->r_safe_completion);
1959         INIT_LIST_HEAD(&req->r_unsafe_item);
1960
1961         ktime_get_coarse_real_ts64(&ts);
1962         req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
1963
1964         req->r_op = op;
1965         req->r_direct_mode = mode;
1966         return req;
1967 }
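
/*
 * Illustration only (hypothetical caller): the typical life cycle of a
 * synchronous mds request.  Real callers live in dir.c/file.c/inode.c
 * and fill in op-specific fields; CEPH_MDS_OP_GETATTR and USE_AUTH_MDS
 * are just example arguments here.
 */
static int example_sync_mds_op(struct ceph_mds_client *mdsc,
                               struct inode *inode)
{
        struct ceph_mds_request *req;
        int err;

        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR,
                                       USE_AUTH_MDS);
        if (IS_ERR(req))
                return PTR_ERR(req);

        req->r_inode = inode;
        ihold(inode);                   /* the request holds an inode ref */
        req->r_num_caps = 1;

        err = ceph_mdsc_do_request(mdsc, NULL, req);
        ceph_mdsc_put_request(req);     /* drop the kref from create */
        return err;
}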
1968
1969 /*
1970  * return oldest (lowest tid) request in the request tree; NULL/0 if none.
1971  *
1972  * called under mdsc->mutex.
1973  */
1974 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1975 {
1976         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1977                 return NULL;
1978         return rb_entry(rb_first(&mdsc->request_tree),
1979                         struct ceph_mds_request, r_node);
1980 }
1981
1982 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1983 {
1984         return mdsc->oldest_tid;
1985 }
1986
1987 /*
1988  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1989  * on build_path_from_dentry in fs/cifs/dir.c.
1990  *
1991  * If @stop_on_nosnap, generate path relative to the first non-snapped
1992  * inode.
1993  *
1994  * Encode hidden .snap dirs as a double /, i.e.
1995  *   foo/.snap/bar -> foo//bar
1996  */
1997 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1998                            int stop_on_nosnap)
1999 {
2000         struct dentry *temp;
2001         char *path;
2002         int len, pos;
2003         unsigned seq;
2004
2005         if (!dentry)
2006                 return ERR_PTR(-EINVAL);
2007
2008 retry:
2009         len = 0;
2010         seq = read_seqbegin(&rename_lock);
2011         rcu_read_lock();
2012         for (temp = dentry; !IS_ROOT(temp);) {
2013                 struct inode *inode = d_inode(temp);
2014                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
2015                         len++;  /* slash only */
2016                 else if (stop_on_nosnap && inode &&
2017                          ceph_snap(inode) == CEPH_NOSNAP)
2018                         break;
2019                 else
2020                         len += 1 + temp->d_name.len;
2021                 temp = temp->d_parent;
2022         }
2023         rcu_read_unlock();
2024         if (len)
2025                 len--;  /* no leading '/' */
2026
2027         path = kmalloc(len+1, GFP_NOFS);
2028         if (!path)
2029                 return ERR_PTR(-ENOMEM);
2030         pos = len;
2031         path[pos] = 0;  /* trailing null */
2032         rcu_read_lock();
2033         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
2034                 struct inode *inode;
2035
2036                 spin_lock(&temp->d_lock);
2037                 inode = d_inode(temp);
2038                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2039                         dout("build_path path+%d: %p SNAPDIR\n",
2040                              pos, temp);
2041                 } else if (stop_on_nosnap && inode &&
2042                            ceph_snap(inode) == CEPH_NOSNAP) {
2043                         spin_unlock(&temp->d_lock);
2044                         break;
2045                 } else {
2046                         pos -= temp->d_name.len;
2047                         if (pos < 0) {
2048                                 spin_unlock(&temp->d_lock);
2049                                 break;
2050                         }
2051                         strncpy(path + pos, temp->d_name.name,
2052                                 temp->d_name.len);
2053                 }
2054                 spin_unlock(&temp->d_lock);
2055                 if (pos)
2056                         path[--pos] = '/';
2057                 temp = temp->d_parent;
2058         }
2059         rcu_read_unlock();
2060         if (pos != 0 || read_seqretry(&rename_lock, seq)) {
2061                 pr_err("build_path did not end path lookup where "
2062                        "expected, namelen is %d, pos is %d\n", len, pos);
2063                 /* presumably this is only possible if racing with a
2064                    rename of one of the parent directories (we cannot
2065                    lock the dentries above us to prevent this, but
2066                    retrying should be harmless) */
2067                 kfree(path);
2068                 goto retry;
2069         }
2070
2071         *base = ceph_ino(d_inode(temp));
2072         *plen = len;
2073         dout("build_path on %p %d built %llx '%.*s'\n",
2074              dentry, d_count(dentry), *base, len, path);
2075         return path;
2076 }
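
/*
 * Illustration only: the retry above is the standard rename_lock seqlock
 * read pattern; compute the result under RCU, then start over if a
 * concurrent rename may have invalidated the walk.  The bare skeleton:
 */
static void example_rename_seq_read(void)
{
        unsigned seq;

        do {
                seq = read_seqbegin(&rename_lock);
                rcu_read_lock();
                /* ... walk the d_parent chain, building the path ... */
                rcu_read_unlock();
        } while (read_seqretry(&rename_lock, seq));
}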
2077
2078 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2079                              const char **ppath, int *ppathlen, u64 *pino,
2080                              int *pfreepath)
2081 {
2082         char *path;
2083
2084         rcu_read_lock();
2085         if (!dir)
2086                 dir = d_inode_rcu(dentry->d_parent);
2087         if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
2088                 *pino = ceph_ino(dir);
2089                 rcu_read_unlock();
2090                 *ppath = dentry->d_name.name;
2091                 *ppathlen = dentry->d_name.len;
2092                 return 0;
2093         }
2094         rcu_read_unlock();
2095         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2096         if (IS_ERR(path))
2097                 return PTR_ERR(path);
2098         *ppath = path;
2099         *pfreepath = 1;
2100         return 0;
2101 }
2102
2103 static int build_inode_path(struct inode *inode,
2104                             const char **ppath, int *ppathlen, u64 *pino,
2105                             int *pfreepath)
2106 {
2107         struct dentry *dentry;
2108         char *path;
2109
2110         if (ceph_snap(inode) == CEPH_NOSNAP) {
2111                 *pino = ceph_ino(inode);
2112                 *ppathlen = 0;
2113                 return 0;
2114         }
2115         dentry = d_find_alias(inode);
2116         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2117         dput(dentry);
2118         if (IS_ERR(path))
2119                 return PTR_ERR(path);
2120         *ppath = path;
2121         *pfreepath = 1;
2122         return 0;
2123 }
2124
2125 /*
2126  * request arguments may be specified via an inode *, a dentry *, or
2127  * an explicit ino+path.
2128  */
2129 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2130                                   struct inode *rdiri, const char *rpath,
2131                                   u64 rino, const char **ppath, int *pathlen,
2132                                   u64 *ino, int *freepath)
2133 {
2134         int r = 0;
2135
2136         if (rinode) {
2137                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2138                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2139                      ceph_snap(rinode));
2140         } else if (rdentry) {
2141                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2142                                         freepath);
2143                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2144                      *ppath);
2145         } else if (rpath || rino) {
2146                 *ino = rino;
2147                 *ppath = rpath;
2148                 *pathlen = rpath ? strlen(rpath) : 0;
2149                 dout(" path %.*s\n", *pathlen, rpath);
2150         }
2151
2152         return r;
2153 }
2154
2155 /*
2156  * called under mdsc->mutex
2157  */
2158 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2159                                                struct ceph_mds_request *req,
2160                                                int mds, bool drop_cap_releases)
2161 {
2162         struct ceph_msg *msg;
2163         struct ceph_mds_request_head *head;
2164         const char *path1 = NULL;
2165         const char *path2 = NULL;
2166         u64 ino1 = 0, ino2 = 0;
2167         int pathlen1 = 0, pathlen2 = 0;
2168         int freepath1 = 0, freepath2 = 0;
2169         int len;
2170         u16 releases;
2171         void *p, *end;
2172         int ret;
2173
2174         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2175                               req->r_parent, req->r_path1, req->r_ino1.ino,
2176                               &path1, &pathlen1, &ino1, &freepath1);
2177         if (ret < 0) {
2178                 msg = ERR_PTR(ret);
2179                 goto out;
2180         }
2181
2182         ret = set_request_path_attr(NULL, req->r_old_dentry,
2183                               req->r_old_dentry_dir,
2184                               req->r_path2, req->r_ino2.ino,
2185                               &path2, &pathlen2, &ino2, &freepath2);
2186         if (ret < 0) {
2187                 msg = ERR_PTR(ret);
2188                 goto out_free1;
2189         }
2190
2191         len = sizeof(*head) +
2192                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2193                 sizeof(struct ceph_timespec);
2194
2195         /* calculate (max) length for cap releases */
2196         len += sizeof(struct ceph_mds_request_release) *
2197                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2198                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2199         if (req->r_dentry_drop)
2200                 len += req->r_dentry->d_name.len;
2201         if (req->r_old_dentry_drop)
2202                 len += req->r_old_dentry->d_name.len;
2203
2204         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2205         if (!msg) {
2206                 msg = ERR_PTR(-ENOMEM);
2207                 goto out_free2;
2208         }
2209
2210         msg->hdr.version = cpu_to_le16(2);
2211         msg->hdr.tid = cpu_to_le64(req->r_tid);
2212
2213         head = msg->front.iov_base;
2214         p = msg->front.iov_base + sizeof(*head);
2215         end = msg->front.iov_base + msg->front.iov_len;
2216
2217         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2218         head->op = cpu_to_le32(req->r_op);
2219         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2220         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2221         head->args = req->r_args;
2222
2223         ceph_encode_filepath(&p, end, ino1, path1);
2224         ceph_encode_filepath(&p, end, ino2, path2);
2225
2226         /* make note of release offset, in case we need to replay */
2227         req->r_request_release_offset = p - msg->front.iov_base;
2228
2229         /* cap releases */
2230         releases = 0;
2231         if (req->r_inode_drop)
2232                 releases += ceph_encode_inode_release(&p,
2233                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2234                       mds, req->r_inode_drop, req->r_inode_unless, 0);
2235         if (req->r_dentry_drop)
2236                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2237                                 req->r_parent, mds, req->r_dentry_drop,
2238                                 req->r_dentry_unless);
2239         if (req->r_old_dentry_drop)
2240                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2241                                 req->r_old_dentry_dir, mds,
2242                                 req->r_old_dentry_drop,
2243                                 req->r_old_dentry_unless);
2244         if (req->r_old_inode_drop)
2245                 releases += ceph_encode_inode_release(&p,
2246                       d_inode(req->r_old_dentry),
2247                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2248
2249         if (drop_cap_releases) {
2250                 releases = 0;
2251                 p = msg->front.iov_base + req->r_request_release_offset;
2252         }
2253
2254         head->num_releases = cpu_to_le16(releases);
2255
2256         /* time stamp */
2257         {
2258                 struct ceph_timespec ts;
2259                 ceph_encode_timespec64(&ts, &req->r_stamp);
2260                 ceph_encode_copy(&p, &ts, sizeof(ts));
2261         }
2262
2263         BUG_ON(p > end);
2264         msg->front.iov_len = p - msg->front.iov_base;
2265         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2266
2267         if (req->r_pagelist) {
2268                 struct ceph_pagelist *pagelist = req->r_pagelist;
2269                 ceph_msg_data_add_pagelist(msg, pagelist);
2270                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2271         } else {
2272                 msg->hdr.data_len = 0;
2273         }
2274
2275         msg->hdr.data_off = cpu_to_le16(0);
2276
2277 out_free2:
2278         if (freepath2)
2279                 kfree((char *)path2);
2280 out_free1:
2281         if (freepath1)
2282                 kfree((char *)path1);
2283 out:
2284         return msg;
2285 }
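
/*
 * For reference, the front of the request message built above is laid
 * out as:
 *
 *      struct ceph_mds_request_head
 *      filepath 1 (ino1 + path1)
 *      filepath 2 (ino2 + path2)
 *      cap releases (num_releases entries, possibly rewound to zero)
 *      struct ceph_timespec (r_stamp)
 */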
2286
2287 /*
2288  * called under mdsc->mutex if error, under no mutex if
2289  * success.
2290  */
2291 static void complete_request(struct ceph_mds_client *mdsc,
2292                              struct ceph_mds_request *req)
2293 {
2294         if (req->r_callback)
2295                 req->r_callback(mdsc, req);
2296         else
2297                 complete_all(&req->r_completion);
2298 }
2299
2300 /*
2301  * called under mdsc->mutex
2302  */
2303 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2304                                   struct ceph_mds_request *req,
2305                                   int mds, bool drop_cap_releases)
2306 {
2307         struct ceph_mds_request_head *rhead;
2308         struct ceph_msg *msg;
2309         int flags = 0;
2310
2311         req->r_attempts++;
2312         if (req->r_inode) {
2313                 struct ceph_cap *cap =
2314                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2315
2316                 if (cap)
2317                         req->r_sent_on_mseq = cap->mseq;
2318                 else
2319                         req->r_sent_on_mseq = -1;
2320         }
2321         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2322              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2323
2324         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2325                 void *p;
2326                 /*
2327                  * Replay.  Do not regenerate message (and rebuild
2328                  * paths, etc.); just use the original message.
2329                  * Rebuilding paths will break for renames because
2330                  * d_move mangles the src name.
2331                  */
2332                 msg = req->r_request;
2333                 rhead = msg->front.iov_base;
2334
2335                 flags = le32_to_cpu(rhead->flags);
2336                 flags |= CEPH_MDS_FLAG_REPLAY;
2337                 rhead->flags = cpu_to_le32(flags);
2338
2339                 if (req->r_target_inode)
2340                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2341
2342                 rhead->num_retry = req->r_attempts - 1;
2343
2344                 /* remove cap/dentry releases from message */
2345                 rhead->num_releases = 0;
2346
2347                 /* time stamp */
2348                 p = msg->front.iov_base + req->r_request_release_offset;
2349                 {
2350                         struct ceph_timespec ts;
2351                         ceph_encode_timespec64(&ts, &req->r_stamp);
2352                         ceph_encode_copy(&p, &ts, sizeof(ts));
2353                 }
2354
2355                 msg->front.iov_len = p - msg->front.iov_base;
2356                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2357                 return 0;
2358         }
2359
2360         if (req->r_request) {
2361                 ceph_msg_put(req->r_request);
2362                 req->r_request = NULL;
2363         }
2364         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2365         if (IS_ERR(msg)) {
2366                 req->r_err = PTR_ERR(msg);
2367                 return PTR_ERR(msg);
2368         }
2369         req->r_request = msg;
2370
2371         rhead = msg->front.iov_base;
2372         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2373         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2374                 flags |= CEPH_MDS_FLAG_REPLAY;
2375         if (req->r_parent)
2376                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2377         rhead->flags = cpu_to_le32(flags);
2378         rhead->num_fwd = req->r_num_fwd;
2379         rhead->num_retry = req->r_attempts - 1;
2380         rhead->ino = 0;
2381
2382         dout(" r_parent = %p\n", req->r_parent);
2383         return 0;
2384 }
2385
2386 /*
2387  * send request, or put it on the appropriate wait list.
2388  */
2389 static void __do_request(struct ceph_mds_client *mdsc,
2390                         struct ceph_mds_request *req)
2391 {
2392         struct ceph_mds_session *session = NULL;
2393         int mds = -1;
2394         int err = 0;
2395
2396         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2397                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2398                         __unregister_request(mdsc, req);
2399                 return;
2400         }
2401
2402         if (req->r_timeout &&
2403             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2404                 dout("do_request timed out\n");
2405                 err = -EIO;
2406                 goto finish;
2407         }
2408         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2409                 dout("do_request forced umount\n");
2410                 err = -EIO;
2411                 goto finish;
2412         }
2413         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2414                 if (mdsc->mdsmap_err) {
2415                         err = mdsc->mdsmap_err;
2416                         dout("do_request mdsmap err %d\n", err);
2417                         goto finish;
2418                 }
2419                 if (mdsc->mdsmap->m_epoch == 0) {
2420                         dout("do_request no mdsmap, waiting for map\n");
2421                         list_add(&req->r_wait, &mdsc->waiting_for_map);
2422                         return;
2423                 }
2424                 if (!(mdsc->fsc->mount_options->flags &
2425                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
2426                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2427                         err = -ENOENT;
2428                         pr_info("probably no mds server is up\n");
2429                         goto finish;
2430                 }
2431         }
2432
2433         put_request_session(req);
2434
2435         mds = __choose_mds(mdsc, req);
2436         if (mds < 0 ||
2437             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2438                 dout("do_request no mds or not active, waiting for map\n");
2439                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2440                 return;
2441         }
2442
2443         /* get, open session */
2444         session = __ceph_lookup_mds_session(mdsc, mds);
2445         if (!session) {
2446                 session = register_session(mdsc, mds);
2447                 if (IS_ERR(session)) {
2448                         err = PTR_ERR(session);
2449                         goto finish;
2450                 }
2451         }
2452         req->r_session = get_session(session);
2453
2454         dout("do_request mds%d session %p state %s\n", mds, session,
2455              ceph_session_state_name(session->s_state));
2456         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2457             session->s_state != CEPH_MDS_SESSION_HUNG) {
2458                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2459                         err = -EACCES;
2460                         goto out_session;
2461                 }
2462                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2463                     session->s_state == CEPH_MDS_SESSION_CLOSING)
2464                         __open_session(mdsc, session);
2465                 list_add(&req->r_wait, &session->s_waiting);
2466                 goto out_session;
2467         }
2468
2469         /* send request */
2470         req->r_resend_mds = -1;   /* forget any previous mds hint */
2471
2472         if (req->r_request_started == 0)   /* note request start time */
2473                 req->r_request_started = jiffies;
2474
2475         err = __prepare_send_request(mdsc, req, mds, false);
2476         if (!err) {
2477                 ceph_msg_get(req->r_request);
2478                 ceph_con_send(&session->s_con, req->r_request);
2479         }
2480
2481 out_session:
2482         ceph_put_mds_session(session);
2483 finish:
2484         if (err) {
2485                 dout("__do_request early error %d\n", err);
2486                 req->r_err = err;
2487                 complete_request(mdsc, req);
2488                 __unregister_request(mdsc, req);
2489         }
2490         return;
2491 }
2492
2493 /*
2494  * called under mdsc->mutex
2495  */
2496 static void __wake_requests(struct ceph_mds_client *mdsc,
2497                             struct list_head *head)
2498 {
2499         struct ceph_mds_request *req;
2500         LIST_HEAD(tmp_list);
2501
2502         list_splice_init(head, &tmp_list);
2503
2504         while (!list_empty(&tmp_list)) {
2505                 req = list_entry(tmp_list.next,
2506                                  struct ceph_mds_request, r_wait);
2507                 list_del_init(&req->r_wait);
2508                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2509                 __do_request(mdsc, req);
2510         }
2511 }
2512
2513 /*
2514  * Wake up threads with requests pending for @mds, so that they can
2515  * resubmit their requests to a possibly different mds.
2516  */
2517 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2518 {
2519         struct ceph_mds_request *req;
2520         struct rb_node *p = rb_first(&mdsc->request_tree);
2521
2522         dout("kick_requests mds%d\n", mds);
2523         while (p) {
2524                 req = rb_entry(p, struct ceph_mds_request, r_node);
2525                 p = rb_next(p);
2526                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2527                         continue;
2528                 if (req->r_attempts > 0)
2529                         continue; /* only new requests */
2530                 if (req->r_session &&
2531                     req->r_session->s_mds == mds) {
2532                         dout(" kicking tid %llu\n", req->r_tid);
2533                         list_del_init(&req->r_wait);
2534                         __do_request(mdsc, req);
2535                 }
2536         }
2537 }
2538
2539 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2540                               struct ceph_mds_request *req)
2541 {
2542         dout("submit_request on %p\n", req);
2543         mutex_lock(&mdsc->mutex);
2544         __register_request(mdsc, req, NULL);
2545         __do_request(mdsc, req);
2546         mutex_unlock(&mdsc->mutex);
2547 }
2548
2549 /*
2550  * Synchronously perform an mds request.  Takes care of all of the
2551  * session setup, forwarding, and retry details.
2552  */
2553 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2554                          struct inode *dir,
2555                          struct ceph_mds_request *req)
2556 {
2557         int err;
2558
2559         dout("do_request on %p\n", req);
2560
2561         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2562         if (req->r_inode)
2563                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2564         if (req->r_parent)
2565                 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
2566         if (req->r_old_dentry_dir)
2567                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2568                                   CEPH_CAP_PIN);
2569
2570         /* issue */
2571         mutex_lock(&mdsc->mutex);
2572         __register_request(mdsc, req, dir);
2573         __do_request(mdsc, req);
2574
2575         if (req->r_err) {
2576                 err = req->r_err;
2577                 goto out;
2578         }
2579
2580         /* wait */
2581         mutex_unlock(&mdsc->mutex);
2582         dout("do_request waiting\n");
2583         if (!req->r_timeout && req->r_wait_for_completion) {
2584                 err = req->r_wait_for_completion(mdsc, req);
2585         } else {
2586                 long timeleft = wait_for_completion_killable_timeout(
2587                                         &req->r_completion,
2588                                         ceph_timeout_jiffies(req->r_timeout));
2589                 if (timeleft > 0)
2590                         err = 0;
2591                 else if (!timeleft)
2592                         err = -EIO;  /* timed out */
2593                 else
2594                         err = timeleft;  /* killed */
2595         }
2596         dout("do_request waited, got %d\n", err);
2597         mutex_lock(&mdsc->mutex);
2598
2599         /* only abort if we didn't race with a real reply */
2600         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2601                 err = le32_to_cpu(req->r_reply_info.head->result);
2602         } else if (err < 0) {
2603                 dout("aborted request %lld with %d\n", req->r_tid, err);
2604
2605                 /*
2606                  * ensure we aren't running concurrently with
2607                  * ceph_fill_trace or ceph_readdir_prepopulate, which
2608                  * rely on locks (dir mutex) held by our caller.
2609                  */
2610                 mutex_lock(&req->r_fill_mutex);
2611                 req->r_err = err;
2612                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2613                 mutex_unlock(&req->r_fill_mutex);
2614
2615                 if (req->r_parent &&
2616                     (req->r_op & CEPH_MDS_OP_WRITE))
2617                         ceph_invalidate_dir_request(req);
2618         } else {
2619                 err = req->r_err;
2620         }
2621
2622 out:
2623         mutex_unlock(&mdsc->mutex);
2624         dout("do_request %p done, result %d\n", req, err);
2625         return err;
2626 }
2627
2628 /*
2629  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2630  * namespace request.
2631  */
2632 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2633 {
2634         struct inode *dir = req->r_parent;
2635         struct inode *old_dir = req->r_old_dentry_dir;
2636
2637         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
2638
2639         ceph_dir_clear_complete(dir);
2640         if (old_dir)
2641                 ceph_dir_clear_complete(old_dir);
2642         if (req->r_dentry)
2643                 ceph_invalidate_dentry_lease(req->r_dentry);
2644         if (req->r_old_dentry)
2645                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2646 }
2647
2648 /*
2649  * Handle mds reply.
2650  *
2651  * We take the session mutex and parse and process the reply immediately.
2652  * This preserves the logical ordering of replies, capabilities, etc., sent
2653  * by the MDS as they are applied to our local cache.
2654  */
2655 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2656 {
2657         struct ceph_mds_client *mdsc = session->s_mdsc;
2658         struct ceph_mds_request *req;
2659         struct ceph_mds_reply_head *head = msg->front.iov_base;
2660         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2661         struct ceph_snap_realm *realm;
2662         u64 tid;
2663         int err, result;
2664         int mds = session->s_mds;
2665
2666         if (msg->front.iov_len < sizeof(*head)) {
2667                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2668                 ceph_msg_dump(msg);
2669                 return;
2670         }
2671
2672         /* get request, session */
2673         tid = le64_to_cpu(msg->hdr.tid);
2674         mutex_lock(&mdsc->mutex);
2675         req = lookup_get_request(mdsc, tid);
2676         if (!req) {
2677                 dout("handle_reply on unknown tid %llu\n", tid);
2678                 mutex_unlock(&mdsc->mutex);
2679                 return;
2680         }
2681         dout("handle_reply %p\n", req);
2682
2683         /* correct session? */
2684         if (req->r_session != session) {
2685                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2686                        " not mds%d\n", tid, session->s_mds,
2687                        req->r_session ? req->r_session->s_mds : -1);
2688                 mutex_unlock(&mdsc->mutex);
2689                 goto out;
2690         }
2691
2692         /* dup? */
2693         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
2694             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
2695                 pr_warn("got a dup %s reply on %llu from mds%d\n",
2696                            head->safe ? "safe" : "unsafe", tid, mds);
2697                 mutex_unlock(&mdsc->mutex);
2698                 goto out;
2699         }
2700         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
2701                 pr_warn("got unsafe after safe on %llu from mds%d\n",
2702                            tid, mds);
2703                 mutex_unlock(&mdsc->mutex);
2704                 goto out;
2705         }
2706
2707         result = le32_to_cpu(head->result);
2708
2709         /*
2710          * Handle an ESTALE:
2711          * - if we're not talking to the authority, send to them
2712          * - if the authority has changed while we weren't looking,
2713          *   send to the new authority
2714          * - otherwise we just have to return the ESTALE
2715          */
2716         if (result == -ESTALE) {
2717                 dout("got ESTALE on request %llu\n", req->r_tid);
2718                 req->r_resend_mds = -1;
2719                 if (req->r_direct_mode != USE_AUTH_MDS) {
2720                         dout("not using auth, setting for that now\n");
2721                         req->r_direct_mode = USE_AUTH_MDS;
2722                         __do_request(mdsc, req);
2723                         mutex_unlock(&mdsc->mutex);
2724                         goto out;
2725                 } else  {
2726                         int mds = __choose_mds(mdsc, req);
2727                         if (mds >= 0 && mds != req->r_session->s_mds) {
2728                                 dout("but auth changed, so resending\n");
2729                                 __do_request(mdsc, req);
2730                                 mutex_unlock(&mdsc->mutex);
2731                                 goto out;
2732                         }
2733                 }
2734                 dout("have to return ESTALE on request %llu\n", req->r_tid);
2735         }
2736
2737
2738         if (head->safe) {
2739                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
2740                 __unregister_request(mdsc, req);
2741
2742                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2743                         /*
2744                          * We already handled the unsafe response, now do the
2745                          * cleanup.  No need to examine the response; the MDS
2746                          * doesn't include any result info in the safe
2747                          * response.  And even if it did, there is nothing
2748                          * useful we could do with a revised return value.
2749                          */
2750                         dout("got safe reply %llu, mds%d\n", tid, mds);
2751
2752                         /* last unsafe request during umount? */
2753                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2754                                 complete_all(&mdsc->safe_umount_waiters);
2755                         mutex_unlock(&mdsc->mutex);
2756                         goto out;
2757                 }
2758         } else {
2759                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
2760                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2761                 if (req->r_unsafe_dir) {
2762                         struct ceph_inode_info *ci =
2763                                         ceph_inode(req->r_unsafe_dir);
2764                         spin_lock(&ci->i_unsafe_lock);
2765                         list_add_tail(&req->r_unsafe_dir_item,
2766                                       &ci->i_unsafe_dirops);
2767                         spin_unlock(&ci->i_unsafe_lock);
2768                 }
2769         }
2770
2771         dout("handle_reply tid %lld result %d\n", tid, result);
2772         rinfo = &req->r_reply_info;
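        /*
         * A session advertising REPLY_ENCODING uses the self-describing
         * (versioned) reply format, signalled here by passing (u64)-1 as
         * the feature mask; otherwise fall back to decoding driven by the
         * peer's feature bits.
         */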
2773         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
2774                 err = parse_reply_info(msg, rinfo, (u64)-1);
2775         else
2776                 err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2777         mutex_unlock(&mdsc->mutex);
2778
2779         mutex_lock(&session->s_mutex);
2780         if (err < 0) {
2781                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2782                 ceph_msg_dump(msg);
2783                 goto out_err;
2784         }
2785
2786         /* snap trace */
2787         realm = NULL;
2788         if (rinfo->snapblob_len) {
2789                 down_write(&mdsc->snap_rwsem);
2790                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2791                                 rinfo->snapblob + rinfo->snapblob_len,
2792                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
2793                                 &realm);
2794                 downgrade_write(&mdsc->snap_rwsem);
2795         } else {
2796                 down_read(&mdsc->snap_rwsem);
2797         }
2798
2799         /* insert trace into our cache */
2800         mutex_lock(&req->r_fill_mutex);
2801         current->journal_info = req;
2802         err = ceph_fill_trace(mdsc->fsc->sb, req);
2803         if (err == 0) {
2804                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2805                                     req->r_op == CEPH_MDS_OP_LSSNAP))
2806                         ceph_readdir_prepopulate(req, req->r_session);
2807                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2808         }
2809         current->journal_info = NULL;
2810         mutex_unlock(&req->r_fill_mutex);
2811
2812         up_read(&mdsc->snap_rwsem);
2813         if (realm)
2814                 ceph_put_snap_realm(mdsc, realm);
2815
2816         if (err == 0 && req->r_target_inode &&
2817             test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2818                 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
2819                 spin_lock(&ci->i_unsafe_lock);
2820                 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
2821                 spin_unlock(&ci->i_unsafe_lock);
2822         }
2823 out_err:
2824         mutex_lock(&mdsc->mutex);
2825         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2826                 if (err) {
2827                         req->r_err = err;
2828                 } else {
2829                         req->r_reply = ceph_msg_get(msg);
2830                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
2831                 }
2832         } else {
2833                 dout("reply arrived after request %lld was aborted\n", tid);
2834         }
2835         mutex_unlock(&mdsc->mutex);
2836
2837         mutex_unlock(&session->s_mutex);
2838
2839         /* kick calling process */
2840         complete_request(mdsc, req);
2841 out:
2842         ceph_mdsc_put_request(req);
2843         return;
2844 }
2845
2846
2847
2848 /*
2849  * handle mds notification that our request has been forwarded.
2850  */
2851 static void handle_forward(struct ceph_mds_client *mdsc,
2852                            struct ceph_mds_session *session,
2853                            struct ceph_msg *msg)
2854 {
2855         struct ceph_mds_request *req;
2856         u64 tid = le64_to_cpu(msg->hdr.tid);
2857         u32 next_mds;
2858         u32 fwd_seq;
2859         int err = -EINVAL;
2860         void *p = msg->front.iov_base;
2861         void *end = p + msg->front.iov_len;
2862
2863         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2864         next_mds = ceph_decode_32(&p);
2865         fwd_seq = ceph_decode_32(&p);
2866
2867         mutex_lock(&mdsc->mutex);
2868         req = lookup_get_request(mdsc, tid);
2869         if (!req) {
2870                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2871                 goto out;  /* dup reply? */
2872         }
2873
2874         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
2875                 dout("forward tid %llu aborted, unregistering\n", tid);
2876                 __unregister_request(mdsc, req);
2877         } else if (fwd_seq <= req->r_num_fwd) {
2878                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2879                      tid, next_mds, fwd_seq, req->r_num_fwd);
2880         } else {
2881                 /* resend. forward race not possible; mds would drop */
2882                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2883                 BUG_ON(req->r_err);
2884                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
2885                 req->r_attempts = 0;
2886                 req->r_num_fwd = fwd_seq;
2887                 req->r_resend_mds = next_mds;
2888                 put_request_session(req);
2889                 __do_request(mdsc, req);
2890         }
2891         ceph_mdsc_put_request(req);
2892 out:
2893         mutex_unlock(&mdsc->mutex);
2894         return;
2895
2896 bad:
2897         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2898 }
2899
2900 static int __decode_and_drop_session_metadata(void **p, void *end)
2901 {
2902         /* map<string,string> */
2903         u32 n;
2904         ceph_decode_32_safe(p, end, n, bad);
2905         while (n-- > 0) {
2906                 u32 len;
2907                 ceph_decode_32_safe(p, end, len, bad);
2908                 ceph_decode_need(p, end, len, bad);
2909                 *p += len;
2910                 ceph_decode_32_safe(p, end, len, bad);
2911                 ceph_decode_need(p, end, len, bad);
2912                 *p += len;
2913         }
2914         return 0;
2915 bad:
2916         return -1;
2917 }
2918
2919 /*
2920  * handle a mds session control message
2921  */
2922 static void handle_session(struct ceph_mds_session *session,
2923                            struct ceph_msg *msg)
2924 {
2925         struct ceph_mds_client *mdsc = session->s_mdsc;
2926         int mds = session->s_mds;
2927         int msg_version = le16_to_cpu(msg->hdr.version);
2928         void *p = msg->front.iov_base;
2929         void *end = p + msg->front.iov_len;
2930         struct ceph_mds_session_head *h;
2931         u32 op;
2932         u64 seq;
2933         unsigned long features = 0;
2934         int wake = 0;
2935
2936         /* decode */
2937         ceph_decode_need(&p, end, sizeof(*h), bad);
2938         h = p;
2939         p += sizeof(*h);
2940
2941         op = le32_to_cpu(h->op);
2942         seq = le64_to_cpu(h->seq);
2943
2944         if (msg_version >= 3) {
2945                 u32 len;
2946                 /* version >= 2, metadata */
2947                 if (__decode_and_drop_session_metadata(&p, end) < 0)
2948                         goto bad;
2949                 /* version >= 3, feature bits */
2950                 ceph_decode_32_safe(&p, end, len, bad);
2951                 ceph_decode_need(&p, end, len, bad);
2952                 memcpy(&features, p, min_t(size_t, len, sizeof(features)));
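                     /*
                      * Only the first sizeof(features) bytes of the feature
                      * bitset are copied (note the min_t() above); any
                      * further feature bits are ignored.
                      */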
2953                 p += len;
2954         }
2955
2956         mutex_lock(&mdsc->mutex);
2957         if (op == CEPH_SESSION_CLOSE) {
2958                 get_session(session);
2959                 __unregister_session(mdsc, session);
2960         }
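             /*
              * For CLOSE the session was unregistered under mdsc->mutex,
              * before s_mutex is taken below; the extra reference taken
              * above is dropped at the end of this function.
              */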
2961         /* FIXME: this ttl calculation is generous */
2962         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2963         mutex_unlock(&mdsc->mutex);
2964
2965         mutex_lock(&session->s_mutex);
2966
2967         dout("handle_session mds%d %s %p state %s seq %llu\n",
2968              mds, ceph_session_op_name(op), session,
2969              ceph_session_state_name(session->s_state), seq);
2970
2971         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2972                 session->s_state = CEPH_MDS_SESSION_OPEN;
2973                 pr_info("mds%d came back\n", session->s_mds);
2974         }
2975
2976         switch (op) {
2977         case CEPH_SESSION_OPEN:
2978                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2979                         pr_info("mds%d reconnect success\n", session->s_mds);
2980                 session->s_state = CEPH_MDS_SESSION_OPEN;
2981                 session->s_features = features;
2982                 renewed_caps(mdsc, session, 0);
2983                 wake = 1;
2984                 if (mdsc->stopping)
2985                         __close_session(mdsc, session);
2986                 break;
2987
2988         case CEPH_SESSION_RENEWCAPS:
2989                 if (session->s_renew_seq == seq)
2990                         renewed_caps(mdsc, session, 1);
2991                 break;
2992
2993         case CEPH_SESSION_CLOSE:
2994                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2995                         pr_info("mds%d reconnect denied\n", session->s_mds);
2996                 cleanup_session_requests(mdsc, session);
2997                 remove_session_caps(session);
2998                 wake = 2; /* for good measure */
2999                 wake_up_all(&mdsc->session_close_wq);
3000                 break;
3001
3002         case CEPH_SESSION_STALE:
3003                 pr_info("mds%d caps went stale, renewing\n",
3004                         session->s_mds);
3005                 spin_lock(&session->s_gen_ttl_lock);
3006                 session->s_cap_gen++;
3007                 session->s_cap_ttl = jiffies - 1;
3008                 spin_unlock(&session->s_gen_ttl_lock);
3009                 send_renew_caps(mdsc, session);
3010                 break;
3011
3012         case CEPH_SESSION_RECALL_STATE:
3013                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3014                 break;
3015
3016         case CEPH_SESSION_FLUSHMSG:
3017                 send_flushmsg_ack(mdsc, session, seq);
3018                 break;
3019
3020         case CEPH_SESSION_FORCE_RO:
3021                 dout("force_session_readonly %p\n", session);
3022                 spin_lock(&session->s_cap_lock);
3023                 session->s_readonly = true;
3024                 spin_unlock(&session->s_cap_lock);
3025                 wake_up_session_caps(session, FORCE_RO);
3026                 break;
3027
3028         case CEPH_SESSION_REJECT:
3029                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3030                 pr_info("mds%d rejected session\n", session->s_mds);
3031                 session->s_state = CEPH_MDS_SESSION_REJECTED;
3032                 cleanup_session_requests(mdsc, session);
3033                 remove_session_caps(session);
3034                 wake = 2; /* for good measure */
3035                 break;
3036
3037         default:
3038                 pr_err("mdsc_handle_session bad op %u mds%d\n", op, mds);
3039                 WARN_ON(1);
3040         }
3041
3042         mutex_unlock(&session->s_mutex);
3043         if (wake) {
3044                 mutex_lock(&mdsc->mutex);
3045                 __wake_requests(mdsc, &session->s_waiting);
3046                 if (wake == 2)
3047                         kick_requests(mdsc, mds);
3048                 mutex_unlock(&mdsc->mutex);
3049         }
3050         if (op == CEPH_SESSION_CLOSE)
3051                 ceph_put_mds_session(session);
3052         return;
3053
3054 bad:
3055         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3056                (int)msg->front.iov_len);
3057         ceph_msg_dump(msg);
3058         return;
3059 }
3060
3061
3062 /*
3063  * called under session->s_mutex.
3064  */
3065 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3066                                    struct ceph_mds_session *session)
3067 {
3068         struct ceph_mds_request *req, *nreq;
3069         struct rb_node *p;
3070         int err;
3071
3072         dout("replay_unsafe_requests mds%d\n", session->s_mds);
3073
3074         mutex_lock(&mdsc->mutex);
3075         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
3076                 err = __prepare_send_request(mdsc, req, session->s_mds, true);
3077                 if (!err) {
3078                         ceph_msg_get(req->r_request);
3079                         ceph_con_send(&session->s_con, req->r_request);
3080                 }
3081         }
3082
3083         /*
3084          * Also re-send old requests when the MDS enters the reconnect stage,
3085          * so that the MDS can process completed requests in its clientreplay stage.
3086          */
3087         p = rb_first(&mdsc->request_tree);
3088         while (p) {
3089                 req = rb_entry(p, struct ceph_mds_request, r_node);
3090                 p = rb_next(p);
3091                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3092                         continue;
3093                 if (req->r_attempts == 0)
3094                         continue; /* only old requests */
3095                 if (req->r_session &&
3096                     req->r_session->s_mds == session->s_mds) {
3097                         err = __prepare_send_request(mdsc, req,
3098                                                      session->s_mds, true);
3099                         if (!err) {
3100                                 ceph_msg_get(req->r_request);
3101                                 ceph_con_send(&session->s_con, req->r_request);
3102                         }
3103                 }
3104         }
3105         mutex_unlock(&mdsc->mutex);
3106 }
3107
3108 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3109 {
3110         struct ceph_msg *reply;
3111         struct ceph_pagelist *_pagelist;
3112         struct page *page;
3113         __le32 *addr;
3114         int err = -ENOMEM;
3115
3116         if (!recon_state->allow_multi)
3117                 return -ENOSPC;
3118
3119         /* can't handle a message that contains both caps and realms */
3120         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
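             /*
              * !a == !b holds when both or neither count is non-zero, so
              * the check above asserts that exactly one of caps/realms is
              * currently being encoded.
              */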
3121
3122         /* pre-allocate new pagelist */
3123         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3124         if (!_pagelist)
3125                 return -ENOMEM;
3126
3127         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3128         if (!reply)
3129                 goto fail_msg;
3130
3131         /* placeholder for nr_caps */
3132         err = ceph_pagelist_encode_32(_pagelist, 0);
3133         if (err < 0)
3134                 goto fail;
3135
3136         if (recon_state->nr_caps) {
3137                 /* currently encoding caps */
3138                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3139                 if (err)
3140                         goto fail;
3141         } else {
3142                 /* placeholder for nr_realms (currently encoding realms) */
3143                 err = ceph_pagelist_encode_32(_pagelist, 0);
3144                 if (err < 0)
3145                         goto fail;
3146         }
3147
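             /*
              * Trailing flag byte: 1 means more reconnect messages will
              * follow.  The final message, built in send_mds_reconnect(),
              * ends with a 0 instead.
              */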
3148         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3149         if (err)
3150                 goto fail;
3151
3152         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3153         addr = kmap_atomic(page);
3154         if (recon_state->nr_caps) {
3155                 /* currently encoding caps */
3156                 *addr = cpu_to_le32(recon_state->nr_caps);
3157         } else {
3158                 /* currently encoding realms */
3159                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3160         }
3161         kunmap_atomic(addr);
3162
3163         reply->hdr.version = cpu_to_le16(5);
3164         reply->hdr.compat_version = cpu_to_le16(4);
3165
3166         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3167         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3168
3169         ceph_con_send(&recon_state->session->s_con, reply);
3170         ceph_pagelist_release(recon_state->pagelist);
3171
3172         recon_state->pagelist = _pagelist;
3173         recon_state->nr_caps = 0;
3174         recon_state->nr_realms = 0;
3175         recon_state->msg_version = 5;
3176         return 0;
3177 fail:
3178         ceph_msg_put(reply);
3179 fail_msg:
3180         ceph_pagelist_release(_pagelist);
3181         return err;
3182 }
3183
3184 /*
3185  * Encode information about a cap for a reconnect with the MDS.
3186  */
3187 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
3188                           void *arg)
3189 {
3190         union {
3191                 struct ceph_mds_cap_reconnect v2;
3192                 struct ceph_mds_cap_reconnect_v1 v1;
3193         } rec;
3194         struct ceph_inode_info *ci = cap->ci;
3195         struct ceph_reconnect_state *recon_state = arg;
3196         struct ceph_pagelist *pagelist = recon_state->pagelist;
3197         int err;
3198         u64 snap_follows;
3199
3200         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3201              inode, ceph_vinop(inode), cap, cap->cap_id,
3202              ceph_cap_string(cap->issued));
3203
3204         spin_lock(&ci->i_ceph_lock);
3205         cap->seq = 0;        /* reset cap seq */
3206         cap->issue_seq = 0;  /* and issue_seq */
3207         cap->mseq = 0;       /* and migrate_seq */
3208         cap->cap_gen = cap->session->s_cap_gen;
3209
3210         if (recon_state->msg_version >= 2) {
3211                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3212                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3213                 rec.v2.issued = cpu_to_le32(cap->issued);
3214                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3215                 rec.v2.pathbase = 0;
3216                 rec.v2.flock_len = (__force __le32)
3217                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
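                     /*
                      * flock_len doubles as a flag at this point (0 means
                      * the file locks are unrecoverable); it is overwritten
                      * with the real byte length once the locks have been
                      * counted and encoded below.
                      */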
3218         } else {
3219                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3220                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3221                 rec.v1.issued = cpu_to_le32(cap->issued);
3222                 rec.v1.size = cpu_to_le64(inode->i_size);
3223                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3224                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3225                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3226                 rec.v1.pathbase = 0;
3227         }
3228
3229         if (list_empty(&ci->i_cap_snaps)) {
3230                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3231         } else {
3232                 struct ceph_cap_snap *capsnap =
3233                         list_first_entry(&ci->i_cap_snaps,
3234                                          struct ceph_cap_snap, ci_item);
3235                 snap_follows = capsnap->follows;
3236         }
3237         spin_unlock(&ci->i_ceph_lock);
3238
3239         if (recon_state->msg_version >= 2) {
3240                 int num_fcntl_locks, num_flock_locks;
3241                 struct ceph_filelock *flocks = NULL;
3242                 size_t struct_len, total_len = sizeof(u64);
3243                 u8 struct_v = 0;
3244
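             /*
              * The lock count can change between ceph_count_locks() and
              * ceph_encode_locks_to_buffer(); on -ENOSPC we recount and
              * retry from here.
              */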
3245 encode_again:
3246                 if (rec.v2.flock_len) {
3247                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3248                 } else {
3249                         num_fcntl_locks = 0;
3250                         num_flock_locks = 0;
3251                 }
3252                 if (num_fcntl_locks + num_flock_locks > 0) {
3253                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3254                                                sizeof(struct ceph_filelock),
3255                                                GFP_NOFS);
3256                         if (!flocks) {
3257                                 err = -ENOMEM;
3258                                 goto out_err;
3259                         }
3260                         err = ceph_encode_locks_to_buffer(inode, flocks,
3261                                                           num_fcntl_locks,
3262                                                           num_flock_locks);
3263                         if (err) {
3264                                 kfree(flocks);
3265                                 flocks = NULL;
3266                                 if (err == -ENOSPC)
3267                                         goto encode_again;
3268                                 goto out_err;
3269                         }
3270                 } else {
3271                         kfree(flocks);
3272                         flocks = NULL;
3273                 }
3274
3275                 if (recon_state->msg_version >= 3) {
3276                         /* version, compat_version and struct_len */
3277                         total_len += 2 * sizeof(u8) + sizeof(u32);
3278                         struct_v = 2;
3279                 }
3280                 /*
3281                  * number of encoded locks is stable, so copy to pagelist
3282                  */
3283                 struct_len = 2 * sizeof(u32) +
3284                             (num_fcntl_locks + num_flock_locks) *
3285                             sizeof(struct ceph_filelock);
3286                 rec.v2.flock_len = cpu_to_le32(struct_len);
3287
3288                 struct_len += sizeof(u32) + sizeof(rec.v2);
3289
3290                 if (struct_v >= 2)
3291                         struct_len += sizeof(u64); /* snap_follows */
3292
3293                 total_len += struct_len;
3294
3295                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3296                         err = send_reconnect_partial(recon_state);
3297                         if (err)
3298                                 goto out_freeflocks;
3299                         pagelist = recon_state->pagelist;
3300                 }
3301
3302                 err = ceph_pagelist_reserve(pagelist, total_len);
3303                 if (err)
3304                         goto out_freeflocks;
3305
3306                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3307                 if (recon_state->msg_version >= 3) {
3308                         ceph_pagelist_encode_8(pagelist, struct_v);
3309                         ceph_pagelist_encode_8(pagelist, 1);
3310                         ceph_pagelist_encode_32(pagelist, struct_len);
3311                 }
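                     /*
                      * Empty path: v2+ cap records identify the inode by
                      * ino (pathbase stays 0); only the v1 format below
                      * sends a full path.
                      */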
3312                 ceph_pagelist_encode_string(pagelist, NULL, 0);
3313                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3314                 ceph_locks_to_pagelist(flocks, pagelist,
3315                                        num_fcntl_locks, num_flock_locks);
3316                 if (struct_v >= 2)
3317                         ceph_pagelist_encode_64(pagelist, snap_follows);
3318 out_freeflocks:
3319                 kfree(flocks);
3320         } else {
3321                 u64 pathbase = 0;
3322                 int pathlen = 0;
3323                 char *path = NULL;
3324                 struct dentry *dentry;
3325
3326                 dentry = d_find_alias(inode);
3327                 if (dentry) {
3328                         path = ceph_mdsc_build_path(dentry,
3329                                                 &pathlen, &pathbase, 0);
3330                         dput(dentry);
3331                         if (IS_ERR(path)) {
3332                                 err = PTR_ERR(path);
3333                                 goto out_err;
3334                         }
3335                         rec.v1.pathbase = cpu_to_le64(pathbase);
3336                 }
3337
3338                 err = ceph_pagelist_reserve(pagelist,
3339                                             sizeof(u64) + sizeof(u32) +
3340                                             pathlen + sizeof(rec.v1));
3341                 if (err)
3342                         goto out_freepath;
3344
3345                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3346                 ceph_pagelist_encode_string(pagelist, path, pathlen);
3347                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3348 out_freepath:
3349                 kfree(path);
3350         }
3351
3352 out_err:
3353         if (err >= 0)
3354                 recon_state->nr_caps++;
3355         return err;
3356 }
3357
3358 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3359                               struct ceph_reconnect_state *recon_state)
3360 {
3361         struct rb_node *p;
3362         struct ceph_pagelist *pagelist = recon_state->pagelist;
3363         int err = 0;
3364
3365         if (recon_state->msg_version >= 4) {
3366                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3367                 if (err < 0)
3368                         goto fail;
3369         }
3370
3371         /*
3372          * snaprealms.  we provide mds with the ino, seq (version), and
3373          * parent for all of our realms.  If the mds has any newer info,
3374          * it will tell us.
3375          */
3376         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3377                 struct ceph_snap_realm *realm =
3378                        rb_entry(p, struct ceph_snap_realm, node);
3379                 struct ceph_mds_snaprealm_reconnect sr_rec;
3380
3381                 if (recon_state->msg_version >= 4) {
3382                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
3383                                       sizeof(sr_rec);
3384
3385                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3386                                 err = send_reconnect_partial(recon_state);
3387                                 if (err)
3388                                         goto fail;
3389                                 pagelist = recon_state->pagelist;
3390                         }
3391
3392                         err = ceph_pagelist_reserve(pagelist, need);
3393                         if (err)
3394                                 goto fail;
3395
3396                         ceph_pagelist_encode_8(pagelist, 1);
3397                         ceph_pagelist_encode_8(pagelist, 1);
3398                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3399                 }
3400
3401                 dout(" adding snap realm %llx seq %lld parent %llx\n",
3402                      realm->ino, realm->seq, realm->parent_ino);
3403                 sr_rec.ino = cpu_to_le64(realm->ino);
3404                 sr_rec.seq = cpu_to_le64(realm->seq);
3405                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3406
3407                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3408                 if (err)
3409                         goto fail;
3410
3411                 recon_state->nr_realms++;
3412         }
3413 fail:
3414         return err;
3415 }
3416
3417
3418 /*
3419  * If an MDS fails and recovers, clients need to reconnect in order to
3420  * reestablish shared state.  This includes all caps issued through
3421  * this session _and_ the snap_realm hierarchy.  Because it's not
3422  * clear which snap realms the mds cares about, we send everything we
3423  * know about; that ensures we'll then get any new info the
3424  * recovering MDS might have.
3425  *
3426  * This is a relatively heavyweight operation, but it's rare.
3427  *
3428  * called with mdsc->mutex held.
3429  */
3430 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3431                                struct ceph_mds_session *session)
3432 {
3433         struct ceph_msg *reply;
3434         int mds = session->s_mds;
3435         int err = -ENOMEM;
3436         struct ceph_reconnect_state recon_state = {
3437                 .session = session,
3438         };
3439         LIST_HEAD(dispose);
3440
3441         pr_info("mds%d reconnect start\n", mds);
3442
3443         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3444         if (!recon_state.pagelist)
3445                 goto fail_nopagelist;
3446
3447         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3448         if (!reply)
3449                 goto fail_nomsg;
3450
3451         mutex_lock(&session->s_mutex);
3452         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3453         session->s_seq = 0;
3454
3455         dout("session %p state %s\n", session,
3456              ceph_session_state_name(session->s_state));
3457
3458         spin_lock(&session->s_gen_ttl_lock);
3459         session->s_cap_gen++;
3460         spin_unlock(&session->s_gen_ttl_lock);
3461
3462         spin_lock(&session->s_cap_lock);
3463         /* don't know if session is readonly */
3464         session->s_readonly = 0;
3465         /*
3466          * notify __ceph_remove_cap() that we are composing cap reconnect.
3467          * If a cap get released before being added to the cap reconnect,
3468          * __ceph_remove_cap() should skip queuing cap release.
3469          */
3470         session->s_cap_reconnect = 1;
3471         /* drop old cap expires; we're about to reestablish that state */
3472         detach_cap_releases(session, &dispose);
3473         spin_unlock(&session->s_cap_lock);
3474         dispose_cap_releases(mdsc, &dispose);
3475
3476         /* trim unused caps to reduce MDS's cache rejoin time */
3477         if (mdsc->fsc->sb->s_root)
3478                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3479
3480         ceph_con_close(&session->s_con);
3481         ceph_con_open(&session->s_con,
3482                       CEPH_ENTITY_TYPE_MDS, mds,
3483                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3484
3485         /* replay unsafe requests */
3486         replay_unsafe_requests(mdsc, session);
3487
3488         ceph_early_kick_flushing_caps(mdsc, session);
3489
3490         down_read(&mdsc->snap_rwsem);
3491
3492         /* placeholder for nr_caps */
3493         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3494         if (err)
3495                 goto fail;
3496
3497         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3498                 recon_state.msg_version = 3;
3499                 recon_state.allow_multi = true;
3500         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3501                 recon_state.msg_version = 3;
3502         } else {
3503                 recon_state.msg_version = 2;
3504         }
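             /*
              * Roughly: v2 is the legacy cap-record encoding, v3 adds
              * versioned cap records, and the MULTI_RECONNECT feature
              * allows splitting the reconnect across several messages
              * (v5 framing; see send_reconnect_partial()).
              */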
3505         /* traverse this session's caps */
3506         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
3507
3508         spin_lock(&session->s_cap_lock);
3509         session->s_cap_reconnect = 0;
3510         spin_unlock(&session->s_cap_lock);
3511
3512         if (err < 0)
3513                 goto fail;
3514
3515         /* check if all realms can be encoded into current message */
3516         if (mdsc->num_snap_realms) {
3517                 size_t total_len =
3518                         recon_state.pagelist->length +
3519                         mdsc->num_snap_realms *
3520                         sizeof(struct ceph_mds_snaprealm_reconnect);
3521                 if (recon_state.msg_version >= 4) {
3522                         /* number of realms */
3523                         total_len += sizeof(u32);
3524                         /* version, compat_version and struct_len */
3525                         total_len += mdsc->num_snap_realms *
3526                                      (2 * sizeof(u8) + sizeof(u32));
3527                 }
3528                 if (total_len > RECONNECT_MAX_SIZE) {
3529                         if (!recon_state.allow_multi) {
3530                                 err = -ENOSPC;
3531                                 goto fail;
3532                         }
3533                         if (recon_state.nr_caps) {
3534                                 err = send_reconnect_partial(&recon_state);
3535                                 if (err)
3536                                         goto fail;
3537                         }
3538                         recon_state.msg_version = 5;
3539                 }
3540         }
3541
3542         err = encode_snap_realms(mdsc, &recon_state);
3543         if (err < 0)
3544                 goto fail;
3545
3546         if (recon_state.msg_version >= 5) {
3547                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3548                 if (err < 0)
3549                         goto fail;
3550         }
3551
3552         if (recon_state.nr_caps || recon_state.nr_realms) {
3553                 struct page *page =
3554                         list_first_entry(&recon_state.pagelist->head,
3555                                         struct page, lru);
3556                 __le32 *addr = kmap_atomic(page);
3557                 if (recon_state.nr_caps) {
3558                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3559                         *addr = cpu_to_le32(recon_state.nr_caps);
3560                 } else if (recon_state.msg_version >= 4) {
3561                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3562                 }
3563                 kunmap_atomic(addr);
3564         }
3565
3566         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3567         if (recon_state.msg_version >= 4)
3568                 reply->hdr.compat_version = cpu_to_le16(4);
3569
3570         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3571         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3572
3573         ceph_con_send(&session->s_con, reply);
3574
3575         mutex_unlock(&session->s_mutex);
3576
3577         mutex_lock(&mdsc->mutex);
3578         __wake_requests(mdsc, &session->s_waiting);
3579         mutex_unlock(&mdsc->mutex);
3580
3581         up_read(&mdsc->snap_rwsem);
3582         ceph_pagelist_release(recon_state.pagelist);
3583         return;
3584
3585 fail:
3586         ceph_msg_put(reply);
3587         up_read(&mdsc->snap_rwsem);
3588         mutex_unlock(&session->s_mutex);
3589 fail_nomsg:
3590         ceph_pagelist_release(recon_state.pagelist);
3591 fail_nopagelist:
3592         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
3593         return;
3594 }
3595
3596
3597 /*
3598  * compare old and new mdsmaps, kicking requests
3599  * and closing out old connections as necessary
3600  *
3601  * called under mdsc->mutex.
3602  */
3603 static void check_new_map(struct ceph_mds_client *mdsc,
3604                           struct ceph_mdsmap *newmap,
3605                           struct ceph_mdsmap *oldmap)
3606 {
3607         int i;
3608         int oldstate, newstate;
3609         struct ceph_mds_session *s;
3610
3611         dout("check_new_map new %u old %u\n",
3612              newmap->m_epoch, oldmap->m_epoch);
3613
3614         for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
3615                 if (!mdsc->sessions[i])
3616                         continue;
3617                 s = mdsc->sessions[i];
3618                 oldstate = ceph_mdsmap_get_state(oldmap, i);
3619                 newstate = ceph_mdsmap_get_state(newmap, i);
3620
3621                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
3622                      i, ceph_mds_state_name(oldstate),
3623                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
3624                      ceph_mds_state_name(newstate),
3625                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
3626                      ceph_session_state_name(s->s_state));
3627
3628                 if (i >= newmap->m_num_mds ||
3629                     memcmp(ceph_mdsmap_get_addr(oldmap, i),
3630                            ceph_mdsmap_get_addr(newmap, i),
3631                            sizeof(struct ceph_entity_addr))) {
3632                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
3633                                 /* the session never opened, just close it
3634                                  * out now */
3635                                 get_session(s);
3636                                 __unregister_session(mdsc, s);
3637                                 __wake_requests(mdsc, &s->s_waiting);
3638                                 ceph_put_mds_session(s);
3639                         } else if (i >= newmap->m_num_mds) {
3640                                 /* force close session for stopped mds */
3641                                 get_session(s);
3642                                 __unregister_session(mdsc, s);
3643                                 __wake_requests(mdsc, &s->s_waiting);
3644                                 kick_requests(mdsc, i);
3645                                 mutex_unlock(&mdsc->mutex);
3646
3647                                 mutex_lock(&s->s_mutex);
3648                                 cleanup_session_requests(mdsc, s);
3649                                 remove_session_caps(s);
3650                                 mutex_unlock(&s->s_mutex);
3651
3652                                 ceph_put_mds_session(s);
3653
3654                                 mutex_lock(&mdsc->mutex);
3655                         } else {
3656                                 /* just close it */
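                                     /*
                                      * Lock order: s_mutex is taken before
                                      * mdsc->mutex is re-acquired; elsewhere
                                      * in this file mdsc->mutex is always
                                      * dropped before taking s_mutex.
                                      */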
3657                                 mutex_unlock(&mdsc->mutex);
3658                                 mutex_lock(&s->s_mutex);
3659                                 mutex_lock(&mdsc->mutex);
3660                                 ceph_con_close(&s->s_con);
3661                                 mutex_unlock(&s->s_mutex);
3662                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
3663                         }
3664                 } else if (oldstate == newstate) {
3665                         continue;  /* nothing new with this mds */
3666                 }
3667
3668                 /*
3669                  * send reconnect?
3670                  */
3671                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
3672                     newstate >= CEPH_MDS_STATE_RECONNECT) {
3673                         mutex_unlock(&mdsc->mutex);
3674                         send_mds_reconnect(mdsc, s);
3675                         mutex_lock(&mdsc->mutex);
3676                 }
3677
3678                 /*
3679                  * kick request on any mds that has gone active.
3680                  */
3681                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
3682                     newstate >= CEPH_MDS_STATE_ACTIVE) {
3683                         if (oldstate != CEPH_MDS_STATE_CREATING &&
3684                             oldstate != CEPH_MDS_STATE_STARTING)
3685                                 pr_info("mds%d recovery completed\n", s->s_mds);
3686                         kick_requests(mdsc, i);
3687                         ceph_kick_flushing_caps(mdsc, s);
3688                         wake_up_session_caps(s, RECONNECT);
3689                 }
3690         }
3691
3692         for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
3693                 s = mdsc->sessions[i];
3694                 if (!s)
3695                         continue;
3696                 if (!ceph_mdsmap_is_laggy(newmap, i))
3697                         continue;
3698                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3699                     s->s_state == CEPH_MDS_SESSION_HUNG ||
3700                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
3701                         dout(" connecting to export targets of laggy mds%d\n",
3702                              i);
3703                         __open_export_target_sessions(mdsc, s);
3704                 }
3705         }
3706 }
3707
3708
3709
3710 /*
3711  * leases
3712  */
3713
3714 /*
3715  * caller must hold session s_mutex, dentry->d_lock
3716  */
3717 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
3718 {
3719         struct ceph_dentry_info *di = ceph_dentry(dentry);
3720
3721         ceph_put_mds_session(di->lease_session);
3722         di->lease_session = NULL;
3723 }
3724
3725 static void handle_lease(struct ceph_mds_client *mdsc,
3726                          struct ceph_mds_session *session,
3727                          struct ceph_msg *msg)
3728 {
3729         struct super_block *sb = mdsc->fsc->sb;
3730         struct inode *inode;
3731         struct dentry *parent, *dentry;
3732         struct ceph_dentry_info *di;
3733         int mds = session->s_mds;
3734         struct ceph_mds_lease *h = msg->front.iov_base;
3735         u32 seq;
3736         struct ceph_vino vino;
3737         struct qstr dname;
3738         int release = 0;
3739
3740         dout("handle_lease from mds%d\n", mds);
3741
3742         /* decode */
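             /*
              * Wire layout: struct ceph_mds_lease, then a u32 name length,
              * then that many bytes of dentry name.
              */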
3743         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3744                 goto bad;
3745         vino.ino = le64_to_cpu(h->ino);
3746         vino.snap = CEPH_NOSNAP;
3747         seq = le32_to_cpu(h->seq);
3748         dname.len = get_unaligned_le32(h + 1);
3749         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
3750                 goto bad;
3751         dname.name = (void *)(h + 1) + sizeof(u32);
3752
3753         /* lookup inode */
3754         inode = ceph_find_inode(sb, vino);
3755         dout("handle_lease %s, ino %llx %p %.*s\n",
3756              ceph_lease_op_name(h->action), vino.ino, inode,
3757              dname.len, dname.name);
3758
3759         mutex_lock(&session->s_mutex);
3760         session->s_seq++;
3761
3762         if (!inode) {
3763                 dout("handle_lease no inode %llx\n", vino.ino);
3764                 goto release;
3765         }
3766
3767         /* dentry */
3768         parent = d_find_alias(inode);
3769         if (!parent) {
3770                 dout("no parent dentry on inode %p\n", inode);
3771                 WARN_ON(1);
3772                 goto release;  /* hrm... */
3773         }
3774         dname.hash = full_name_hash(parent, dname.name, dname.len);
3775         dentry = d_lookup(parent, &dname);
3776         dput(parent);
3777         if (!dentry)
3778                 goto release;
3779
3780         spin_lock(&dentry->d_lock);
3781         di = ceph_dentry(dentry);
3782         switch (h->action) {
3783         case CEPH_MDS_LEASE_REVOKE:
3784                 if (di->lease_session == session) {
3785                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3786                                 h->seq = cpu_to_le32(di->lease_seq);
3787                         __ceph_mdsc_drop_dentry_lease(dentry);
3788                 }
3789                 release = 1;
3790                 break;
3791
3792         case CEPH_MDS_LEASE_RENEW:
3793                 if (di->lease_session == session &&
3794                     di->lease_gen == session->s_cap_gen &&
3795                     di->lease_renew_from &&
3796                     di->lease_renew_after == 0) {
3797                         unsigned long duration =
3798                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
3799
3800                         di->lease_seq = seq;
3801                         di->time = di->lease_renew_from + duration;
3802                         di->lease_renew_after = di->lease_renew_from +
3803                                 (duration >> 1);
3804                         di->lease_renew_from = 0;
3805                 }
3806                 break;
3807         }
3808         spin_unlock(&dentry->d_lock);
3809         dput(dentry);
3810
3811         if (!release)
3812                 goto out;
3813
3814 release:
3815         /* let's just reuse the same message */
3816         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3817         ceph_msg_get(msg);
3818         ceph_con_send(&session->s_con, msg);
3819
3820 out:
3821         iput(inode);
3822         mutex_unlock(&session->s_mutex);
3823         return;
3824
3825 bad:
3826         pr_err("corrupt lease message\n");
3827         ceph_msg_dump(msg);
3828 }
3829
3830 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3831                               struct inode *inode,
3832                               struct dentry *dentry, char action,
3833                               u32 seq)
3834 {
3835         struct ceph_msg *msg;
3836         struct ceph_mds_lease *lease;
3837         int len = sizeof(*lease) + sizeof(u32);
3838         int dnamelen = 0;
3839
3840         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3841              inode, dentry, ceph_lease_op_name(action), session->s_mds);
3842         dnamelen = dentry->d_name.len;
3843         len += dnamelen;
3844
3845         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3846         if (!msg)
3847                 return;
3848         lease = msg->front.iov_base;
3849         lease->action = action;
3850         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3851         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3852         lease->seq = cpu_to_le32(seq);
3853         put_unaligned_le32(dnamelen, lease + 1);
3854         memcpy((void *)(lease + 1) + sizeof(u32), dentry->d_name.name, dnamelen);
3855
3856         /*
3857          * if this is a preemptive lease RELEASE, no need to
3858          * flush request stream, since the actual request will
3859          * soon follow.
3860          */
3861         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3862
3863         ceph_con_send(&session->s_con, msg);
3864 }
3865
3866 /*
3867  * take and release each session's mutex, to wait for ongoing session activity
3868  */
3869 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
3870 {
3871         int i;
3872
3873         mutex_lock(&mdsc->mutex);
3874         for (i = 0; i < mdsc->max_sessions; i++) {
3875                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3876                 if (!s)
3877                         continue;
3878                 mutex_unlock(&mdsc->mutex);
3879                 mutex_lock(&s->s_mutex);
3880                 mutex_unlock(&s->s_mutex);
3881                 ceph_put_mds_session(s);
3882                 mutex_lock(&mdsc->mutex);
3883         }
3884         mutex_unlock(&mdsc->mutex);
3885 }
3886
3887
3888
3889 /*
3890  * delayed work -- periodically trim expired leases, renew caps with mds
3891  */
3892 static void schedule_delayed(struct ceph_mds_client *mdsc)
3893 {
3894         int delay = 5;
3895         unsigned hz = round_jiffies_relative(HZ * delay);
3896         schedule_delayed_work(&mdsc->delayed_work, hz);
3897 }
3898
3899 static void delayed_work(struct work_struct *work)
3900 {
3901         int i;
3902         struct ceph_mds_client *mdsc =
3903                 container_of(work, struct ceph_mds_client, delayed_work.work);
3904         int renew_interval;
3905         int renew_caps;
3906
3907         dout("mdsc delayed_work\n");
3908         ceph_check_delayed_caps(mdsc);
3909
3910         ceph_trim_snapid_map(mdsc);
3911
3912         mutex_lock(&mdsc->mutex);
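             /* renew caps once every quarter of the session timeout */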
3913         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3914         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3915                                    mdsc->last_renew_caps);
3916         if (renew_caps)
3917                 mdsc->last_renew_caps = jiffies;
3918
3919         for (i = 0; i < mdsc->max_sessions; i++) {
3920                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3921                 if (!s)
3922                         continue;
3923                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3924                         dout("resending session close request for mds%d\n",
3925                              s->s_mds);
3926                         request_close_session(mdsc, s);
3927                         ceph_put_mds_session(s);
3928                         continue;
3929                 }
3930                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3931                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3932                                 s->s_state = CEPH_MDS_SESSION_HUNG;
3933                                 pr_info("mds%d hung\n", s->s_mds);
3934                         }
3935                 }
3936                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3937                         /* this mds is failed or recovering, just wait */
3938                         ceph_put_mds_session(s);
3939                         continue;
3940                 }
3941                 mutex_unlock(&mdsc->mutex);
3942
3943                 mutex_lock(&s->s_mutex);
3944                 if (renew_caps)
3945                         send_renew_caps(mdsc, s);
3946                 else
3947                         ceph_con_keepalive(&s->s_con);
3948                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3949                     s->s_state == CEPH_MDS_SESSION_HUNG)
3950                         ceph_send_cap_releases(mdsc, s);
3951                 mutex_unlock(&s->s_mutex);
3952                 ceph_put_mds_session(s);
3953
3954                 mutex_lock(&mdsc->mutex);
3955         }
3956         mutex_unlock(&mdsc->mutex);
3957
3958         schedule_delayed(mdsc);
3959 }
3960
3961 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3963 {
3964         struct ceph_mds_client *mdsc;
3965
3966         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3967         if (!mdsc)
3968                 return -ENOMEM;
3969         mdsc->fsc = fsc;
3970         mutex_init(&mdsc->mutex);
3971         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3972         if (!mdsc->mdsmap) {
3973                 kfree(mdsc);
3974                 return -ENOMEM;
3975         }
3976
3977         fsc->mdsc = mdsc;
3978         init_completion(&mdsc->safe_umount_waiters);
3979         init_waitqueue_head(&mdsc->session_close_wq);
3980         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3981         mdsc->sessions = NULL;
3982         atomic_set(&mdsc->num_sessions, 0);
3983         mdsc->max_sessions = 0;
3984         mdsc->stopping = 0;
3985         atomic64_set(&mdsc->quotarealms_count, 0);
3986         mdsc->last_snap_seq = 0;
3987         init_rwsem(&mdsc->snap_rwsem);
3988         mdsc->snap_realms = RB_ROOT;
3989         INIT_LIST_HEAD(&mdsc->snap_empty);
3990         mdsc->num_snap_realms = 0;
3991         spin_lock_init(&mdsc->snap_empty_lock);
3992         mdsc->last_tid = 0;
3993         mdsc->oldest_tid = 0;
3994         mdsc->request_tree = RB_ROOT;
3995         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3996         mdsc->last_renew_caps = jiffies;
3997         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3998         spin_lock_init(&mdsc->cap_delay_lock);
3999         INIT_LIST_HEAD(&mdsc->snap_flush_list);
4000         spin_lock_init(&mdsc->snap_flush_lock);
4001         mdsc->last_cap_flush_tid = 1;
4002         INIT_LIST_HEAD(&mdsc->cap_flush_list);
4003         INIT_LIST_HEAD(&mdsc->cap_dirty);
4004         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4005         mdsc->num_cap_flushing = 0;
4006         spin_lock_init(&mdsc->cap_dirty_lock);
4007         init_waitqueue_head(&mdsc->cap_flushing_wq);
4008         spin_lock_init(&mdsc->dentry_lru_lock);
4009         INIT_LIST_HEAD(&mdsc->dentry_lru);
4010
4011         ceph_caps_init(mdsc);
4012         ceph_adjust_min_caps(mdsc, fsc->min_caps);
4013
4014         spin_lock_init(&mdsc->snapid_map_lock);
4015         mdsc->snapid_map_tree = RB_ROOT;
4016         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4017
4018         init_rwsem(&mdsc->pool_perm_rwsem);
4019         mdsc->pool_perm_tree = RB_ROOT;
4020
4021         strscpy(mdsc->nodename, utsname()->nodename,
4022                 sizeof(mdsc->nodename));
4023         return 0;
4024 }
4025
4026 /*
4027  * Wait for safe replies on open mds requests.  If we time out, drop
4028  * all requests from the tree to avoid dangling dentry refs.
4029  */
4030 static void wait_requests(struct ceph_mds_client *mdsc)
4031 {
4032         struct ceph_options *opts = mdsc->fsc->client->options;
4033         struct ceph_mds_request *req;
4034
4035         mutex_lock(&mdsc->mutex);
4036         if (__get_oldest_req(mdsc)) {
4037                 mutex_unlock(&mdsc->mutex);
4038
4039                 dout("wait_requests waiting for requests\n");
4040                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4041                                     ceph_timeout_jiffies(opts->mount_timeout));
4042
4043                 /* tear down remaining requests */
4044                 mutex_lock(&mdsc->mutex);
4045                 while ((req = __get_oldest_req(mdsc))) {
4046                         dout("wait_requests timed out on tid %llu\n",
4047                              req->r_tid);
4048                         __unregister_request(mdsc, req);
4049                 }
4050         }
4051         mutex_unlock(&mdsc->mutex);
4052         dout("wait_requests done\n");
4053 }
4054
4055 /*
4056  * called before mount is ro, and before dentries are torn down.
4057  * (hmm, does this still race with new lookups?)
4058  */
4059 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4060 {
4061         dout("pre_umount\n");
4062         mdsc->stopping = 1;
4063
4064         lock_unlock_sessions(mdsc);
4065         ceph_flush_dirty_caps(mdsc);
4066         wait_requests(mdsc);
4067
4068         /*
4069          * wait for reply handlers to drop their request refs and
4070          * their inode/dcache refs
4071          */
4072         ceph_msgr_flush();
4073 }
4074
4075 /*
4076  * wait for all write mds requests to flush.
4077  */
4078 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4079 {
4080         struct ceph_mds_request *req = NULL, *nextreq;
4081         struct rb_node *n;
4082
4083         mutex_lock(&mdsc->mutex);
4084         dout("wait_unsafe_requests want %lld\n", want_tid);
4085 restart:
4086         req = __get_oldest_req(mdsc);
4087         while (req && req->r_tid <= want_tid) {
4088                 /* find next request */
4089                 n = rb_next(&req->r_node);
4090                 if (n)
4091                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4092                 else
4093                         nextreq = NULL;
4094                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4095                     (req->r_op & CEPH_MDS_OP_WRITE)) {
4096                         /* write op */
4097                         ceph_mdsc_get_request(req);
4098                         if (nextreq)
4099                                 ceph_mdsc_get_request(nextreq);
4100                         mutex_unlock(&mdsc->mutex);
4101                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4102                              req->r_tid, want_tid);
4103                         wait_for_completion(&req->r_safe_completion);
4104                         mutex_lock(&mdsc->mutex);
4105                         ceph_mdsc_put_request(req);
4106                         if (!nextreq)
4107                                 break;  /* next dne before, so we're done! */
4108                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
4109                                 /* next request was removed from tree */
4110                                 ceph_mdsc_put_request(nextreq);
4111                                 goto restart;
4112                         }
4113                         ceph_mdsc_put_request(nextreq);  /* won't go away */
4114                 }
4115                 req = nextreq;
4116         }
4117         mutex_unlock(&mdsc->mutex);
4118         dout("wait_unsafe_requests done\n");
4119 }
4120
4121 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4122 {
4123         u64 want_tid, want_flush;
4124
4125         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4126                 return;
4127
4128         dout("sync\n");
4129         mutex_lock(&mdsc->mutex);
4130         want_tid = mdsc->last_tid;
4131         mutex_unlock(&mdsc->mutex);
4132
4133         ceph_flush_dirty_caps(mdsc);
4134         spin_lock(&mdsc->cap_dirty_lock);
4135         want_flush = mdsc->last_cap_flush_tid;
4136         if (!list_empty(&mdsc->cap_flush_list)) {
4137                 struct ceph_cap_flush *cf =
4138                         list_last_entry(&mdsc->cap_flush_list,
4139                                         struct ceph_cap_flush, g_list);
4140                 cf->wake = true;
4141         }
4142         spin_unlock(&mdsc->cap_dirty_lock);
4143
4144         dout("sync want tid %lld flush_seq %lld\n",
4145              want_tid, want_flush);
4146
4147         wait_unsafe_requests(mdsc, want_tid);
4148         wait_caps_flush(mdsc, want_flush);
4149 }
4150
4151 /*
4152  * true if all sessions are closed, or we force unmount
4153  */
4154 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4155 {
4156         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4157                 return true;
4158         return atomic_read(&mdsc->num_sessions) <= skipped;
4159 }
4160
4161 /*
4162  * called after sb is ro.
4163  */
4164 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4165 {
4166         struct ceph_options *opts = mdsc->fsc->client->options;
4167         struct ceph_mds_session *session;
4168         int i;
4169         int skipped = 0;
4170
4171         dout("close_sessions\n");
4172
4173         /* close sessions */
4174         mutex_lock(&mdsc->mutex);
4175         for (i = 0; i < mdsc->max_sessions; i++) {
4176                 session = __ceph_lookup_mds_session(mdsc, i);
4177                 if (!session)
4178                         continue;
4179                 mutex_unlock(&mdsc->mutex);
4180                 mutex_lock(&session->s_mutex);
4181                 if (__close_session(mdsc, session) <= 0)
4182                         skipped++;
4183                 mutex_unlock(&session->s_mutex);
4184                 ceph_put_mds_session(session);
4185                 mutex_lock(&mdsc->mutex);
4186         }
4187         mutex_unlock(&mdsc->mutex);
4188
4189         dout("waiting for sessions to close\n");
4190         wait_event_timeout(mdsc->session_close_wq,
4191                            done_closing_sessions(mdsc, skipped),
4192                            ceph_timeout_jiffies(opts->mount_timeout));
4193
4194         /* tear down remaining sessions */
4195         mutex_lock(&mdsc->mutex);
4196         for (i = 0; i < mdsc->max_sessions; i++) {
4197                 if (mdsc->sessions[i]) {
4198                         session = get_session(mdsc->sessions[i]);
4199                         __unregister_session(mdsc, session);
4200                         mutex_unlock(&mdsc->mutex);
4201                         mutex_lock(&session->s_mutex);
4202                         remove_session_caps(session);
4203                         mutex_unlock(&session->s_mutex);
4204                         ceph_put_mds_session(session);
4205                         mutex_lock(&mdsc->mutex);
4206                 }
4207         }
4208         WARN_ON(!list_empty(&mdsc->cap_delay_list));
4209         mutex_unlock(&mdsc->mutex);
4210
4211         ceph_cleanup_snapid_map(mdsc);
4212
4213         ceph_cleanup_empty_realms(mdsc);
4214
4215         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4216
4217         dout("stopped\n");
4218 }
4219
4220 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4221 {
4222         struct ceph_mds_session *session;
4223         int mds;
4224
4225         dout("force umount\n");
4226
4227         mutex_lock(&mdsc->mutex);
4228         for (mds = 0; mds < mdsc->max_sessions; mds++) {
4229                 session = __ceph_lookup_mds_session(mdsc, mds);
4230                 if (!session)
4231                         continue;
4232                 mutex_unlock(&mdsc->mutex);
4233                 mutex_lock(&session->s_mutex);
4234                 __close_session(mdsc, session);
4235                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4236                         cleanup_session_requests(mdsc, session);
4237                         remove_session_caps(session);
4238                 }
4239                 mutex_unlock(&session->s_mutex);
4240                 ceph_put_mds_session(session);
4241                 mutex_lock(&mdsc->mutex);
4242                 kick_requests(mdsc, mds);
4243         }
4244         __wake_requests(mdsc, &mdsc->waiting_for_map);
4245         mutex_unlock(&mdsc->mutex);
4246 }
4247
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
        ceph_caps_finalize(mdsc);
        ceph_pool_perm_destroy(mdsc);
}

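/*
 * Final teardown: flush any messenger work that may still hold a
 * reference to us, stop the mds client, and free it.
 */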
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
        struct ceph_mds_client *mdsc = fsc->mdsc;

        dout("mdsc_destroy %p\n", mdsc);

        if (!mdsc)
                return;

        /* flush out any connection work with references to us */
        ceph_msgr_flush();

        ceph_mdsc_stop(mdsc);

        fsc->mdsc = NULL;
        kfree(mdsc);
        dout("mdsc_destroy %p done\n", mdsc);
}

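/*
 * Handle an FSMap message from the monitor: scan the per-filesystem
 * entries for the name given by the mds_namespace mount option and,
 * if it is found, record its fscid and subscribe to that filesystem's
 * mds map.
 *
 * The decode below walks roughly the following wire layout (a sketch
 * inferred from the decode calls here, not the authoritative
 * encoding):
 *
 *      u32 epoch
 *      u8 struct_v, u8 struct_cv, u32 map_len
 *        u32 epoch, u32 legacy_client_fscid
 *        u32 num_fs
 *        num_fs * {
 *          u8 info_v, u8 info_cv, u32 info_len
 *            u32 fscid, u32 namelen, char name[namelen], ...
 *        }
 */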
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        struct ceph_fs_client *fsc = mdsc->fsc;
        const char *mds_namespace = fsc->mount_options->mds_namespace;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        u32 epoch;
        u32 map_len;
        u32 num_fs;
        u32 mount_fscid = (u32)-1;
        u8 struct_v, struct_cv;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(u32), bad);
        epoch = ceph_decode_32(&p);

        dout("handle_fsmap epoch %u\n", epoch);

        ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
        struct_v = ceph_decode_8(&p);
        struct_cv = ceph_decode_8(&p);
        map_len = ceph_decode_32(&p);

        ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
        p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */

        num_fs = ceph_decode_32(&p);
        while (num_fs-- > 0) {
                void *info_p, *info_end;
                u32 info_len;
                u8 info_v, info_cv;
                u32 fscid, namelen;

                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
                info_v = ceph_decode_8(&p);
                info_cv = ceph_decode_8(&p);
                info_len = ceph_decode_32(&p);
                ceph_decode_need(&p, end, info_len, bad);
                info_p = p;
                info_end = p + info_len;
                p = info_end;

                ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
                fscid = ceph_decode_32(&info_p);
                namelen = ceph_decode_32(&info_p);
                ceph_decode_need(&info_p, info_end, namelen, bad);

                if (mds_namespace &&
                    strlen(mds_namespace) == namelen &&
                    !strncmp(mds_namespace, (char *)info_p, namelen)) {
                        mount_fscid = fscid;
                        break;
                }
        }

        ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
        if (mount_fscid != (u32)-1) {
                fsc->client->monc.fs_cluster_id = mount_fscid;
                ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
                                   0, true);
                ceph_monc_renew_subs(&fsc->client->monc);
        } else {
                err = -ENOENT;
                goto err_out;
        }
        return;

bad:
        pr_err("error decoding fsmap\n");
err_out:
        mutex_lock(&mdsc->mutex);
        mdsc->mdsmap_err = err;
        __wake_requests(mdsc, &mdsc->waiting_for_map);
        mutex_unlock(&mdsc->mutex);
}

/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        u32 epoch;
        u32 maplen;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
        struct ceph_fsid fsid;
        int err = -EINVAL;

        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

        /* do we need it? */
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                dout("handle_map epoch %u <= our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
                mutex_unlock(&mdsc->mutex);
                return;
        }

        newmap = ceph_mdsmap_decode(&p, end);
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
        }

        /* swap into place */
        if (mdsc->mdsmap) {
                oldmap = mdsc->mdsmap;
                mdsc->mdsmap = newmap;
                check_new_map(mdsc, newmap, oldmap);
                ceph_mdsmap_destroy(oldmap);
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
        mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
                                        MAX_LFS_FILESIZE);

        __wake_requests(mdsc, &mdsc->waiting_for_map);
        ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
                          mdsc->mdsmap->m_epoch);

        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc);
        return;

bad_unlock:
        mutex_unlock(&mdsc->mutex);
bad:
        pr_err("error decoding mdsmap %d\n", err);
        return;
}

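/*
 * The messenger pins the session for the lifetime of each connection:
 * con_get()/con_put() translate connection references into session
 * references.
 */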
static struct ceph_connection *con_get(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        if (get_session(s)) {
                dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
                return con;
        }
        dout("mdsc con_get %p FAIL\n", s);
        return NULL;
}

static void con_put(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;

        dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
        ceph_put_mds_session(s);
}

/*
 * If the client is unresponsive for long enough, the mds will kill
 * the session entirely.  When our peer resets the connection, warn
 * and try to reconnect.
 */
static void peer_reset(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;

        pr_warn("mds%d closed our session\n", s->s_mds);
        send_mds_reconnect(mdsc, s);
}

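/*
 * Dispatch an incoming message to its handler.  Messages for sessions
 * that are no longer registered are dropped.
 */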
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);

        mutex_lock(&mdsc->mutex);
        if (__verify_registered_session(mdsc, s) < 0) {
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        mutex_unlock(&mdsc->mutex);

        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_mdsmap(mdsc, msg);
                break;
        case CEPH_MSG_FS_MAP_USER:
                ceph_mdsc_handle_fsmap(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_QUOTA:
                ceph_handle_quota(mdsc, s, msg);
                break;
        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
                                        int *proto, int force_new)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
        struct ceph_auth_handshake *auth = &s->s_auth;

        if (force_new && auth->authorizer) {
                ceph_auth_destroy_authorizer(auth->authorizer);
                auth->authorizer = NULL;
        }
        if (!auth->authorizer) {
                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        } else {
                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        }
        *proto = ac->protocol;

        return auth;
}

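/*
 * The remaining auth hooks simply thread authorizer challenges,
 * replies and invalidation through to the auth client hanging off the
 * monitor client.
 */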
static int add_authorizer_challenge(struct ceph_connection *con,
                                    void *challenge_buf, int challenge_buf_len)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
                                            challenge_buf, challenge_buf_len);
}

static int verify_authorizer_reply(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

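/*
 * Allocate a message for an incoming frame: reuse the message already
 * in flight on this connection if there is one, otherwise allocate
 * room for the front payload.
 */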
static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
                                struct ceph_msg_header *hdr, int *skip)
{
        struct ceph_msg *msg;
        int type = (int) le16_to_cpu(hdr->type);
        int front_len = (int) le32_to_cpu(hdr->front_len);

        if (con->in_msg)
                return con->in_msg;

        *skip = 0;
        msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
        if (!msg) {
                pr_err("unable to allocate msg type %d len %d\n",
                       type, front_len);
                return NULL;
        }

        return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
        struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;

        return ceph_auth_check_message_signature(auth, msg);
}

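/*
 * Connection callbacks used by the messenger for mds sessions.
 */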
static const struct ceph_connection_operations mds_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .add_authorizer_challenge = add_authorizer_challenge,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
        .alloc_msg = mds_alloc_msg,
        .sign_message = mds_sign_message,
        .check_message_signature = mds_check_message_signature,
};

/* eof */