Merge tag 'leds-5.13-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git/pavel...
[linux-2.6-microblaze.git] / fs / dlm / rcom.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "midcomms.h"
17 #include "rcom.h"
18 #include "recover.h"
19 #include "dir.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "util.h"
24
25 static int rcom_response(struct dlm_ls *ls)
26 {
27         return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28 }
29
30 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31                        struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32 {
33         struct dlm_rcom *rc;
34         struct dlm_mhandle *mh;
35         char *mb;
36         int mb_len = sizeof(struct dlm_rcom) + len;
37
38         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39         if (!mh) {
40                 log_print("create_rcom to %d type %d len %d ENOBUFS",
41                           to_nodeid, type, len);
42                 return -ENOBUFS;
43         }
44
45         rc = (struct dlm_rcom *) mb;
46
47         rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
48         rc->rc_header.h_lockspace = ls->ls_global_id;
49         rc->rc_header.h_nodeid = dlm_our_nodeid();
50         rc->rc_header.h_length = mb_len;
51         rc->rc_header.h_cmd = DLM_RCOM;
52
53         rc->rc_type = type;
54
55         spin_lock(&ls->ls_recover_lock);
56         rc->rc_seq = ls->ls_recover_seq;
57         spin_unlock(&ls->ls_recover_lock);
58
59         *mh_ret = mh;
60         *rc_ret = rc;
61         return 0;
62 }
63
64 static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
65                       struct dlm_rcom *rc)
66 {
67         dlm_rcom_out(rc);
68         dlm_lowcomms_commit_buffer(mh);
69 }
70
71 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
72                             uint32_t flags)
73 {
74         rs->rs_flags = cpu_to_le32(flags);
75 }
76
77 /* When replying to a status request, a node also sends back its
78    configuration values.  The requesting node then checks that the remote
79    node is configured the same way as itself. */
80
81 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
82                             uint32_t num_slots)
83 {
84         rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
85         rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
86
87         rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
88         rf->rf_num_slots = cpu_to_le16(num_slots);
89         rf->rf_generation =  cpu_to_le32(ls->ls_generation);
90 }
91
92 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
93 {
94         struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
95
96         if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
97                 log_error(ls, "version mismatch: %x nodeid %d: %x",
98                           DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
99                           rc->rc_header.h_version);
100                 return -EPROTO;
101         }
102
103         if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104             le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105                 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
106                           ls->ls_lvblen, ls->ls_exflags, nodeid,
107                           le32_to_cpu(rf->rf_lvblen),
108                           le32_to_cpu(rf->rf_lsflags));
109                 return -EPROTO;
110         }
111         return 0;
112 }
113
114 static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
115 {
116         spin_lock(&ls->ls_rcom_spin);
117         *new_seq = ++ls->ls_rcom_seq;
118         set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
119         spin_unlock(&ls->ls_rcom_spin);
120 }
121
122 static void disallow_sync_reply(struct dlm_ls *ls)
123 {
124         spin_lock(&ls->ls_rcom_spin);
125         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
126         clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
127         spin_unlock(&ls->ls_rcom_spin);
128 }
129
130 /*
131  * low nodeid gathers one slot value at a time from each node.
132  * it sets need_slots=0, and saves rf_our_slot returned from each
133  * rcom_config.
134  *
135  * other nodes gather all slot values at once from the low nodeid.
136  * they set need_slots=1, and ignore the rf_our_slot returned from each
137  * rcom_config.  they use the rf_num_slots returned from the low
138  * node's rcom_config.
139  */
140
141 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
142 {
143         struct dlm_rcom *rc;
144         struct dlm_mhandle *mh;
145         int error = 0;
146
147         ls->ls_recover_nodeid = nodeid;
148
149         if (nodeid == dlm_our_nodeid()) {
150                 rc = ls->ls_recover_buf;
151                 rc->rc_result = dlm_recover_status(ls);
152                 goto out;
153         }
154
155 retry:
156         error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
157                             sizeof(struct rcom_status), &rc, &mh);
158         if (error)
159                 goto out;
160
161         set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
162
163         allow_sync_reply(ls, &rc->rc_id);
164         memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
165
166         send_rcom(ls, mh, rc);
167
168         error = dlm_wait_function(ls, &rcom_response);
169         disallow_sync_reply(ls);
170         if (error == -ETIMEDOUT)
171                 goto retry;
172         if (error)
173                 goto out;
174
175         rc = ls->ls_recover_buf;
176
177         if (rc->rc_result == -ESRCH) {
178                 /* we pretend the remote lockspace exists with 0 status */
179                 log_debug(ls, "remote node %d not ready", nodeid);
180                 rc->rc_result = 0;
181                 error = 0;
182         } else {
183                 error = check_rcom_config(ls, rc, nodeid);
184         }
185
186         /* the caller looks at rc_result for the remote recovery status */
187  out:
188         return error;
189 }
190
191 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
192 {
193         struct dlm_rcom *rc;
194         struct dlm_mhandle *mh;
195         struct rcom_status *rs;
196         uint32_t status;
197         int nodeid = rc_in->rc_header.h_nodeid;
198         int len = sizeof(struct rcom_config);
199         int num_slots = 0;
200         int error;
201
202         if (!dlm_slots_version(&rc_in->rc_header)) {
203                 status = dlm_recover_status(ls);
204                 goto do_create;
205         }
206
207         rs = (struct rcom_status *)rc_in->rc_buf;
208
209         if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
210                 status = dlm_recover_status(ls);
211                 goto do_create;
212         }
213
214         spin_lock(&ls->ls_recover_lock);
215         status = ls->ls_recover_status;
216         num_slots = ls->ls_num_slots;
217         spin_unlock(&ls->ls_recover_lock);
218         len += num_slots * sizeof(struct rcom_slot);
219
220  do_create:
221         error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
222                             len, &rc, &mh);
223         if (error)
224                 return;
225
226         rc->rc_id = rc_in->rc_id;
227         rc->rc_seq_reply = rc_in->rc_seq;
228         rc->rc_result = status;
229
230         set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
231
232         if (!num_slots)
233                 goto do_send;
234
235         spin_lock(&ls->ls_recover_lock);
236         if (ls->ls_num_slots != num_slots) {
237                 spin_unlock(&ls->ls_recover_lock);
238                 log_debug(ls, "receive_rcom_status num_slots %d to %d",
239                           num_slots, ls->ls_num_slots);
240                 rc->rc_result = 0;
241                 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
242                 goto do_send;
243         }
244
245         dlm_slots_copy_out(ls, rc);
246         spin_unlock(&ls->ls_recover_lock);
247
248  do_send:
249         send_rcom(ls, mh, rc);
250 }
251
252 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
253 {
254         spin_lock(&ls->ls_rcom_spin);
255         if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
256             rc_in->rc_id != ls->ls_rcom_seq) {
257                 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
258                           rc_in->rc_type, rc_in->rc_header.h_nodeid,
259                           (unsigned long long)rc_in->rc_id,
260                           (unsigned long long)ls->ls_rcom_seq);
261                 goto out;
262         }
263         memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
264         set_bit(LSFL_RCOM_READY, &ls->ls_flags);
265         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
266         wake_up(&ls->ls_wait_general);
267  out:
268         spin_unlock(&ls->ls_rcom_spin);
269 }
270
271 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
272 {
273         struct dlm_rcom *rc;
274         struct dlm_mhandle *mh;
275         int error = 0;
276
277         ls->ls_recover_nodeid = nodeid;
278
279 retry:
280         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
281         if (error)
282                 goto out;
283         memcpy(rc->rc_buf, last_name, last_len);
284
285         allow_sync_reply(ls, &rc->rc_id);
286         memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
287
288         send_rcom(ls, mh, rc);
289
290         error = dlm_wait_function(ls, &rcom_response);
291         disallow_sync_reply(ls);
292         if (error == -ETIMEDOUT)
293                 goto retry;
294  out:
295         return error;
296 }
297
298 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
299 {
300         struct dlm_rcom *rc;
301         struct dlm_mhandle *mh;
302         int error, inlen, outlen, nodeid;
303
304         nodeid = rc_in->rc_header.h_nodeid;
305         inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
306         outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
307
308         error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
309         if (error)
310                 return;
311         rc->rc_id = rc_in->rc_id;
312         rc->rc_seq_reply = rc_in->rc_seq;
313
314         dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
315                               nodeid);
316         send_rcom(ls, mh, rc);
317 }
318
319 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
320 {
321         struct dlm_rcom *rc;
322         struct dlm_mhandle *mh;
323         struct dlm_ls *ls = r->res_ls;
324         int error;
325
326         error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
327                             &rc, &mh);
328         if (error)
329                 goto out;
330         memcpy(rc->rc_buf, r->res_name, r->res_length);
331         rc->rc_id = (unsigned long) r->res_id;
332
333         send_rcom(ls, mh, rc);
334  out:
335         return error;
336 }
337
338 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
339 {
340         struct dlm_rcom *rc;
341         struct dlm_mhandle *mh;
342         int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
343         int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
344
345         error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
346         if (error)
347                 return;
348
349         /* Old code would send this special id to trigger a debug dump. */
350         if (rc_in->rc_id == 0xFFFFFFFF) {
351                 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
352                 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
353                 return;
354         }
355
356         error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
357                                   DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
358         if (error)
359                 ret_nodeid = error;
360         rc->rc_result = ret_nodeid;
361         rc->rc_id = rc_in->rc_id;
362         rc->rc_seq_reply = rc_in->rc_seq;
363
364         send_rcom(ls, mh, rc);
365 }
366
367 static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
368 {
369         dlm_recover_master_reply(ls, rc_in);
370 }
371
372 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
373                            struct rcom_lock *rl)
374 {
375         memset(rl, 0, sizeof(*rl));
376
377         rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
378         rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
379         rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
380         rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
381         rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
382         rl->rl_rqmode = lkb->lkb_rqmode;
383         rl->rl_grmode = lkb->lkb_grmode;
384         rl->rl_status = lkb->lkb_status;
385         rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
386
387         if (lkb->lkb_bastfn)
388                 rl->rl_asts |= DLM_CB_BAST;
389         if (lkb->lkb_astfn)
390                 rl->rl_asts |= DLM_CB_CAST;
391
392         rl->rl_namelen = cpu_to_le16(r->res_length);
393         memcpy(rl->rl_name, r->res_name, r->res_length);
394
395         /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
396            If so, receive_rcom_lock_args() won't take this copy. */
397
398         if (lkb->lkb_lvbptr)
399                 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
400 }
401
402 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
403 {
404         struct dlm_ls *ls = r->res_ls;
405         struct dlm_rcom *rc;
406         struct dlm_mhandle *mh;
407         struct rcom_lock *rl;
408         int error, len = sizeof(struct rcom_lock);
409
410         if (lkb->lkb_lvbptr)
411                 len += ls->ls_lvblen;
412
413         error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
414         if (error)
415                 goto out;
416
417         rl = (struct rcom_lock *) rc->rc_buf;
418         pack_rcom_lock(r, lkb, rl);
419         rc->rc_id = (unsigned long) r;
420
421         send_rcom(ls, mh, rc);
422  out:
423         return error;
424 }
425
426 /* needs at least dlm_rcom + rcom_lock */
427 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
428 {
429         struct dlm_rcom *rc;
430         struct dlm_mhandle *mh;
431         int error, nodeid = rc_in->rc_header.h_nodeid;
432
433         dlm_recover_master_copy(ls, rc_in);
434
435         error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
436                             sizeof(struct rcom_lock), &rc, &mh);
437         if (error)
438                 return;
439
440         /* We send back the same rcom_lock struct we received, but
441            dlm_recover_master_copy() has filled in rl_remid and rl_result */
442
443         memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
444         rc->rc_id = rc_in->rc_id;
445         rc->rc_seq_reply = rc_in->rc_seq;
446
447         send_rcom(ls, mh, rc);
448 }
449
450 /* If the lockspace doesn't exist then still send a status message
451    back; it's possible that it just doesn't have its global_id yet. */
452
453 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
454 {
455         struct dlm_rcom *rc;
456         struct rcom_config *rf;
457         struct dlm_mhandle *mh;
458         char *mb;
459         int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
460
461         mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
462         if (!mh)
463                 return -ENOBUFS;
464
465         rc = (struct dlm_rcom *) mb;
466
467         rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
468         rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
469         rc->rc_header.h_nodeid = dlm_our_nodeid();
470         rc->rc_header.h_length = mb_len;
471         rc->rc_header.h_cmd = DLM_RCOM;
472
473         rc->rc_type = DLM_RCOM_STATUS_REPLY;
474         rc->rc_id = rc_in->rc_id;
475         rc->rc_seq_reply = rc_in->rc_seq;
476         rc->rc_result = -ESRCH;
477
478         rf = (struct rcom_config *) rc->rc_buf;
479         rf->rf_lvblen = cpu_to_le32(~0U);
480
481         dlm_rcom_out(rc);
482         dlm_lowcomms_commit_buffer(mh);
483
484         return 0;
485 }
486
487 /*
488  * Ignore messages for stage Y before we set
489  * recover_status bit for stage X:
490  *
491  * recover_status = 0
492  *
493  * dlm_recover_members()
494  * - send nothing
495  * - recv nothing
496  * - ignore NAMES, NAMES_REPLY
497  * - ignore LOOKUP, LOOKUP_REPLY
498  * - ignore LOCK, LOCK_REPLY
499  *
500  * recover_status |= NODES
501  *
502  * dlm_recover_members_wait()
503  *
504  * dlm_recover_directory()
505  * - send NAMES
506  * - recv NAMES_REPLY
507  * - ignore LOOKUP, LOOKUP_REPLY
508  * - ignore LOCK, LOCK_REPLY
509  *
510  * recover_status |= DIR
511  *
512  * dlm_recover_directory_wait()
513  *
514  * dlm_recover_masters()
515  * - send LOOKUP
516  * - recv LOOKUP_REPLY
517  *
518  * dlm_recover_locks()
519  * - send LOCKS
520  * - recv LOCKS_REPLY
521  *
522  * recover_status |= LOCKS
523  *
524  * dlm_recover_locks_wait()
525  *
526  * recover_status |= DONE
527  */
528
529 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
530    recovery-only comms are sent through here. */
531
532 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
533 {
534         int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
535         int stop, reply = 0, names = 0, lookup = 0, lock = 0;
536         uint32_t status;
537         uint64_t seq;
538
539         switch (rc->rc_type) {
540         case DLM_RCOM_STATUS_REPLY:
541                 reply = 1;
542                 break;
543         case DLM_RCOM_NAMES:
544                 names = 1;
545                 break;
546         case DLM_RCOM_NAMES_REPLY:
547                 names = 1;
548                 reply = 1;
549                 break;
550         case DLM_RCOM_LOOKUP:
551                 lookup = 1;
552                 break;
553         case DLM_RCOM_LOOKUP_REPLY:
554                 lookup = 1;
555                 reply = 1;
556                 break;
557         case DLM_RCOM_LOCK:
558                 lock = 1;
559                 break;
560         case DLM_RCOM_LOCK_REPLY:
561                 lock = 1;
562                 reply = 1;
563                 break;
564         }
565
566         spin_lock(&ls->ls_recover_lock);
567         status = ls->ls_recover_status;
568         stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
569         seq = ls->ls_recover_seq;
570         spin_unlock(&ls->ls_recover_lock);
571
572         if (stop && (rc->rc_type != DLM_RCOM_STATUS))
573                 goto ignore;
574
575         if (reply && (rc->rc_seq_reply != seq))
576                 goto ignore;
577
578         if (!(status & DLM_RS_NODES) && (names || lookup || lock))
579                 goto ignore;
580
581         if (!(status & DLM_RS_DIR) && (lookup || lock))
582                 goto ignore;
583
584         switch (rc->rc_type) {
585         case DLM_RCOM_STATUS:
586                 receive_rcom_status(ls, rc);
587                 break;
588
589         case DLM_RCOM_NAMES:
590                 receive_rcom_names(ls, rc);
591                 break;
592
593         case DLM_RCOM_LOOKUP:
594                 receive_rcom_lookup(ls, rc);
595                 break;
596
597         case DLM_RCOM_LOCK:
598                 if (rc->rc_header.h_length < lock_size)
599                         goto Eshort;
600                 receive_rcom_lock(ls, rc);
601                 break;
602
603         case DLM_RCOM_STATUS_REPLY:
604                 receive_sync_reply(ls, rc);
605                 break;
606
607         case DLM_RCOM_NAMES_REPLY:
608                 receive_sync_reply(ls, rc);
609                 break;
610
611         case DLM_RCOM_LOOKUP_REPLY:
612                 receive_rcom_lookup_reply(ls, rc);
613                 break;
614
615         case DLM_RCOM_LOCK_REPLY:
616                 if (rc->rc_header.h_length < lock_size)
617                         goto Eshort;
618                 dlm_recover_process_copy(ls, rc);
619                 break;
620
621         default:
622                 log_error(ls, "receive_rcom bad type %d", rc->rc_type);
623         }
624         return;
625
626 ignore:
627         log_limit(ls, "dlm_receive_rcom ignore msg %d "
628                   "from %d %llu %llu recover seq %llu sts %x gen %u",
629                    rc->rc_type,
630                    nodeid,
631                    (unsigned long long)rc->rc_seq,
632                    (unsigned long long)rc->rc_seq_reply,
633                    (unsigned long long)seq,
634                    status, ls->ls_generation);
635         return;
636 Eshort:
637         log_error(ls, "recovery message %d from %d is too short",
638                   rc->rc_type, nodeid);
639 }
640