Merge branch 'next-general' of git://git.kernel.org/pub/scm/linux/kernel/git/jmorris...
[linux-2.6-microblaze.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164                              char *buf)
165 {
166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168         return a->show ? a->show(ls, buf) : 0;
169 }
170
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172                               const char *buf, size_t len)
173 {
174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176         return a->store ? a->store(ls, buf, len) : len;
177 }
178
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182         kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186         .show  = dlm_attr_show,
187         .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191         .default_groups = dlm_groups,
192         .sysfs_ops     = &dlm_attr_ops,
193         .release       = lockspace_kobj_release,
194 };
195
196 static struct kset *dlm_kset;
197
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200         int error;
201
202         if (in)
203                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
204         else
205                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
206
207         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
208
209         /* dlm_controld will see the uevent, do the necessary group management
210            and then write to sysfs to wake us */
211
212         error = wait_event_interruptible(ls->ls_uevent_wait,
213                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
214
215         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
216
217         if (error)
218                 goto out;
219
220         error = ls->ls_uevent_result;
221  out:
222         if (error)
223                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
224                           error, ls->ls_uevent_result);
225         return error;
226 }
227
228 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
229                       struct kobj_uevent_env *env)
230 {
231         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
232
233         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
234         return 0;
235 }
236
237 static const struct kset_uevent_ops dlm_uevent_ops = {
238         .uevent = dlm_uevent,
239 };
240
241 int __init dlm_lockspace_init(void)
242 {
243         ls_count = 0;
244         mutex_init(&ls_lock);
245         INIT_LIST_HEAD(&lslist);
246         spin_lock_init(&lslist_lock);
247
248         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
249         if (!dlm_kset) {
250                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
251                 return -ENOMEM;
252         }
253         return 0;
254 }
255
256 void dlm_lockspace_exit(void)
257 {
258         kset_unregister(dlm_kset);
259 }
260
261 static struct dlm_ls *find_ls_to_scan(void)
262 {
263         struct dlm_ls *ls;
264
265         spin_lock(&lslist_lock);
266         list_for_each_entry(ls, &lslist, ls_list) {
267                 if (time_after_eq(jiffies, ls->ls_scan_time +
268                                             dlm_config.ci_scan_secs * HZ)) {
269                         spin_unlock(&lslist_lock);
270                         return ls;
271                 }
272         }
273         spin_unlock(&lslist_lock);
274         return NULL;
275 }
276
277 static int dlm_scand(void *data)
278 {
279         struct dlm_ls *ls;
280
281         while (!kthread_should_stop()) {
282                 ls = find_ls_to_scan();
283                 if (ls) {
284                         if (dlm_lock_recovery_try(ls)) {
285                                 ls->ls_scan_time = jiffies;
286                                 dlm_scan_rsbs(ls);
287                                 dlm_scan_timeout(ls);
288                                 dlm_scan_waiters(ls);
289                                 dlm_unlock_recovery(ls);
290                         } else {
291                                 ls->ls_scan_time += HZ;
292                         }
293                         continue;
294                 }
295                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
296         }
297         return 0;
298 }
299
300 static int dlm_scand_start(void)
301 {
302         struct task_struct *p;
303         int error = 0;
304
305         p = kthread_run(dlm_scand, NULL, "dlm_scand");
306         if (IS_ERR(p))
307                 error = PTR_ERR(p);
308         else
309                 scand_task = p;
310         return error;
311 }
312
313 static void dlm_scand_stop(void)
314 {
315         kthread_stop(scand_task);
316 }
317
318 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
319 {
320         struct dlm_ls *ls;
321
322         spin_lock(&lslist_lock);
323
324         list_for_each_entry(ls, &lslist, ls_list) {
325                 if (ls->ls_global_id == id) {
326                         ls->ls_count++;
327                         goto out;
328                 }
329         }
330         ls = NULL;
331  out:
332         spin_unlock(&lslist_lock);
333         return ls;
334 }
335
336 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
337 {
338         struct dlm_ls *ls;
339
340         spin_lock(&lslist_lock);
341         list_for_each_entry(ls, &lslist, ls_list) {
342                 if (ls->ls_local_handle == lockspace) {
343                         ls->ls_count++;
344                         goto out;
345                 }
346         }
347         ls = NULL;
348  out:
349         spin_unlock(&lslist_lock);
350         return ls;
351 }
352
353 struct dlm_ls *dlm_find_lockspace_device(int minor)
354 {
355         struct dlm_ls *ls;
356
357         spin_lock(&lslist_lock);
358         list_for_each_entry(ls, &lslist, ls_list) {
359                 if (ls->ls_device.minor == minor) {
360                         ls->ls_count++;
361                         goto out;
362                 }
363         }
364         ls = NULL;
365  out:
366         spin_unlock(&lslist_lock);
367         return ls;
368 }
369
370 void dlm_put_lockspace(struct dlm_ls *ls)
371 {
372         spin_lock(&lslist_lock);
373         ls->ls_count--;
374         spin_unlock(&lslist_lock);
375 }
376
377 static void remove_lockspace(struct dlm_ls *ls)
378 {
379         for (;;) {
380                 spin_lock(&lslist_lock);
381                 if (ls->ls_count == 0) {
382                         WARN_ON(ls->ls_create_count != 0);
383                         list_del(&ls->ls_list);
384                         spin_unlock(&lslist_lock);
385                         return;
386                 }
387                 spin_unlock(&lslist_lock);
388                 ssleep(1);
389         }
390 }
391
392 static int threads_start(void)
393 {
394         int error;
395
396         error = dlm_scand_start();
397         if (error) {
398                 log_print("cannot start dlm_scand thread %d", error);
399                 goto fail;
400         }
401
402         /* Thread for sending/receiving messages for all lockspace's */
403         error = dlm_lowcomms_start();
404         if (error) {
405                 log_print("cannot start dlm lowcomms %d", error);
406                 goto scand_fail;
407         }
408
409         return 0;
410
411  scand_fail:
412         dlm_scand_stop();
413  fail:
414         return error;
415 }
416
417 static void threads_stop(void)
418 {
419         dlm_scand_stop();
420         dlm_lowcomms_stop();
421 }
422
423 static int new_lockspace(const char *name, const char *cluster,
424                          uint32_t flags, int lvblen,
425                          const struct dlm_lockspace_ops *ops, void *ops_arg,
426                          int *ops_result, dlm_lockspace_t **lockspace)
427 {
428         struct dlm_ls *ls;
429         int i, size, error;
430         int do_unreg = 0;
431         int namelen = strlen(name);
432
433         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
434                 return -EINVAL;
435
436         if (!lvblen || (lvblen % 8))
437                 return -EINVAL;
438
439         if (!try_module_get(THIS_MODULE))
440                 return -EINVAL;
441
442         if (!dlm_user_daemon_available()) {
443                 log_print("dlm user daemon not available");
444                 error = -EUNATCH;
445                 goto out;
446         }
447
448         if (ops && ops_result) {
449                 if (!dlm_config.ci_recover_callbacks)
450                         *ops_result = -EOPNOTSUPP;
451                 else
452                         *ops_result = 0;
453         }
454
455         if (!cluster)
456                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
457                           dlm_config.ci_cluster_name);
458
459         if (dlm_config.ci_recover_callbacks && cluster &&
460             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
461                 log_print("dlm cluster name '%s' does not match "
462                           "the application cluster name '%s'",
463                           dlm_config.ci_cluster_name, cluster);
464                 error = -EBADR;
465                 goto out;
466         }
467
468         error = 0;
469
470         spin_lock(&lslist_lock);
471         list_for_each_entry(ls, &lslist, ls_list) {
472                 WARN_ON(ls->ls_create_count <= 0);
473                 if (ls->ls_namelen != namelen)
474                         continue;
475                 if (memcmp(ls->ls_name, name, namelen))
476                         continue;
477                 if (flags & DLM_LSFL_NEWEXCL) {
478                         error = -EEXIST;
479                         break;
480                 }
481                 ls->ls_create_count++;
482                 *lockspace = ls;
483                 error = 1;
484                 break;
485         }
486         spin_unlock(&lslist_lock);
487
488         if (error)
489                 goto out;
490
491         error = -ENOMEM;
492
493         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
494         if (!ls)
495                 goto out;
496         memcpy(ls->ls_name, name, namelen);
497         ls->ls_namelen = namelen;
498         ls->ls_lvblen = lvblen;
499         ls->ls_count = 0;
500         ls->ls_flags = 0;
501         ls->ls_scan_time = jiffies;
502
503         if (ops && dlm_config.ci_recover_callbacks) {
504                 ls->ls_ops = ops;
505                 ls->ls_ops_arg = ops_arg;
506         }
507
508         if (flags & DLM_LSFL_TIMEWARN)
509                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
510
511         /* ls_exflags are forced to match among nodes, and we don't
512            need to require all nodes to have some flags set */
513         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
514                                     DLM_LSFL_NEWEXCL));
515
516         size = dlm_config.ci_rsbtbl_size;
517         ls->ls_rsbtbl_size = size;
518
519         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
520         if (!ls->ls_rsbtbl)
521                 goto out_lsfree;
522         for (i = 0; i < size; i++) {
523                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
524                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
525                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
526         }
527
528         spin_lock_init(&ls->ls_remove_spin);
529
530         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
531                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
532                                                  GFP_KERNEL);
533                 if (!ls->ls_remove_names[i])
534                         goto out_rsbtbl;
535         }
536
537         idr_init(&ls->ls_lkbidr);
538         spin_lock_init(&ls->ls_lkbidr_spin);
539
540         INIT_LIST_HEAD(&ls->ls_waiters);
541         mutex_init(&ls->ls_waiters_mutex);
542         INIT_LIST_HEAD(&ls->ls_orphans);
543         mutex_init(&ls->ls_orphans_mutex);
544         INIT_LIST_HEAD(&ls->ls_timeout);
545         mutex_init(&ls->ls_timeout_mutex);
546
547         INIT_LIST_HEAD(&ls->ls_new_rsb);
548         spin_lock_init(&ls->ls_new_rsb_spin);
549
550         INIT_LIST_HEAD(&ls->ls_nodes);
551         INIT_LIST_HEAD(&ls->ls_nodes_gone);
552         ls->ls_num_nodes = 0;
553         ls->ls_low_nodeid = 0;
554         ls->ls_total_weight = 0;
555         ls->ls_node_array = NULL;
556
557         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
558         ls->ls_stub_rsb.res_ls = ls;
559
560         ls->ls_debug_rsb_dentry = NULL;
561         ls->ls_debug_waiters_dentry = NULL;
562
563         init_waitqueue_head(&ls->ls_uevent_wait);
564         ls->ls_uevent_result = 0;
565         init_completion(&ls->ls_members_done);
566         ls->ls_members_result = -1;
567
568         mutex_init(&ls->ls_cb_mutex);
569         INIT_LIST_HEAD(&ls->ls_cb_delay);
570
571         ls->ls_recoverd_task = NULL;
572         mutex_init(&ls->ls_recoverd_active);
573         spin_lock_init(&ls->ls_recover_lock);
574         spin_lock_init(&ls->ls_rcom_spin);
575         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
576         ls->ls_recover_status = 0;
577         ls->ls_recover_seq = 0;
578         ls->ls_recover_args = NULL;
579         init_rwsem(&ls->ls_in_recovery);
580         init_rwsem(&ls->ls_recv_active);
581         INIT_LIST_HEAD(&ls->ls_requestqueue);
582         mutex_init(&ls->ls_requestqueue_mutex);
583         mutex_init(&ls->ls_clear_proc_locks);
584
585         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
586         if (!ls->ls_recover_buf)
587                 goto out_lkbidr;
588
589         ls->ls_slot = 0;
590         ls->ls_num_slots = 0;
591         ls->ls_slots_size = 0;
592         ls->ls_slots = NULL;
593
594         INIT_LIST_HEAD(&ls->ls_recover_list);
595         spin_lock_init(&ls->ls_recover_list_lock);
596         idr_init(&ls->ls_recover_idr);
597         spin_lock_init(&ls->ls_recover_idr_lock);
598         ls->ls_recover_list_count = 0;
599         ls->ls_local_handle = ls;
600         init_waitqueue_head(&ls->ls_wait_general);
601         INIT_LIST_HEAD(&ls->ls_root_list);
602         init_rwsem(&ls->ls_root_sem);
603
604         spin_lock(&lslist_lock);
605         ls->ls_create_count = 1;
606         list_add(&ls->ls_list, &lslist);
607         spin_unlock(&lslist_lock);
608
609         if (flags & DLM_LSFL_FS) {
610                 error = dlm_callback_start(ls);
611                 if (error) {
612                         log_error(ls, "can't start dlm_callback %d", error);
613                         goto out_delist;
614                 }
615         }
616
617         init_waitqueue_head(&ls->ls_recover_lock_wait);
618
619         /*
620          * Once started, dlm_recoverd first looks for ls in lslist, then
621          * initializes ls_in_recovery as locked in "down" mode.  We need
622          * to wait for the wakeup from dlm_recoverd because in_recovery
623          * has to start out in down mode.
624          */
625
626         error = dlm_recoverd_start(ls);
627         if (error) {
628                 log_error(ls, "can't start dlm_recoverd %d", error);
629                 goto out_callback;
630         }
631
632         wait_event(ls->ls_recover_lock_wait,
633                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
634
635         ls->ls_kobj.kset = dlm_kset;
636         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
637                                      "%s", ls->ls_name);
638         if (error)
639                 goto out_recoverd;
640         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
641
642         /* let kobject handle freeing of ls if there's an error */
643         do_unreg = 1;
644
645         /* This uevent triggers dlm_controld in userspace to add us to the
646            group of nodes that are members of this lockspace (managed by the
647            cluster infrastructure.)  Once it's done that, it tells us who the
648            current lockspace members are (via configfs) and then tells the
649            lockspace to start running (via sysfs) in dlm_ls_start(). */
650
651         error = do_uevent(ls, 1);
652         if (error)
653                 goto out_recoverd;
654
655         wait_for_completion(&ls->ls_members_done);
656         error = ls->ls_members_result;
657         if (error)
658                 goto out_members;
659
660         dlm_create_debug_file(ls);
661
662         log_rinfo(ls, "join complete");
663         *lockspace = ls;
664         return 0;
665
666  out_members:
667         do_uevent(ls, 0);
668         dlm_clear_members(ls);
669         kfree(ls->ls_node_array);
670  out_recoverd:
671         dlm_recoverd_stop(ls);
672  out_callback:
673         dlm_callback_stop(ls);
674  out_delist:
675         spin_lock(&lslist_lock);
676         list_del(&ls->ls_list);
677         spin_unlock(&lslist_lock);
678         idr_destroy(&ls->ls_recover_idr);
679         kfree(ls->ls_recover_buf);
680  out_lkbidr:
681         idr_destroy(&ls->ls_lkbidr);
682  out_rsbtbl:
683         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
684                 kfree(ls->ls_remove_names[i]);
685         vfree(ls->ls_rsbtbl);
686  out_lsfree:
687         if (do_unreg)
688                 kobject_put(&ls->ls_kobj);
689         else
690                 kfree(ls);
691  out:
692         module_put(THIS_MODULE);
693         return error;
694 }
695
696 int dlm_new_lockspace(const char *name, const char *cluster,
697                       uint32_t flags, int lvblen,
698                       const struct dlm_lockspace_ops *ops, void *ops_arg,
699                       int *ops_result, dlm_lockspace_t **lockspace)
700 {
701         int error = 0;
702
703         mutex_lock(&ls_lock);
704         if (!ls_count)
705                 error = threads_start();
706         if (error)
707                 goto out;
708
709         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
710                               ops_result, lockspace);
711         if (!error)
712                 ls_count++;
713         if (error > 0)
714                 error = 0;
715         if (!ls_count)
716                 threads_stop();
717  out:
718         mutex_unlock(&ls_lock);
719         return error;
720 }
721
722 static int lkb_idr_is_local(int id, void *p, void *data)
723 {
724         struct dlm_lkb *lkb = p;
725
726         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
727 }
728
729 static int lkb_idr_is_any(int id, void *p, void *data)
730 {
731         return 1;
732 }
733
734 static int lkb_idr_free(int id, void *p, void *data)
735 {
736         struct dlm_lkb *lkb = p;
737
738         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
739                 dlm_free_lvb(lkb->lkb_lvbptr);
740
741         dlm_free_lkb(lkb);
742         return 0;
743 }
744
745 /* NOTE: We check the lkbidr here rather than the resource table.
746    This is because there may be LKBs queued as ASTs that have been unlinked
747    from their RSBs and are pending deletion once the AST has been delivered */
748
749 static int lockspace_busy(struct dlm_ls *ls, int force)
750 {
751         int rv;
752
753         spin_lock(&ls->ls_lkbidr_spin);
754         if (force == 0) {
755                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
756         } else if (force == 1) {
757                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
758         } else {
759                 rv = 0;
760         }
761         spin_unlock(&ls->ls_lkbidr_spin);
762         return rv;
763 }
764
765 static int release_lockspace(struct dlm_ls *ls, int force)
766 {
767         struct dlm_rsb *rsb;
768         struct rb_node *n;
769         int i, busy, rv;
770
771         busy = lockspace_busy(ls, force);
772
773         spin_lock(&lslist_lock);
774         if (ls->ls_create_count == 1) {
775                 if (busy) {
776                         rv = -EBUSY;
777                 } else {
778                         /* remove_lockspace takes ls off lslist */
779                         ls->ls_create_count = 0;
780                         rv = 0;
781                 }
782         } else if (ls->ls_create_count > 1) {
783                 rv = --ls->ls_create_count;
784         } else {
785                 rv = -EINVAL;
786         }
787         spin_unlock(&lslist_lock);
788
789         if (rv) {
790                 log_debug(ls, "release_lockspace no remove %d", rv);
791                 return rv;
792         }
793
794         dlm_device_deregister(ls);
795
796         if (force < 3 && dlm_user_daemon_available())
797                 do_uevent(ls, 0);
798
799         dlm_recoverd_stop(ls);
800
801         dlm_callback_stop(ls);
802
803         remove_lockspace(ls);
804
805         dlm_delete_debug_file(ls);
806
807         idr_destroy(&ls->ls_recover_idr);
808         kfree(ls->ls_recover_buf);
809
810         /*
811          * Free all lkb's in idr
812          */
813
814         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
815         idr_destroy(&ls->ls_lkbidr);
816
817         /*
818          * Free all rsb's on rsbtbl[] lists
819          */
820
821         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
822                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
823                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
824                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
825                         dlm_free_rsb(rsb);
826                 }
827
828                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
829                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
830                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
831                         dlm_free_rsb(rsb);
832                 }
833         }
834
835         vfree(ls->ls_rsbtbl);
836
837         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
838                 kfree(ls->ls_remove_names[i]);
839
840         while (!list_empty(&ls->ls_new_rsb)) {
841                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
842                                        res_hashchain);
843                 list_del(&rsb->res_hashchain);
844                 dlm_free_rsb(rsb);
845         }
846
847         /*
848          * Free structures on any other lists
849          */
850
851         dlm_purge_requestqueue(ls);
852         kfree(ls->ls_recover_args);
853         dlm_clear_members(ls);
854         dlm_clear_members_gone(ls);
855         kfree(ls->ls_node_array);
856         log_rinfo(ls, "release_lockspace final free");
857         kobject_put(&ls->ls_kobj);
858         /* The ls structure will be freed when the kobject is done with */
859
860         module_put(THIS_MODULE);
861         return 0;
862 }
863
864 /*
865  * Called when a system has released all its locks and is not going to use the
866  * lockspace any longer.  We free everything we're managing for this lockspace.
867  * Remaining nodes will go through the recovery process as if we'd died.  The
868  * lockspace must continue to function as usual, participating in recoveries,
869  * until this returns.
870  *
871  * Force has 4 possible values:
872  * 0 - don't destroy locksapce if it has any LKBs
873  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
874  * 2 - destroy lockspace regardless of LKBs
875  * 3 - destroy lockspace as part of a forced shutdown
876  */
877
878 int dlm_release_lockspace(void *lockspace, int force)
879 {
880         struct dlm_ls *ls;
881         int error;
882
883         ls = dlm_find_lockspace_local(lockspace);
884         if (!ls)
885                 return -EINVAL;
886         dlm_put_lockspace(ls);
887
888         mutex_lock(&ls_lock);
889         error = release_lockspace(ls, force);
890         if (!error)
891                 ls_count--;
892         if (!ls_count)
893                 threads_stop();
894         mutex_unlock(&ls_lock);
895
896         return error;
897 }
898
899 void dlm_stop_lockspaces(void)
900 {
901         struct dlm_ls *ls;
902         int count;
903
904  restart:
905         count = 0;
906         spin_lock(&lslist_lock);
907         list_for_each_entry(ls, &lslist, ls_list) {
908                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
909                         count++;
910                         continue;
911                 }
912                 spin_unlock(&lslist_lock);
913                 log_error(ls, "no userland control daemon, stopping lockspace");
914                 dlm_ls_stop(ls);
915                 goto restart;
916         }
917         spin_unlock(&lslist_lock);
918
919         if (count)
920                 log_print("dlm user daemon left %d lockspaces", count);
921 }
922