// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}
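
/*
 * Illustration only, not part of the original file: the "control"
 * attribute defined below maps this store handler into sysfs, so a
 * dlm_controld-like daemon stops or starts a lockspace with a plain
 * write (0 = dlm_ls_stop(), 1 = dlm_ls_start()):
 *
 *   echo 0 > /sys/kernel/dlm/<lockspace-name>/control
 */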

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        wait_event(ls->ls_uevent_wait,
                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

        return ls->ls_uevent_result;
}
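
/*
 * Illustration only, not part of the original file: the userspace half of
 * the handshake above, assuming a dlm_controld-like daemon.  After it sees
 * the ONLINE/OFFLINE uevent and finishes its group management, the daemon
 * writes the result to the "event_done" sysfs attribute (dlm_attr_event
 * above, under the "dlm" kset registered in dlm_lockspace_init), which
 * runs dlm_event_store() and satisfies the wait_event() here.  Roughly:
 *
 *   echo 0 > /sys/kernel/dlm/<lockspace-name>/event_done
 */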

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
                      struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: cannot create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}

static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_scan_waiters(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}
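
/*
 * A minimal usage sketch, not part of the original file: every successful
 * dlm_find_lockspace_*() lookup increments ls_count under lslist_lock and
 * must be balanced by dlm_put_lockspace(); otherwise remove_lockspace()
 * below spins forever waiting for ls_count to reach zero.  The function
 * name here is hypothetical.
 */
static void __maybe_unused example_lockspace_lookup(uint32_t global_id)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_global(global_id);
        if (!ls)
                return;                 /* no lockspace with this id */

        /* ... the reference keeps ls on lslist while we use it ... */

        dlm_put_lockspace(ls);          /* drop the reference taken above */
}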

static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        WARN_ON(ls->ls_create_count != 0);
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_midcomms_start();
        if (error) {
                log_print("cannot start dlm midcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 fail:
        return error;
}

static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }

        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have some flags set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);
        /* Due to backwards compatibility with 3.1 we need to use the maximum
         * possible dlm message size to be sure the message will fit and
         * not have out of bounds issues. However on the sending side 3.2
         * might send less.
         */
        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
                      uint32_t flags, int lvblen,
                      const struct dlm_lockspace_ops *ops, void *ops_arg,
                      int *ops_result, dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
                dlm_lowcomms_stop();
        }
 out:
        mutex_unlock(&ls_lock);
        return error;
}

static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                dlm_lowcomms_stop();
        mutex_unlock(&ls_lock);

        return error;
}
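
/*
 * A hedged usage sketch from a hypothetical in-kernel caller, not part of
 * the original file: create a lockspace and release it with force 0,
 * which returns -EBUSY while any LKBs remain (see lockspace_busy above).
 * Note that new_lockspace() fails with -EUNATCH unless dlm_controld is
 * running, and lvblen must be a non-zero multiple of 8.
 */
static int __maybe_unused example_lockspace_lifetime(void)
{
        dlm_lockspace_t *ls;
        int error;

        error = dlm_new_lockspace("example", NULL, 0, 64,
                                  NULL, NULL, NULL, &ls);
        if (error)
                return error;

        /* ... acquire and release locks in ls via dlm_lock()/dlm_unlock() ... */

        return dlm_release_lockspace(ls, 0);
}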

void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}