Merge tag 'clk-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/clk/linux
[linux-2.6-microblaze.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164                              char *buf)
165 {
166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168         return a->show ? a->show(ls, buf) : 0;
169 }
170
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172                               const char *buf, size_t len)
173 {
174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176         return a->store ? a->store(ls, buf, len) : len;
177 }
178
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182         kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186         .show  = dlm_attr_show,
187         .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191         .default_groups = dlm_groups,
192         .sysfs_ops     = &dlm_attr_ops,
193         .release       = lockspace_kobj_release,
194 };
195
196 static struct kset *dlm_kset;
197
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200         if (in)
201                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202         else
203                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207         /* dlm_controld will see the uevent, do the necessary group management
208            and then write to sysfs to wake us */
209
210         wait_event(ls->ls_uevent_wait,
211                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215         return ls->ls_uevent_result;
216 }
217
218 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
219                       struct kobj_uevent_env *env)
220 {
221         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222
223         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224         return 0;
225 }
226
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228         .uevent = dlm_uevent,
229 };
230
231 int __init dlm_lockspace_init(void)
232 {
233         ls_count = 0;
234         mutex_init(&ls_lock);
235         INIT_LIST_HEAD(&lslist);
236         spin_lock_init(&lslist_lock);
237
238         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239         if (!dlm_kset) {
240                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
241                 return -ENOMEM;
242         }
243         return 0;
244 }
245
246 void dlm_lockspace_exit(void)
247 {
248         kset_unregister(dlm_kset);
249 }
250
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253         struct dlm_ls *ls;
254
255         spin_lock(&lslist_lock);
256         list_for_each_entry(ls, &lslist, ls_list) {
257                 if (time_after_eq(jiffies, ls->ls_scan_time +
258                                             dlm_config.ci_scan_secs * HZ)) {
259                         spin_unlock(&lslist_lock);
260                         return ls;
261                 }
262         }
263         spin_unlock(&lslist_lock);
264         return NULL;
265 }
266
267 static int dlm_scand(void *data)
268 {
269         struct dlm_ls *ls;
270
271         while (!kthread_should_stop()) {
272                 ls = find_ls_to_scan();
273                 if (ls) {
274                         if (dlm_lock_recovery_try(ls)) {
275                                 ls->ls_scan_time = jiffies;
276                                 dlm_scan_rsbs(ls);
277                                 dlm_scan_timeout(ls);
278                                 dlm_scan_waiters(ls);
279                                 dlm_unlock_recovery(ls);
280                         } else {
281                                 ls->ls_scan_time += HZ;
282                         }
283                         continue;
284                 }
285                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286         }
287         return 0;
288 }
289
290 static int dlm_scand_start(void)
291 {
292         struct task_struct *p;
293         int error = 0;
294
295         p = kthread_run(dlm_scand, NULL, "dlm_scand");
296         if (IS_ERR(p))
297                 error = PTR_ERR(p);
298         else
299                 scand_task = p;
300         return error;
301 }
302
303 static void dlm_scand_stop(void)
304 {
305         kthread_stop(scand_task);
306 }
307
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310         struct dlm_ls *ls;
311
312         spin_lock(&lslist_lock);
313
314         list_for_each_entry(ls, &lslist, ls_list) {
315                 if (ls->ls_global_id == id) {
316                         ls->ls_count++;
317                         goto out;
318                 }
319         }
320         ls = NULL;
321  out:
322         spin_unlock(&lslist_lock);
323         return ls;
324 }
325
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328         struct dlm_ls *ls;
329
330         spin_lock(&lslist_lock);
331         list_for_each_entry(ls, &lslist, ls_list) {
332                 if (ls->ls_local_handle == lockspace) {
333                         ls->ls_count++;
334                         goto out;
335                 }
336         }
337         ls = NULL;
338  out:
339         spin_unlock(&lslist_lock);
340         return ls;
341 }
342
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345         struct dlm_ls *ls;
346
347         spin_lock(&lslist_lock);
348         list_for_each_entry(ls, &lslist, ls_list) {
349                 if (ls->ls_device.minor == minor) {
350                         ls->ls_count++;
351                         goto out;
352                 }
353         }
354         ls = NULL;
355  out:
356         spin_unlock(&lslist_lock);
357         return ls;
358 }
359
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362         spin_lock(&lslist_lock);
363         ls->ls_count--;
364         spin_unlock(&lslist_lock);
365 }
366
367 static void remove_lockspace(struct dlm_ls *ls)
368 {
369         for (;;) {
370                 spin_lock(&lslist_lock);
371                 if (ls->ls_count == 0) {
372                         WARN_ON(ls->ls_create_count != 0);
373                         list_del(&ls->ls_list);
374                         spin_unlock(&lslist_lock);
375                         return;
376                 }
377                 spin_unlock(&lslist_lock);
378                 ssleep(1);
379         }
380 }
381
/*
 * Start the helpers shared by all lockspaces: the dlm_scand thread and the
 * lowcomms layer.  If lowcomms fails to start, dlm_scand is stopped again.
 *
 * Returns 0 on success or the error from the component that failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		return error;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		return error;
	}

	return 0;
}
406
/* Stop the shared helpers: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
412
413 static int new_lockspace(const char *name, const char *cluster,
414                          uint32_t flags, int lvblen,
415                          const struct dlm_lockspace_ops *ops, void *ops_arg,
416                          int *ops_result, dlm_lockspace_t **lockspace)
417 {
418         struct dlm_ls *ls;
419         int i, size, error;
420         int do_unreg = 0;
421         int namelen = strlen(name);
422
423         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
424                 return -EINVAL;
425
426         if (!lvblen || (lvblen % 8))
427                 return -EINVAL;
428
429         if (!try_module_get(THIS_MODULE))
430                 return -EINVAL;
431
432         if (!dlm_user_daemon_available()) {
433                 log_print("dlm user daemon not available");
434                 error = -EUNATCH;
435                 goto out;
436         }
437
438         if (ops && ops_result) {
439                 if (!dlm_config.ci_recover_callbacks)
440                         *ops_result = -EOPNOTSUPP;
441                 else
442                         *ops_result = 0;
443         }
444
445         if (!cluster)
446                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
447                           dlm_config.ci_cluster_name);
448
449         if (dlm_config.ci_recover_callbacks && cluster &&
450             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
451                 log_print("dlm cluster name '%s' does not match "
452                           "the application cluster name '%s'",
453                           dlm_config.ci_cluster_name, cluster);
454                 error = -EBADR;
455                 goto out;
456         }
457
458         error = 0;
459
460         spin_lock(&lslist_lock);
461         list_for_each_entry(ls, &lslist, ls_list) {
462                 WARN_ON(ls->ls_create_count <= 0);
463                 if (ls->ls_namelen != namelen)
464                         continue;
465                 if (memcmp(ls->ls_name, name, namelen))
466                         continue;
467                 if (flags & DLM_LSFL_NEWEXCL) {
468                         error = -EEXIST;
469                         break;
470                 }
471                 ls->ls_create_count++;
472                 *lockspace = ls;
473                 error = 1;
474                 break;
475         }
476         spin_unlock(&lslist_lock);
477
478         if (error)
479                 goto out;
480
481         error = -ENOMEM;
482
483         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
484         if (!ls)
485                 goto out;
486         memcpy(ls->ls_name, name, namelen);
487         ls->ls_namelen = namelen;
488         ls->ls_lvblen = lvblen;
489         ls->ls_count = 0;
490         ls->ls_flags = 0;
491         ls->ls_scan_time = jiffies;
492
493         if (ops && dlm_config.ci_recover_callbacks) {
494                 ls->ls_ops = ops;
495                 ls->ls_ops_arg = ops_arg;
496         }
497
498         if (flags & DLM_LSFL_TIMEWARN)
499                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
500
501         /* ls_exflags are forced to match among nodes, and we don't
502            need to require all nodes to have some flags set */
503         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
504                                     DLM_LSFL_NEWEXCL));
505
506         size = dlm_config.ci_rsbtbl_size;
507         ls->ls_rsbtbl_size = size;
508
509         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
510         if (!ls->ls_rsbtbl)
511                 goto out_lsfree;
512         for (i = 0; i < size; i++) {
513                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
514                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
515                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
516         }
517
518         spin_lock_init(&ls->ls_remove_spin);
519
520         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
521                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
522                                                  GFP_KERNEL);
523                 if (!ls->ls_remove_names[i])
524                         goto out_rsbtbl;
525         }
526
527         idr_init(&ls->ls_lkbidr);
528         spin_lock_init(&ls->ls_lkbidr_spin);
529
530         INIT_LIST_HEAD(&ls->ls_waiters);
531         mutex_init(&ls->ls_waiters_mutex);
532         INIT_LIST_HEAD(&ls->ls_orphans);
533         mutex_init(&ls->ls_orphans_mutex);
534         INIT_LIST_HEAD(&ls->ls_timeout);
535         mutex_init(&ls->ls_timeout_mutex);
536
537         INIT_LIST_HEAD(&ls->ls_new_rsb);
538         spin_lock_init(&ls->ls_new_rsb_spin);
539
540         INIT_LIST_HEAD(&ls->ls_nodes);
541         INIT_LIST_HEAD(&ls->ls_nodes_gone);
542         ls->ls_num_nodes = 0;
543         ls->ls_low_nodeid = 0;
544         ls->ls_total_weight = 0;
545         ls->ls_node_array = NULL;
546
547         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
548         ls->ls_stub_rsb.res_ls = ls;
549
550         ls->ls_debug_rsb_dentry = NULL;
551         ls->ls_debug_waiters_dentry = NULL;
552
553         init_waitqueue_head(&ls->ls_uevent_wait);
554         ls->ls_uevent_result = 0;
555         init_completion(&ls->ls_members_done);
556         ls->ls_members_result = -1;
557
558         mutex_init(&ls->ls_cb_mutex);
559         INIT_LIST_HEAD(&ls->ls_cb_delay);
560
561         ls->ls_recoverd_task = NULL;
562         mutex_init(&ls->ls_recoverd_active);
563         spin_lock_init(&ls->ls_recover_lock);
564         spin_lock_init(&ls->ls_rcom_spin);
565         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
566         ls->ls_recover_status = 0;
567         ls->ls_recover_seq = 0;
568         ls->ls_recover_args = NULL;
569         init_rwsem(&ls->ls_in_recovery);
570         init_rwsem(&ls->ls_recv_active);
571         INIT_LIST_HEAD(&ls->ls_requestqueue);
572         mutex_init(&ls->ls_requestqueue_mutex);
573         mutex_init(&ls->ls_clear_proc_locks);
574
575         ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS);
576         if (!ls->ls_recover_buf)
577                 goto out_lkbidr;
578
579         ls->ls_slot = 0;
580         ls->ls_num_slots = 0;
581         ls->ls_slots_size = 0;
582         ls->ls_slots = NULL;
583
584         INIT_LIST_HEAD(&ls->ls_recover_list);
585         spin_lock_init(&ls->ls_recover_list_lock);
586         idr_init(&ls->ls_recover_idr);
587         spin_lock_init(&ls->ls_recover_idr_lock);
588         ls->ls_recover_list_count = 0;
589         ls->ls_local_handle = ls;
590         init_waitqueue_head(&ls->ls_wait_general);
591         INIT_LIST_HEAD(&ls->ls_root_list);
592         init_rwsem(&ls->ls_root_sem);
593
594         spin_lock(&lslist_lock);
595         ls->ls_create_count = 1;
596         list_add(&ls->ls_list, &lslist);
597         spin_unlock(&lslist_lock);
598
599         if (flags & DLM_LSFL_FS) {
600                 error = dlm_callback_start(ls);
601                 if (error) {
602                         log_error(ls, "can't start dlm_callback %d", error);
603                         goto out_delist;
604                 }
605         }
606
607         init_waitqueue_head(&ls->ls_recover_lock_wait);
608
609         /*
610          * Once started, dlm_recoverd first looks for ls in lslist, then
611          * initializes ls_in_recovery as locked in "down" mode.  We need
612          * to wait for the wakeup from dlm_recoverd because in_recovery
613          * has to start out in down mode.
614          */
615
616         error = dlm_recoverd_start(ls);
617         if (error) {
618                 log_error(ls, "can't start dlm_recoverd %d", error);
619                 goto out_callback;
620         }
621
622         wait_event(ls->ls_recover_lock_wait,
623                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
624
625         /* let kobject handle freeing of ls if there's an error */
626         do_unreg = 1;
627
628         ls->ls_kobj.kset = dlm_kset;
629         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
630                                      "%s", ls->ls_name);
631         if (error)
632                 goto out_recoverd;
633         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
634
635         /* This uevent triggers dlm_controld in userspace to add us to the
636            group of nodes that are members of this lockspace (managed by the
637            cluster infrastructure.)  Once it's done that, it tells us who the
638            current lockspace members are (via configfs) and then tells the
639            lockspace to start running (via sysfs) in dlm_ls_start(). */
640
641         error = do_uevent(ls, 1);
642         if (error)
643                 goto out_recoverd;
644
645         wait_for_completion(&ls->ls_members_done);
646         error = ls->ls_members_result;
647         if (error)
648                 goto out_members;
649
650         dlm_create_debug_file(ls);
651
652         log_rinfo(ls, "join complete");
653         *lockspace = ls;
654         return 0;
655
656  out_members:
657         do_uevent(ls, 0);
658         dlm_clear_members(ls);
659         kfree(ls->ls_node_array);
660  out_recoverd:
661         dlm_recoverd_stop(ls);
662  out_callback:
663         dlm_callback_stop(ls);
664  out_delist:
665         spin_lock(&lslist_lock);
666         list_del(&ls->ls_list);
667         spin_unlock(&lslist_lock);
668         idr_destroy(&ls->ls_recover_idr);
669         kfree(ls->ls_recover_buf);
670  out_lkbidr:
671         idr_destroy(&ls->ls_lkbidr);
672  out_rsbtbl:
673         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
674                 kfree(ls->ls_remove_names[i]);
675         vfree(ls->ls_rsbtbl);
676  out_lsfree:
677         if (do_unreg)
678                 kobject_put(&ls->ls_kobj);
679         else
680                 kfree(ls);
681  out:
682         module_put(THIS_MODULE);
683         return error;
684 }
685
686 int dlm_new_lockspace(const char *name, const char *cluster,
687                       uint32_t flags, int lvblen,
688                       const struct dlm_lockspace_ops *ops, void *ops_arg,
689                       int *ops_result, dlm_lockspace_t **lockspace)
690 {
691         int error = 0;
692
693         mutex_lock(&ls_lock);
694         if (!ls_count)
695                 error = threads_start();
696         if (error)
697                 goto out;
698
699         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
700                               ops_result, lockspace);
701         if (!error)
702                 ls_count++;
703         if (error > 0)
704                 error = 0;
705         if (!ls_count)
706                 threads_stop();
707  out:
708         mutex_unlock(&ls_lock);
709         return error;
710 }
711
712 static int lkb_idr_is_local(int id, void *p, void *data)
713 {
714         struct dlm_lkb *lkb = p;
715
716         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
717 }
718
/*
 * idr_for_each() callback that matches every entry: always returns 1,
 * ignoring all of its arguments.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
723
724 static int lkb_idr_free(int id, void *p, void *data)
725 {
726         struct dlm_lkb *lkb = p;
727
728         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
729                 dlm_free_lvb(lkb->lkb_lvbptr);
730
731         dlm_free_lkb(lkb);
732         return 0;
733 }
734
735 /* NOTE: We check the lkbidr here rather than the resource table.
736    This is because there may be LKBs queued as ASTs that have been unlinked
737    from their RSBs and are pending deletion once the AST has been delivered */
738
739 static int lockspace_busy(struct dlm_ls *ls, int force)
740 {
741         int rv;
742
743         spin_lock(&ls->ls_lkbidr_spin);
744         if (force == 0) {
745                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
746         } else if (force == 1) {
747                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
748         } else {
749                 rv = 0;
750         }
751         spin_unlock(&ls->ls_lkbidr_spin);
752         return rv;
753 }
754
755 static int release_lockspace(struct dlm_ls *ls, int force)
756 {
757         struct dlm_rsb *rsb;
758         struct rb_node *n;
759         int i, busy, rv;
760
761         busy = lockspace_busy(ls, force);
762
763         spin_lock(&lslist_lock);
764         if (ls->ls_create_count == 1) {
765                 if (busy) {
766                         rv = -EBUSY;
767                 } else {
768                         /* remove_lockspace takes ls off lslist */
769                         ls->ls_create_count = 0;
770                         rv = 0;
771                 }
772         } else if (ls->ls_create_count > 1) {
773                 rv = --ls->ls_create_count;
774         } else {
775                 rv = -EINVAL;
776         }
777         spin_unlock(&lslist_lock);
778
779         if (rv) {
780                 log_debug(ls, "release_lockspace no remove %d", rv);
781                 return rv;
782         }
783
784         dlm_device_deregister(ls);
785
786         if (force < 3 && dlm_user_daemon_available())
787                 do_uevent(ls, 0);
788
789         dlm_recoverd_stop(ls);
790
791         dlm_callback_stop(ls);
792
793         remove_lockspace(ls);
794
795         dlm_delete_debug_file(ls);
796
797         idr_destroy(&ls->ls_recover_idr);
798         kfree(ls->ls_recover_buf);
799
800         /*
801          * Free all lkb's in idr
802          */
803
804         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
805         idr_destroy(&ls->ls_lkbidr);
806
807         /*
808          * Free all rsb's on rsbtbl[] lists
809          */
810
811         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
812                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
813                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
814                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
815                         dlm_free_rsb(rsb);
816                 }
817
818                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
819                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
820                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
821                         dlm_free_rsb(rsb);
822                 }
823         }
824
825         vfree(ls->ls_rsbtbl);
826
827         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
828                 kfree(ls->ls_remove_names[i]);
829
830         while (!list_empty(&ls->ls_new_rsb)) {
831                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
832                                        res_hashchain);
833                 list_del(&rsb->res_hashchain);
834                 dlm_free_rsb(rsb);
835         }
836
837         /*
838          * Free structures on any other lists
839          */
840
841         dlm_purge_requestqueue(ls);
842         kfree(ls->ls_recover_args);
843         dlm_clear_members(ls);
844         dlm_clear_members_gone(ls);
845         kfree(ls->ls_node_array);
846         log_rinfo(ls, "release_lockspace final free");
847         kobject_put(&ls->ls_kobj);
848         /* The ls structure will be freed when the kobject is done with */
849
850         module_put(THIS_MODULE);
851         return 0;
852 }
853
854 /*
855  * Called when a system has released all its locks and is not going to use the
856  * lockspace any longer.  We free everything we're managing for this lockspace.
857  * Remaining nodes will go through the recovery process as if we'd died.  The
858  * lockspace must continue to function as usual, participating in recoveries,
859  * until this returns.
860  *
861  * Force has 4 possible values:
862  * 0 - don't destroy locksapce if it has any LKBs
863  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
864  * 2 - destroy lockspace regardless of LKBs
865  * 3 - destroy lockspace as part of a forced shutdown
866  */
867
868 int dlm_release_lockspace(void *lockspace, int force)
869 {
870         struct dlm_ls *ls;
871         int error;
872
873         ls = dlm_find_lockspace_local(lockspace);
874         if (!ls)
875                 return -EINVAL;
876         dlm_put_lockspace(ls);
877
878         mutex_lock(&ls_lock);
879         error = release_lockspace(ls, force);
880         if (!error)
881                 ls_count--;
882         if (!ls_count)
883                 threads_stop();
884         mutex_unlock(&ls_lock);
885
886         return error;
887 }
888
889 void dlm_stop_lockspaces(void)
890 {
891         struct dlm_ls *ls;
892         int count;
893
894  restart:
895         count = 0;
896         spin_lock(&lslist_lock);
897         list_for_each_entry(ls, &lslist, ls_list) {
898                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
899                         count++;
900                         continue;
901                 }
902                 spin_unlock(&lslist_lock);
903                 log_error(ls, "no userland control daemon, stopping lockspace");
904                 dlm_ls_stop(ls);
905                 goto restart;
906         }
907         spin_unlock(&lslist_lock);
908
909         if (count)
910                 log_print("dlm user daemon left %d lockspaces", count);
911 }
912