Merge tag 'input-for-v5.19-rc0' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6-microblaze.git] / fs / dlm / dir.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "rcom.h"
17 #include "config.h"
18 #include "memory.h"
19 #include "recover.h"
20 #include "util.h"
21 #include "lock.h"
22 #include "dir.h"
23
24 /*
25  * We use the upper 16 bits of the hash value to select the directory node.
26  * Low bits are used for distribution of rsb's among hash buckets on each node.
27  *
28  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29  * num_nodes to the hash value.  This value in the desired range is used as an
30  * offset into the sorted list of nodeid's to give the particular nodeid.
31  */
32
33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
34 {
35         uint32_t node;
36
37         if (ls->ls_num_nodes == 1)
38                 return dlm_our_nodeid();
39         else {
40                 node = (hash >> 16) % ls->ls_total_weight;
41                 return ls->ls_node_array[node];
42         }
43 }
44
45 int dlm_dir_nodeid(struct dlm_rsb *r)
46 {
47         return r->res_dir_nodeid;
48 }
49
50 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
51 {
52         struct dlm_rsb *r;
53
54         down_read(&ls->ls_root_sem);
55         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56                 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
57         }
58         up_read(&ls->ls_root_sem);
59 }
60
61 int dlm_recover_directory(struct dlm_ls *ls)
62 {
63         struct dlm_member *memb;
64         char *b, *last_name = NULL;
65         int error = -ENOMEM, last_len, nodeid, result;
66         uint16_t namelen;
67         unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
68
69         log_rinfo(ls, "dlm_recover_directory");
70
71         if (dlm_no_directory(ls))
72                 goto out_status;
73
74         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
75         if (!last_name)
76                 goto out;
77
78         list_for_each_entry(memb, &ls->ls_nodes, list) {
79                 if (memb->nodeid == dlm_our_nodeid())
80                         continue;
81
82                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
83                 last_len = 0;
84
85                 for (;;) {
86                         int left;
87                         if (dlm_recovery_stopped(ls)) {
88                                 error = -EINTR;
89                                 goto out_free;
90                         }
91
92                         error = dlm_rcom_names(ls, memb->nodeid,
93                                                last_name, last_len);
94                         if (error)
95                                 goto out_free;
96
97                         cond_resched();
98
99                         /*
100                          * pick namelen/name pairs out of received buffer
101                          */
102
103                         b = ls->ls_recover_buf->rc_buf;
104                         left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
105                         left -= sizeof(struct dlm_rcom);
106
107                         for (;;) {
108                                 __be16 v;
109
110                                 error = -EINVAL;
111                                 if (left < sizeof(__be16))
112                                         goto out_free;
113
114                                 memcpy(&v, b, sizeof(__be16));
115                                 namelen = be16_to_cpu(v);
116                                 b += sizeof(__be16);
117                                 left -= sizeof(__be16);
118
119                                 /* namelen of 0xFFFFF marks end of names for
120                                    this node; namelen of 0 marks end of the
121                                    buffer */
122
123                                 if (namelen == 0xFFFF)
124                                         goto done;
125                                 if (!namelen)
126                                         break;
127
128                                 if (namelen > left)
129                                         goto out_free;
130
131                                 if (namelen > DLM_RESNAME_MAXLEN)
132                                         goto out_free;
133
134                                 error = dlm_master_lookup(ls, memb->nodeid,
135                                                           b, namelen,
136                                                           DLM_LU_RECOVER_DIR,
137                                                           &nodeid, &result);
138                                 if (error) {
139                                         log_error(ls, "recover_dir lookup %d",
140                                                   error);
141                                         goto out_free;
142                                 }
143
144                                 /* The name was found in rsbtbl, but the
145                                  * master nodeid is different from
146                                  * memb->nodeid which says it is the master.
147                                  * This should not happen. */
148
149                                 if (result == DLM_LU_MATCH &&
150                                     nodeid != memb->nodeid) {
151                                         count_bad++;
152                                         log_error(ls, "recover_dir lookup %d "
153                                                   "nodeid %d memb %d bad %u",
154                                                   result, nodeid, memb->nodeid,
155                                                   count_bad);
156                                         print_hex_dump_bytes("dlm_recover_dir ",
157                                                              DUMP_PREFIX_NONE,
158                                                              b, namelen);
159                                 }
160
161                                 /* The name was found in rsbtbl, and the
162                                  * master nodeid matches memb->nodeid. */
163
164                                 if (result == DLM_LU_MATCH &&
165                                     nodeid == memb->nodeid) {
166                                         count_match++;
167                                 }
168
169                                 /* The name was not found in rsbtbl and was
170                                  * added with memb->nodeid as the master. */
171
172                                 if (result == DLM_LU_ADD) {
173                                         count_add++;
174                                 }
175
176                                 last_len = namelen;
177                                 memcpy(last_name, b, namelen);
178                                 b += namelen;
179                                 left -= namelen;
180                                 count++;
181                         }
182                 }
183          done:
184                 ;
185         }
186
187  out_status:
188         error = 0;
189         dlm_set_recover_status(ls, DLM_RS_DIR);
190
191         log_rinfo(ls, "dlm_recover_directory %u in %u new",
192                   count, count_add);
193  out_free:
194         kfree(last_name);
195  out:
196         return error;
197 }
198
199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
200 {
201         struct dlm_rsb *r;
202         uint32_t hash, bucket;
203         int rv;
204
205         hash = jhash(name, len, 0);
206         bucket = hash & (ls->ls_rsbtbl_size - 1);
207
208         spin_lock(&ls->ls_rsbtbl[bucket].lock);
209         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
210         if (rv)
211                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
212                                          name, len, &r);
213         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
214
215         if (!rv)
216                 return r;
217
218         down_read(&ls->ls_root_sem);
219         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
220                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
221                         up_read(&ls->ls_root_sem);
222                         log_debug(ls, "find_rsb_root revert to root_list %s",
223                                   r->res_name);
224                         return r;
225                 }
226         }
227         up_read(&ls->ls_root_sem);
228         return NULL;
229 }
230
231 /* Find the rsb where we left off (or start again), then send rsb names
232    for rsb's we're master of and whose directory node matches the requesting
233    node.  inbuf is the rsb name last sent, inlen is the name's length */
234
235 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
236                            char *outbuf, int outlen, int nodeid)
237 {
238         struct list_head *list;
239         struct dlm_rsb *r;
240         int offset = 0, dir_nodeid;
241         __be16 be_namelen;
242
243         down_read(&ls->ls_root_sem);
244
245         if (inlen > 1) {
246                 r = find_rsb_root(ls, inbuf, inlen);
247                 if (!r) {
248                         inbuf[inlen - 1] = '\0';
249                         log_error(ls, "copy_master_names from %d start %d %s",
250                                   nodeid, inlen, inbuf);
251                         goto out;
252                 }
253                 list = r->res_root_list.next;
254         } else {
255                 list = ls->ls_root_list.next;
256         }
257
258         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
259                 r = list_entry(list, struct dlm_rsb, res_root_list);
260                 if (r->res_nodeid)
261                         continue;
262
263                 dir_nodeid = dlm_dir_nodeid(r);
264                 if (dir_nodeid != nodeid)
265                         continue;
266
267                 /*
268                  * The block ends when we can't fit the following in the
269                  * remaining buffer space:
270                  * namelen (uint16_t) +
271                  * name (r->res_length) +
272                  * end-of-block record 0x0000 (uint16_t)
273                  */
274
275                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
276                         /* Write end-of-block record */
277                         be_namelen = cpu_to_be16(0);
278                         memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
279                         offset += sizeof(__be16);
280                         ls->ls_recover_dir_sent_msg++;
281                         goto out;
282                 }
283
284                 be_namelen = cpu_to_be16(r->res_length);
285                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
286                 offset += sizeof(__be16);
287                 memcpy(outbuf + offset, r->res_name, r->res_length);
288                 offset += r->res_length;
289                 ls->ls_recover_dir_sent_res++;
290         }
291
292         /*
293          * If we've reached the end of the list (and there's room) write a
294          * terminating record.
295          */
296
297         if ((list == &ls->ls_root_list) &&
298             (offset + sizeof(uint16_t) <= outlen)) {
299                 be_namelen = cpu_to_be16(0xFFFF);
300                 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
301                 offset += sizeof(__be16);
302                 ls->ls_recover_dir_sent_msg++;
303         }
304  out:
305         up_read(&ls->ls_root_sem);
306 }
307