io_uring: move defer_list to slow data
[linux-2.6-microblaze.git] / fs / dlm / rcom.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "midcomms.h"
17 #include "rcom.h"
18 #include "recover.h"
19 #include "dir.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "util.h"
24
25 static int rcom_response(struct dlm_ls *ls)
26 {
27         return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28 }
29
30 static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31                          struct dlm_rcom **rc_ret, char *mb, int mb_len)
32 {
33         struct dlm_rcom *rc;
34
35         rc = (struct dlm_rcom *) mb;
36
37         rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
38         rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
39         rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
40         rc->rc_header.h_length = cpu_to_le16(mb_len);
41         rc->rc_header.h_cmd = DLM_RCOM;
42
43         rc->rc_type = cpu_to_le32(type);
44
45         spin_lock(&ls->ls_recover_lock);
46         rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
47         spin_unlock(&ls->ls_recover_lock);
48
49         *rc_ret = rc;
50 }
51
52 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
53                        struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
54 {
55         int mb_len = sizeof(struct dlm_rcom) + len;
56         struct dlm_mhandle *mh;
57         char *mb;
58
59         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
60         if (!mh) {
61                 log_print("%s to %d type %d len %d ENOBUFS",
62                           __func__, to_nodeid, type, len);
63                 return -ENOBUFS;
64         }
65
66         _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
67         *mh_ret = mh;
68         return 0;
69 }
70
71 static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
72                                  int len, struct dlm_rcom **rc_ret,
73                                  struct dlm_msg **msg_ret)
74 {
75         int mb_len = sizeof(struct dlm_rcom) + len;
76         struct dlm_msg *msg;
77         char *mb;
78
79         msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb,
80                                    NULL, NULL);
81         if (!msg) {
82                 log_print("create_rcom to %d type %d len %d ENOBUFS",
83                           to_nodeid, type, len);
84                 return -ENOBUFS;
85         }
86
87         _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
88         *msg_ret = msg;
89         return 0;
90 }
91
/* Hand a message built by create_rcom() to midcomms for transmission.
   rc is not looked at here; only the handle is committed. */

static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
{
	dlm_midcomms_commit_mhandle(mh);
}
96
/* Commit a message built by create_rcom_stateless() to lowcomms and then
   drop our reference on it.  rc is not looked at here. */

