Merge tag 's390-5.13-2' of git://git.kernel.org/pub/scm/linux/kernel/git/s390/linux
[linux-2.6-microblaze.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164                              char *buf)
165 {
166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168         return a->show ? a->show(ls, buf) : 0;
169 }
170
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172                               const char *buf, size_t len)
173 {
174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176         return a->store ? a->store(ls, buf, len) : len;
177 }
178
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182         kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186         .show  = dlm_attr_show,
187         .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191         .default_groups = dlm_groups,
192         .sysfs_ops     = &dlm_attr_ops,
193         .release       = lockspace_kobj_release,
194 };
195
196 static struct kset *dlm_kset;
197
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200         if (in)
201                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202         else
203                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207         /* dlm_controld will see the uevent, do the necessary group management
208            and then write to sysfs to wake us */
209
210         wait_event(ls->ls_uevent_wait,
211                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215         return ls->ls_uevent_result;
216 }
217
218 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
219                       struct kobj_uevent_env *env)
220 {
221         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222
223         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224         return 0;
225 }
226
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228         .uevent = dlm_uevent,
229 };
230
231 int __init dlm_lockspace_init(void)
232 {
233         ls_count = 0;
234         mutex_init(&ls_lock);
235         INIT_LIST_HEAD(&lslist);
236         spin_lock_init(&lslist_lock);
237
238         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239         if (!dlm_kset) {
240                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
241                 return -ENOMEM;
242         }
243         return 0;
244 }
245
246 void dlm_lockspace_exit(void)
247 {
248         kset_unregister(dlm_kset);
249 }
250
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253         struct dlm_ls *ls;
254
255         spin_lock(&lslist_lock);
256         list_for_each_entry(ls, &lslist, ls_list) {
257                 if (time_after_eq(jiffies, ls->ls_scan_time +
258                                             dlm_config.ci_scan_secs * HZ)) {
259                         spin_unlock(&lslist_lock);
260                         return ls;
261                 }
262         }
263         spin_unlock(&lslist_lock);
264         return NULL;
265 }
266
267 static int dlm_scand(void *data)
268 {
269         struct dlm_ls *ls;
270
271         while (!kthread_should_stop()) {
272                 ls = find_ls_to_scan();
273                 if (ls) {
274                         if (dlm_lock_recovery_try(ls)) {
275                                 ls->ls_scan_time = jiffies;
276                                 dlm_scan_rsbs(ls);
277                                 dlm_scan_timeout(ls);
278                                 dlm_scan_waiters(ls);
279                                 dlm_unlock_recovery(ls);
280                         } else {
281                                 ls->ls_scan_time += HZ;
282                         }
283                         continue;
284                 }
285                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286         }
287         return 0;
288 }
289
290 static int dlm_scand_start(void)
291 {
292         struct task_struct *p;
293         int error = 0;
294
295         p = kthread_run(dlm_scand, NULL, "dlm_scand");
296         if (IS_ERR(p))
297                 error = PTR_ERR(p);
298         else
299                 scand_task = p;
300         return error;
301 }
302
303 static void dlm_scand_stop(void)
304 {
305         kthread_stop(scand_task);
306 }
307
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310         struct dlm_ls *ls;
311
312         spin_lock(&lslist_lock);
313
314         list_for_each_entry(ls, &lslist, ls_list) {
315                 if (ls->ls_global_id == id) {
316                         ls->ls_count++;
317                         goto out;
318                 }
319         }
320         ls = NULL;
321  out:
322         spin_unlock(&lslist_lock);
323         return ls;
324 }
325
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328         struct dlm_ls *ls;
329
330         spin_lock(&lslist_lock);
331         list_for_each_entry(ls, &lslist, ls_list) {
332                 if (ls->ls_local_handle == lockspace) {
333                         ls->ls_count++;
334                         goto out;
335                 }
336         }
337         ls = NULL;
338  out:
339         spin_unlock(&lslist_lock);
340         return ls;
341 }
342
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345         struct dlm_ls *ls;
346
347         spin_lock(&lslist_lock);
348         list_for_each_entry(ls, &lslist, ls_list) {
349                 if (ls->ls_device.minor == minor) {
350                         ls->ls_count++;
351                         goto out;
352                 }
353         }
354         ls = NULL;
355  out:
356         spin_unlock(&lslist_lock);
357         return ls;
358 }
359
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362         spin_lock(&lslist_lock);
363         ls->ls_count--;
364         spin_unlock(&lslist_lock);
365 }
366
367 static void remove_lockspace(struct dlm_ls *ls)
368 {
369         for (;;) {
370                 spin_lock(&lslist_lock);
371                 if (ls->ls_count == 0) {
372                         WARN_ON(ls->ls_create_count != 0);
373                         list_del(&ls->ls_list);
374                         spin_unlock(&lslist_lock);
375                         return;
376                 }
377                 spin_unlock(&lslist_lock);
378                 ssleep(1);
379         }
380 }
381
382 static int threads_start(void)
383 {
384         int error;
385
386         error = dlm_scand_start();
387         if (error) {
388                 log_print("cannot start dlm_scand thread %d", error);
389                 goto fail;
390         }
391
392         /* Thread for sending/receiving messages for all lockspace's */
393         error = dlm_lowcomms_start();
394         if (error) {
395                 log_print("cannot start dlm lowcomms %d", error);
396                 goto scand_fail;
397         }
398
399         return 0;
400
401  scand_fail:
402         dlm_scand_stop();
403  fail:
404         return error;
405 }
406
407 static int new_lockspace(const char *name, const char *cluster,
408                          uint32_t flags, int lvblen,
409                          const struct dlm_lockspace_ops *ops, void *ops_arg,
410                          int *ops_result, dlm_lockspace_t **lockspace)
411 {
412         struct dlm_ls *ls;
413         int i, size, error;
414         int do_unreg = 0;
415         int namelen = strlen(name);
416
417         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
418                 return -EINVAL;
419
420         if (!lvblen || (lvblen % 8))
421                 return -EINVAL;
422
423         if (!try_module_get(THIS_MODULE))
424                 return -EINVAL;
425
426         if (!dlm_user_daemon_available()) {
427                 log_print("dlm user daemon not available");
428                 error = -EUNATCH;
429                 goto out;
430         }
431
432         if (ops && ops_result) {
433                 if (!dlm_config.ci_recover_callbacks)
434                         *ops_result = -EOPNOTSUPP;
435                 else
436                         *ops_result = 0;
437         }
438
439         if (!cluster)
440                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
441                           dlm_config.ci_cluster_name);
442
443         if (dlm_config.ci_recover_callbacks && cluster &&
444             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
445                 log_print("dlm cluster name '%s' does not match "
446                           "the application cluster name '%s'",
447                           dlm_config.ci_cluster_name, cluster);
448                 error = -EBADR;
449                 goto out;
450         }
451
452         error = 0;
453
454         spin_lock(&lslist_lock);
455         list_for_each_entry(ls, &lslist, ls_list) {
456                 WARN_ON(ls->ls_create_count <= 0);
457                 if (ls->ls_namelen != namelen)
458                         continue;
459                 if (memcmp(ls->ls_name, name, namelen))
460                         continue;
461                 if (flags & DLM_LSFL_NEWEXCL) {
462                         error = -EEXIST;
463                         break;
464                 }
465                 ls->ls_create_count++;
466                 *lockspace = ls;
467                 error = 1;
468                 break;
469         }
470         spin_unlock(&lslist_lock);
471
472         if (error)
473                 goto out;
474
475         error = -ENOMEM;
476
477         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
478         if (!ls)
479                 goto out;
480         memcpy(ls->ls_name, name, namelen);
481         ls->ls_namelen = namelen;
482         ls->ls_lvblen = lvblen;
483         ls->ls_count = 0;
484         ls->ls_flags = 0;
485         ls->ls_scan_time = jiffies;
486
487         if (ops && dlm_config.ci_recover_callbacks) {
488                 ls->ls_ops = ops;
489                 ls->ls_ops_arg = ops_arg;
490         }
491
492         if (flags & DLM_LSFL_TIMEWARN)
493                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
494
495         /* ls_exflags are forced to match among nodes, and we don't
496            need to require all nodes to have some flags set */
497         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
498                                     DLM_LSFL_NEWEXCL));
499
500         size = dlm_config.ci_rsbtbl_size;
501         ls->ls_rsbtbl_size = size;
502
503         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
504         if (!ls->ls_rsbtbl)
505                 goto out_lsfree;
506         for (i = 0; i < size; i++) {
507                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
508                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
509                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
510         }
511
512         spin_lock_init(&ls->ls_remove_spin);
513
514         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
515                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
516                                                  GFP_KERNEL);
517                 if (!ls->ls_remove_names[i])
518                         goto out_rsbtbl;
519         }
520
521         idr_init(&ls->ls_lkbidr);
522         spin_lock_init(&ls->ls_lkbidr_spin);
523
524         INIT_LIST_HEAD(&ls->ls_waiters);
525         mutex_init(&ls->ls_waiters_mutex);
526         INIT_LIST_HEAD(&ls->ls_orphans);
527         mutex_init(&ls->ls_orphans_mutex);
528         INIT_LIST_HEAD(&ls->ls_timeout);
529         mutex_init(&ls->ls_timeout_mutex);
530
531         INIT_LIST_HEAD(&ls->ls_new_rsb);
532         spin_lock_init(&ls->ls_new_rsb_spin);
533
534         INIT_LIST_HEAD(&ls->ls_nodes);
535         INIT_LIST_HEAD(&ls->ls_nodes_gone);
536         ls->ls_num_nodes = 0;
537         ls->ls_low_nodeid = 0;
538         ls->ls_total_weight = 0;
539         ls->ls_node_array = NULL;
540
541         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
542         ls->ls_stub_rsb.res_ls = ls;
543
544         ls->ls_debug_rsb_dentry = NULL;
545         ls->ls_debug_waiters_dentry = NULL;
546
547         init_waitqueue_head(&ls->ls_uevent_wait);
548         ls->ls_uevent_result = 0;
549         init_completion(&ls->ls_members_done);
550         ls->ls_members_result = -1;
551
552         mutex_init(&ls->ls_cb_mutex);
553         INIT_LIST_HEAD(&ls->ls_cb_delay);
554
555         ls->ls_recoverd_task = NULL;
556         mutex_init(&ls->ls_recoverd_active);
557         spin_lock_init(&ls->ls_recover_lock);
558         spin_lock_init(&ls->ls_rcom_spin);
559         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
560         ls->ls_recover_status = 0;
561         ls->ls_recover_seq = 0;
562         ls->ls_recover_args = NULL;
563         init_rwsem(&ls->ls_in_recovery);
564         init_rwsem(&ls->ls_recv_active);
565         INIT_LIST_HEAD(&ls->ls_requestqueue);
566         mutex_init(&ls->ls_requestqueue_mutex);
567         mutex_init(&ls->ls_clear_proc_locks);
568
569         ls->ls_recover_buf = kmalloc(LOWCOMMS_MAX_TX_BUFFER_LEN, GFP_NOFS);
570         if (!ls->ls_recover_buf)
571                 goto out_lkbidr;
572
573         ls->ls_slot = 0;
574         ls->ls_num_slots = 0;
575         ls->ls_slots_size = 0;
576         ls->ls_slots = NULL;
577
578         INIT_LIST_HEAD(&ls->ls_recover_list);
579         spin_lock_init(&ls->ls_recover_list_lock);
580         idr_init(&ls->ls_recover_idr);
581         spin_lock_init(&ls->ls_recover_idr_lock);
582         ls->ls_recover_list_count = 0;
583         ls->ls_local_handle = ls;
584         init_waitqueue_head(&ls->ls_wait_general);
585         INIT_LIST_HEAD(&ls->ls_root_list);
586         init_rwsem(&ls->ls_root_sem);
587
588         spin_lock(&lslist_lock);
589         ls->ls_create_count = 1;
590         list_add(&ls->ls_list, &lslist);
591         spin_unlock(&lslist_lock);
592
593         if (flags & DLM_LSFL_FS) {
594                 error = dlm_callback_start(ls);
595                 if (error) {
596                         log_error(ls, "can't start dlm_callback %d", error);
597                         goto out_delist;
598                 }
599         }
600
601         init_waitqueue_head(&ls->ls_recover_lock_wait);
602
603         /*
604          * Once started, dlm_recoverd first looks for ls in lslist, then
605          * initializes ls_in_recovery as locked in "down" mode.  We need
606          * to wait for the wakeup from dlm_recoverd because in_recovery
607          * has to start out in down mode.
608          */
609
610         error = dlm_recoverd_start(ls);
611         if (error) {
612                 log_error(ls, "can't start dlm_recoverd %d", error);
613                 goto out_callback;
614         }
615
616         wait_event(ls->ls_recover_lock_wait,
617                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
618
619         /* let kobject handle freeing of ls if there's an error */
620         do_unreg = 1;
621
622         ls->ls_kobj.kset = dlm_kset;
623         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
624                                      "%s", ls->ls_name);
625         if (error)
626                 goto out_recoverd;
627         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
628
629         /* This uevent triggers dlm_controld in userspace to add us to the
630            group of nodes that are members of this lockspace (managed by the
631            cluster infrastructure.)  Once it's done that, it tells us who the
632            current lockspace members are (via configfs) and then tells the
633            lockspace to start running (via sysfs) in dlm_ls_start(). */
634
635         error = do_uevent(ls, 1);
636         if (error)
637                 goto out_recoverd;
638
639         wait_for_completion(&ls->ls_members_done);
640         error = ls->ls_members_result;
641         if (error)
642                 goto out_members;
643
644         dlm_create_debug_file(ls);
645
646         log_rinfo(ls, "join complete");
647         *lockspace = ls;
648         return 0;
649
650  out_members:
651         do_uevent(ls, 0);
652         dlm_clear_members(ls);
653         kfree(ls->ls_node_array);
654  out_recoverd:
655         dlm_recoverd_stop(ls);
656  out_callback:
657         dlm_callback_stop(ls);
658  out_delist:
659         spin_lock(&lslist_lock);
660         list_del(&ls->ls_list);
661         spin_unlock(&lslist_lock);
662         idr_destroy(&ls->ls_recover_idr);
663         kfree(ls->ls_recover_buf);
664  out_lkbidr:
665         idr_destroy(&ls->ls_lkbidr);
666  out_rsbtbl:
667         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
668                 kfree(ls->ls_remove_names[i]);
669         vfree(ls->ls_rsbtbl);
670  out_lsfree:
671         if (do_unreg)
672                 kobject_put(&ls->ls_kobj);
673         else
674                 kfree(ls);
675  out:
676         module_put(THIS_MODULE);
677         return error;
678 }
679
680 int dlm_new_lockspace(const char *name, const char *cluster,
681                       uint32_t flags, int lvblen,
682                       const struct dlm_lockspace_ops *ops, void *ops_arg,
683                       int *ops_result, dlm_lockspace_t **lockspace)
684 {
685         int error = 0;
686
687         mutex_lock(&ls_lock);
688         if (!ls_count)
689                 error = threads_start();
690         if (error)
691                 goto out;
692
693         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
694                               ops_result, lockspace);
695         if (!error)
696                 ls_count++;
697         if (error > 0)
698                 error = 0;
699         if (!ls_count) {
700                 dlm_scand_stop();
701                 dlm_lowcomms_shutdown();
702                 dlm_lowcomms_stop();
703         }
704  out:
705         mutex_unlock(&ls_lock);
706         return error;
707 }
708
709 static int lkb_idr_is_local(int id, void *p, void *data)
710 {
711         struct dlm_lkb *lkb = p;
712
713         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
714 }
715
716 static int lkb_idr_is_any(int id, void *p, void *data)
717 {
718         return 1;
719 }
720
721 static int lkb_idr_free(int id, void *p, void *data)
722 {
723         struct dlm_lkb *lkb = p;
724
725         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
726                 dlm_free_lvb(lkb->lkb_lvbptr);
727
728         dlm_free_lkb(lkb);
729         return 0;
730 }
731
732 /* NOTE: We check the lkbidr here rather than the resource table.
733    This is because there may be LKBs queued as ASTs that have been unlinked
734    from their RSBs and are pending deletion once the AST has been delivered */
735
736 static int lockspace_busy(struct dlm_ls *ls, int force)
737 {
738         int rv;
739
740         spin_lock(&ls->ls_lkbidr_spin);
741         if (force == 0) {
742                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
743         } else if (force == 1) {
744                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
745         } else {
746                 rv = 0;
747         }
748         spin_unlock(&ls->ls_lkbidr_spin);
749         return rv;
750 }
751
752 static int release_lockspace(struct dlm_ls *ls, int force)
753 {
754         struct dlm_rsb *rsb;
755         struct rb_node *n;
756         int i, busy, rv;
757
758         busy = lockspace_busy(ls, force);
759
760         spin_lock(&lslist_lock);
761         if (ls->ls_create_count == 1) {
762                 if (busy) {
763                         rv = -EBUSY;
764                 } else {
765                         /* remove_lockspace takes ls off lslist */
766                         ls->ls_create_count = 0;
767                         rv = 0;
768                 }
769         } else if (ls->ls_create_count > 1) {
770                 rv = --ls->ls_create_count;
771         } else {
772                 rv = -EINVAL;
773         }
774         spin_unlock(&lslist_lock);
775
776         if (rv) {
777                 log_debug(ls, "release_lockspace no remove %d", rv);
778                 return rv;
779         }
780
781         dlm_device_deregister(ls);
782
783         if (force < 3 && dlm_user_daemon_available())
784                 do_uevent(ls, 0);
785
786         dlm_recoverd_stop(ls);
787
788         if (ls_count == 1) {
789                 dlm_scand_stop();
790                 dlm_lowcomms_shutdown();
791         }
792
793         dlm_callback_stop(ls);
794
795         remove_lockspace(ls);
796
797         dlm_delete_debug_file(ls);
798
799         idr_destroy(&ls->ls_recover_idr);
800         kfree(ls->ls_recover_buf);
801
802         /*
803          * Free all lkb's in idr
804          */
805
806         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
807         idr_destroy(&ls->ls_lkbidr);
808
809         /*
810          * Free all rsb's on rsbtbl[] lists
811          */
812
813         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
814                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
815                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
816                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
817                         dlm_free_rsb(rsb);
818                 }
819
820                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
821                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
822                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
823                         dlm_free_rsb(rsb);
824                 }
825         }
826
827         vfree(ls->ls_rsbtbl);
828
829         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
830                 kfree(ls->ls_remove_names[i]);
831
832         while (!list_empty(&ls->ls_new_rsb)) {
833                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
834                                        res_hashchain);
835                 list_del(&rsb->res_hashchain);
836                 dlm_free_rsb(rsb);
837         }
838
839         /*
840          * Free structures on any other lists
841          */
842
843         dlm_purge_requestqueue(ls);
844         kfree(ls->ls_recover_args);
845         dlm_clear_members(ls);
846         dlm_clear_members_gone(ls);
847         kfree(ls->ls_node_array);
848         log_rinfo(ls, "release_lockspace final free");
849         kobject_put(&ls->ls_kobj);
850         /* The ls structure will be freed when the kobject is done with */
851
852         module_put(THIS_MODULE);
853         return 0;
854 }
855
856 /*
857  * Called when a system has released all its locks and is not going to use the
858  * lockspace any longer.  We free everything we're managing for this lockspace.
859  * Remaining nodes will go through the recovery process as if we'd died.  The
860  * lockspace must continue to function as usual, participating in recoveries,
861  * until this returns.
862  *
863  * Force has 4 possible values:
864  * 0 - don't destroy locksapce if it has any LKBs
865  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
866  * 2 - destroy lockspace regardless of LKBs
867  * 3 - destroy lockspace as part of a forced shutdown
868  */
869
870 int dlm_release_lockspace(void *lockspace, int force)
871 {
872         struct dlm_ls *ls;
873         int error;
874
875         ls = dlm_find_lockspace_local(lockspace);
876         if (!ls)
877                 return -EINVAL;
878         dlm_put_lockspace(ls);
879
880         mutex_lock(&ls_lock);
881         error = release_lockspace(ls, force);
882         if (!error)
883                 ls_count--;
884         if (!ls_count)
885                 dlm_lowcomms_stop();
886         mutex_unlock(&ls_lock);
887
888         return error;
889 }
890
891 void dlm_stop_lockspaces(void)
892 {
893         struct dlm_ls *ls;
894         int count;
895
896  restart:
897         count = 0;
898         spin_lock(&lslist_lock);
899         list_for_each_entry(ls, &lslist, ls_list) {
900                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
901                         count++;
902                         continue;
903                 }
904                 spin_unlock(&lslist_lock);
905                 log_error(ls, "no userland control daemon, stopping lockspace");
906                 dlm_ls_stop(ls);
907                 goto restart;
908         }
909         spin_unlock(&lslist_lock);
910
911         if (count)
912                 log_print("dlm user daemon left %d lockspaces", count);
913 }
914