static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
{
	/* commit first, put second: the put must not precede the commit */
	dlm_lowcomms_commit_msg(msg);
	dlm_lowcomms_put_msg(msg);
}
102
103 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
104                             uint32_t flags)
105 {
106         rs->rs_flags = cpu_to_le32(flags);
107 }
108
109 /* When replying to a status request, a node also sends back its
110    configuration values.  The requesting node then checks that the remote
111    node is configured the same way as itself. */
112
113 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
114                             uint32_t num_slots)
115 {
116         rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
117         rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
118
119         rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
120         rf->rf_num_slots = cpu_to_le16(num_slots);
121         rf->rf_generation =  cpu_to_le32(ls->ls_generation);
122 }
123
124 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
125 {
126         struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
127
128         if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) {
129                 log_error(ls, "version mismatch: %x nodeid %d: %x",
130                           DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
131                           le32_to_cpu(rc->rc_header.h_version));
132                 return -EPROTO;
133         }
134
135         if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
136             le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
137                 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
138                           ls->ls_lvblen, ls->ls_exflags, nodeid,
139                           le32_to_cpu(rf->rf_lvblen),
140                           le32_to_cpu(rf->rf_lsflags));
141                 return -EPROTO;
142         }
143         return 0;
144 }
145
146 static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
147 {
148         spin_lock(&ls->ls_rcom_spin);
149         *new_seq = cpu_to_le64(++ls->ls_rcom_seq);
150         set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
151         spin_unlock(&ls->ls_rcom_spin);
152 }
153
154 static void disallow_sync_reply(struct dlm_ls *ls)
155 {
156         spin_lock(&ls->ls_rcom_spin);
157         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
158         clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
159         spin_unlock(&ls->ls_rcom_spin);
160 }
161
162 /*
163  * low nodeid gathers one slot value at a time from each node.
164  * it sets need_slots=0, and saves rf_our_slot returned from each
165  * rcom_config.
166  *
167  * other nodes gather all slot values at once from the low nodeid.
168  * they set need_slots=1, and ignore the rf_our_slot returned from each
169  * rcom_config.  they use the rf_num_slots returned from the low
170  * node's rcom_config.
171  */
172
173 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
174 {
175         struct dlm_rcom *rc;
176         struct dlm_msg *msg;
177         int error = 0;
178
179         ls->ls_recover_nodeid = nodeid;
180
181         if (nodeid == dlm_our_nodeid()) {
182                 rc = ls->ls_recover_buf;
183                 rc->rc_result = cpu_to_le32(dlm_recover_status(ls));
184                 goto out;
185         }
186
187 retry:
188         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
189                                       sizeof(struct rcom_status), &rc, &msg);
190         if (error)
191                 goto out;
192
193         set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
194
195         allow_sync_reply(ls, &rc->rc_id);
196         memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
197
198         send_rcom_stateless(msg, rc);
199
200         error = dlm_wait_function(ls, &rcom_response);
201         disallow_sync_reply(ls);
202         if (error == -ETIMEDOUT)
203                 goto retry;
204         if (error)
205                 goto out;
206
207         rc = ls->ls_recover_buf;
208
209         if (rc->rc_result == cpu_to_le32(-ESRCH)) {
210                 /* we pretend the remote lockspace exists with 0 status */
211                 log_debug(ls, "remote node %d not ready", nodeid);
212                 rc->rc_result = 0;
213                 error = 0;
214         } else {
215                 error = check_rcom_config(ls, rc, nodeid);
216         }
217
218         /* the caller looks at rc_result for the remote recovery status */
219  out:
220         return error;
221 }
222
223 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
224 {
225         struct dlm_rcom *rc;
226         struct rcom_status *rs;
227         uint32_t status;
228         int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
229         int len = sizeof(struct rcom_config);
230         struct dlm_msg *msg;
231         int num_slots = 0;
232         int error;
233
234         if (!dlm_slots_version(&rc_in->rc_header)) {
235                 status = dlm_recover_status(ls);
236                 goto do_create;
237         }
238
239         rs = (struct rcom_status *)rc_in->rc_buf;
240
241         if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
242                 status = dlm_recover_status(ls);
243                 goto do_create;
244         }
245
246         spin_lock(&ls->ls_recover_lock);
247         status = ls->ls_recover_status;
248         num_slots = ls->ls_num_slots;
249         spin_unlock(&ls->ls_recover_lock);
250         len += num_slots * sizeof(struct rcom_slot);
251
252  do_create:
253         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
254                                       len, &rc, &msg);
255         if (error)
256                 return;
257
258         rc->rc_id = rc_in->rc_id;
259         rc->rc_seq_reply = rc_in->rc_seq;
260         rc->rc_result = cpu_to_le32(status);
261
262         set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
263
264         if (!num_slots)
265                 goto do_send;
266
267         spin_lock(&ls->ls_recover_lock);
268         if (ls->ls_num_slots != num_slots) {
269                 spin_unlock(&ls->ls_recover_lock);
270                 log_debug(ls, "receive_rcom_status num_slots %d to %d",
271                           num_slots, ls->ls_num_slots);
272                 rc->rc_result = 0;
273                 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
274                 goto do_send;
275         }
276
277         dlm_slots_copy_out(ls, rc);
278         spin_unlock(&ls->ls_recover_lock);
279
280  do_send:
281         send_rcom_stateless(msg, rc);
282 }
283
284 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
285 {
286         spin_lock(&ls->ls_rcom_spin);
287         if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
288             le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
289                 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
290                           le32_to_cpu(rc_in->rc_type),
291                           le32_to_cpu(rc_in->rc_header.h_nodeid),
292                           (unsigned long long)le64_to_cpu(rc_in->rc_id),
293                           (unsigned long long)ls->ls_rcom_seq);
294                 goto out;
295         }
296         memcpy(ls->ls_recover_buf, rc_in,
297                le16_to_cpu(rc_in->rc_header.h_length));
298         set_bit(LSFL_RCOM_READY, &ls->ls_flags);
299         clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
300         wake_up(&ls->ls_wait_general);
301  out:
302         spin_unlock(&ls->ls_rcom_spin);
303 }
304
305 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
306 {
307         struct dlm_rcom *rc;
308         struct dlm_msg *msg;
309         int error = 0;
310
311         ls->ls_recover_nodeid = nodeid;
312
313 retry:
314         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
315                                       &rc, &msg);
316         if (error)
317                 goto out;
318         memcpy(rc->rc_buf, last_name, last_len);
319
320         allow_sync_reply(ls, &rc->rc_id);
321         memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
322
323         send_rcom_stateless(msg, rc);
324
325         error = dlm_wait_function(ls, &rcom_response);
326         disallow_sync_reply(ls);
327         if (error == -ETIMEDOUT)
328                 goto retry;
329  out:
330         return error;
331 }
332
333 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
334 {
335         struct dlm_rcom *rc;
336         int error, inlen, outlen, nodeid;
337         struct dlm_msg *msg;
338
339         nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
340         inlen = le16_to_cpu(rc_in->rc_header.h_length) -
341                 sizeof(struct dlm_rcom);
342         outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
343
344         error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
345                                       &rc, &msg);
346         if (error)
347                 return;
348         rc->rc_id = rc_in->rc_id;
349         rc->rc_seq_reply = rc_in->rc_seq;
350
351         dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
352                               nodeid);
353         send_rcom_stateless(msg, rc);
354 }
355
356 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
357 {
358         struct dlm_rcom *rc;
359         struct dlm_mhandle *mh;
360         struct dlm_ls *ls = r->res_ls;
361         int error;
362
363         error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
364                             &rc, &mh);
365         if (error)
366                 goto out;
367         memcpy(rc->rc_buf, r->res_name, r->res_length);
368         rc->rc_id = cpu_to_le64(r->res_id);
369
370         send_rcom(mh, rc);
371  out:
372         return error;
373 }
374
375 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
376 {
377         struct dlm_rcom *rc;
378         struct dlm_mhandle *mh;
379         int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
380         int len = le16_to_cpu(rc_in->rc_header.h_length) -
381                 sizeof(struct dlm_rcom);
382
383         /* Old code would send this special id to trigger a debug dump. */
384         if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) {
385                 log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
386                 dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
387                 return;
388         }
389
390         error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
391         if (error)
392                 return;
393
394         error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
395                                   DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
396         if (error)
397                 ret_nodeid = error;
398         rc->rc_result = cpu_to_le32(ret_nodeid);
399         rc->rc_id = rc_in->rc_id;
400         rc->rc_seq_reply = rc_in->rc_seq;
401
402         send_rcom(mh, rc);
403 }
404
/* A DLM_RCOM_LOOKUP_REPLY is passed unchanged to
   dlm_recover_master_reply(). */

static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
	dlm_recover_master_reply(ls, rc_in);
}
409
410 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
411                            struct rcom_lock *rl)
412 {
413         memset(rl, 0, sizeof(*rl));
414
415         rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
416         rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
417         rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
418         rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
419         rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
420         rl->rl_rqmode = lkb->lkb_rqmode;
421         rl->rl_grmode = lkb->lkb_grmode;
422         rl->rl_status = lkb->lkb_status;
423         rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
424
425         if (lkb->lkb_bastfn)
426                 rl->rl_asts |= DLM_CB_BAST;
427         if (lkb->lkb_astfn)
428                 rl->rl_asts |= DLM_CB_CAST;
429
430         rl->rl_namelen = cpu_to_le16(r->res_length);
431         memcpy(rl->rl_name, r->res_name, r->res_length);
432
433         /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
434            If so, receive_rcom_lock_args() won't take this copy. */
435
436         if (lkb->lkb_lvbptr)
437                 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
438 }
439
440 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
441 {
442         struct dlm_ls *ls = r->res_ls;
443         struct dlm_rcom *rc;
444         struct dlm_mhandle *mh;
445         struct rcom_lock *rl;
446         int error, len = sizeof(struct rcom_lock);
447
448         if (lkb->lkb_lvbptr)
449                 len += ls->ls_lvblen;
450
451         error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
452         if (error)
453                 goto out;
454
455         rl = (struct rcom_lock *) rc->rc_buf;
456         pack_rcom_lock(r, lkb, rl);
457         rc->rc_id = cpu_to_le64((uintptr_t)r);
458
459         send_rcom(mh, rc);
460  out:
461         return error;
462 }
463
464 /* needs at least dlm_rcom + rcom_lock */
465 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
466 {
467         struct dlm_rcom *rc;
468         struct dlm_mhandle *mh;
469         int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid);
470
471         dlm_recover_master_copy(ls, rc_in);
472
473         error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
474                             sizeof(struct rcom_lock), &rc, &mh);
475         if (error)
476                 return;
477
478         /* We send back the same rcom_lock struct we received, but
479            dlm_recover_master_copy() has filled in rl_remid and rl_result */
480
481         memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
482         rc->rc_id = rc_in->rc_id;
483         rc->rc_seq_reply = rc_in->rc_seq;
484
485         send_rcom(mh, rc);
486 }
487
488 /* If the lockspace doesn't exist then still send a status message
489    back; it's possible that it just doesn't have its global_id yet. */
490
491 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
492 {
493         struct dlm_rcom *rc;
494         struct rcom_config *rf;
495         struct dlm_mhandle *mh;
496         char *mb;
497         int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
498
499         mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
500         if (!mh)
501                 return -ENOBUFS;
502
503         rc = (struct dlm_rcom *) mb;
504
505         rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
506         rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace;
507         rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
508         rc->rc_header.h_length = cpu_to_le16(mb_len);
509         rc->rc_header.h_cmd = DLM_RCOM;
510
511         rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY);
512         rc->rc_id = rc_in->rc_id;
513         rc->rc_seq_reply = rc_in->rc_seq;
514         rc->rc_result = cpu_to_le32(-ESRCH);
515
516         rf = (struct rcom_config *) rc->rc_buf;
517         rf->rf_lvblen = cpu_to_le32(~0U);
518
519         dlm_midcomms_commit_mhandle(mh);
520
521         return 0;
522 }
523
524 /*
525  * Ignore messages for stage Y before we set
526  * recover_status bit for stage X:
527  *
528  * recover_status = 0
529  *
530  * dlm_recover_members()
531  * - send nothing
532  * - recv nothing
533  * - ignore NAMES, NAMES_REPLY
534  * - ignore LOOKUP, LOOKUP_REPLY
535  * - ignore LOCK, LOCK_REPLY
536  *
537  * recover_status |= NODES
538  *
539  * dlm_recover_members_wait()
540  *
541  * dlm_recover_directory()
542  * - send NAMES
543  * - recv NAMES_REPLY
544  * - ignore LOOKUP, LOOKUP_REPLY
545  * - ignore LOCK, LOCK_REPLY
546  *
547  * recover_status |= DIR
548  *
549  * dlm_recover_directory_wait()
550  *
551  * dlm_recover_masters()
552  * - send LOOKUP
553  * - recv LOOKUP_REPLY
554  *
555  * dlm_recover_locks()
556  * - send LOCKS
557  * - recv LOCKS_REPLY
558  *
559  * recover_status |= LOCKS
560  *
561  * dlm_recover_locks_wait()
562  *
563  * recover_status |= DONE
564  */
565
566 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
567    recovery-only comms are sent through here. */
568
569 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
570 {
571         int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
572         int stop, reply = 0, names = 0, lookup = 0, lock = 0;
573         uint32_t status;
574         uint64_t seq;
575
576         switch (rc->rc_type) {
577         case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
578                 reply = 1;
579                 break;
580         case cpu_to_le32(DLM_RCOM_NAMES):
581                 names = 1;
582                 break;
583         case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
584                 names = 1;
585                 reply = 1;
586                 break;
587         case cpu_to_le32(DLM_RCOM_LOOKUP):
588                 lookup = 1;
589                 break;
590         case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
591                 lookup = 1;
592                 reply = 1;
593                 break;
594         case cpu_to_le32(DLM_RCOM_LOCK):
595                 lock = 1;
596                 break;
597         case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
598                 lock = 1;
599                 reply = 1;
600                 break;
601         }
602
603         spin_lock(&ls->ls_recover_lock);
604         status = ls->ls_recover_status;
605         stop = dlm_recovery_stopped(ls);
606         seq = ls->ls_recover_seq;
607         spin_unlock(&ls->ls_recover_lock);
608
609         if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
610                 goto ignore;
611
612         if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq))
613                 goto ignore;
614
615         if (!(status & DLM_RS_NODES) && (names || lookup || lock))
616                 goto ignore;
617
618         if (!(status & DLM_RS_DIR) && (lookup || lock))
619                 goto ignore;
620
621         switch (rc->rc_type) {
622         case cpu_to_le32(DLM_RCOM_STATUS):
623                 receive_rcom_status(ls, rc);
624                 break;
625
626         case cpu_to_le32(DLM_RCOM_NAMES):
627                 receive_rcom_names(ls, rc);
628                 break;
629
630         case cpu_to_le32(DLM_RCOM_LOOKUP):
631                 receive_rcom_lookup(ls, rc);
632                 break;
633
634         case cpu_to_le32(DLM_RCOM_LOCK):
635                 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
636                         goto Eshort;
637                 receive_rcom_lock(ls, rc);
638                 break;
639
640         case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
641                 receive_sync_reply(ls, rc);
642                 break;
643
644         case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
645                 receive_sync_reply(ls, rc);
646                 break;
647
648         case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY):
649                 receive_rcom_lookup_reply(ls, rc);
650                 break;
651
652         case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
653                 if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
654                         goto Eshort;
655                 dlm_recover_process_copy(ls, rc);
656                 break;
657
658         default:
659                 log_error(ls, "receive_rcom bad type %d",
660                           le32_to_cpu(rc->rc_type));
661         }
662         return;
663
664 ignore:
665         log_limit(ls, "dlm_receive_rcom ignore msg %d "
666                   "from %d %llu %llu recover seq %llu sts %x gen %u",
667                    le32_to_cpu(rc->rc_type),
668                    nodeid,
669                    (unsigned long long)le64_to_cpu(rc->rc_seq),
670                    (unsigned long long)le64_to_cpu(rc->rc_seq_reply),
671                    (unsigned long long)seq,
672                    status, ls->ls_generation);
673         return;
674 Eshort:
675         log_error(ls, "recovery message %d from %d is too short",
676                   le32_to_cpu(rc->rc_type), nodeid);
677 }
678