Merge tag 'pci-v5.17-fixes-1' of git://git.kernel.org/pub/scm/linux/kernel/git/helgaa...
[linux-2.6-microblaze.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27 #include "ast.h"
28
29 static int                      ls_count;
30 static struct mutex             ls_lock;
31 static struct list_head         lslist;
32 static spinlock_t               lslist_lock;
33 static struct task_struct *     scand_task;
34
35
36 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
37 {
38         ssize_t ret = len;
39         int n;
40         int rc = kstrtoint(buf, 0, &n);
41
42         if (rc)
43                 return rc;
44         ls = dlm_find_lockspace_local(ls->ls_local_handle);
45         if (!ls)
46                 return -EINVAL;
47
48         switch (n) {
49         case 0:
50                 dlm_ls_stop(ls);
51                 break;
52         case 1:
53                 dlm_ls_start(ls);
54                 break;
55         default:
56                 ret = -EINVAL;
57         }
58         dlm_put_lockspace(ls);
59         return ret;
60 }
61
62 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
63 {
64         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
65
66         if (rc)
67                 return rc;
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
81
82         if (rc)
83                 return rc;
84         return len;
85 }
86
87 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
88 {
89         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
90 }
91
92 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
93 {
94         int val;
95         int rc = kstrtoint(buf, 0, &val);
96
97         if (rc)
98                 return rc;
99         if (val == 1)
100                 set_bit(LSFL_NODIR, &ls->ls_flags);
101         return len;
102 }
103
104 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
105 {
106         uint32_t status = dlm_recover_status(ls);
107         return snprintf(buf, PAGE_SIZE, "%x\n", status);
108 }
109
110 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
111 {
112         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
113 }
114
115 struct dlm_attr {
116         struct attribute attr;
117         ssize_t (*show)(struct dlm_ls *, char *);
118         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
119 };
120
121 static struct dlm_attr dlm_attr_control = {
122         .attr  = {.name = "control", .mode = S_IWUSR},
123         .store = dlm_control_store
124 };
125
126 static struct dlm_attr dlm_attr_event = {
127         .attr  = {.name = "event_done", .mode = S_IWUSR},
128         .store = dlm_event_store
129 };
130
131 static struct dlm_attr dlm_attr_id = {
132         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
133         .show  = dlm_id_show,
134         .store = dlm_id_store
135 };
136
137 static struct dlm_attr dlm_attr_nodir = {
138         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
139         .show  = dlm_nodir_show,
140         .store = dlm_nodir_store
141 };
142
143 static struct dlm_attr dlm_attr_recover_status = {
144         .attr  = {.name = "recover_status", .mode = S_IRUGO},
145         .show  = dlm_recover_status_show
146 };
147
148 static struct dlm_attr dlm_attr_recover_nodeid = {
149         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
150         .show  = dlm_recover_nodeid_show
151 };
152
153 static struct attribute *dlm_attrs[] = {
154         &dlm_attr_control.attr,
155         &dlm_attr_event.attr,
156         &dlm_attr_id.attr,
157         &dlm_attr_nodir.attr,
158         &dlm_attr_recover_status.attr,
159         &dlm_attr_recover_nodeid.attr,
160         NULL,
161 };
162 ATTRIBUTE_GROUPS(dlm);
163
164 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
165                              char *buf)
166 {
167         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
168         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
169         return a->show ? a->show(ls, buf) : 0;
170 }
171
172 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
173                               const char *buf, size_t len)
174 {
175         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
176         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
177         return a->store ? a->store(ls, buf, len) : len;
178 }
179
180 static void lockspace_kobj_release(struct kobject *k)
181 {
182         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
183         kfree(ls);
184 }
185
186 static const struct sysfs_ops dlm_attr_ops = {
187         .show  = dlm_attr_show,
188         .store = dlm_attr_store,
189 };
190
191 static struct kobj_type dlm_ktype = {
192         .default_groups = dlm_groups,
193         .sysfs_ops     = &dlm_attr_ops,
194         .release       = lockspace_kobj_release,
195 };
196
197 static struct kset *dlm_kset;
198
199 static int do_uevent(struct dlm_ls *ls, int in)
200 {
201         if (in)
202                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203         else
204                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208         /* dlm_controld will see the uevent, do the necessary group management
209            and then write to sysfs to wake us */
210
211         wait_event(ls->ls_uevent_wait,
212                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
215
216         return ls->ls_uevent_result;
217 }
218
219 static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
220 {
221         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
222
223         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
224         return 0;
225 }
226
227 static const struct kset_uevent_ops dlm_uevent_ops = {
228         .uevent = dlm_uevent,
229 };
230
231 int __init dlm_lockspace_init(void)
232 {
233         ls_count = 0;
234         mutex_init(&ls_lock);
235         INIT_LIST_HEAD(&lslist);
236         spin_lock_init(&lslist_lock);
237
238         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
239         if (!dlm_kset) {
240                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
241                 return -ENOMEM;
242         }
243         return 0;
244 }
245
246 void dlm_lockspace_exit(void)
247 {
248         kset_unregister(dlm_kset);
249 }
250
251 static struct dlm_ls *find_ls_to_scan(void)
252 {
253         struct dlm_ls *ls;
254
255         spin_lock(&lslist_lock);
256         list_for_each_entry(ls, &lslist, ls_list) {
257                 if (time_after_eq(jiffies, ls->ls_scan_time +
258                                             dlm_config.ci_scan_secs * HZ)) {
259                         spin_unlock(&lslist_lock);
260                         return ls;
261                 }
262         }
263         spin_unlock(&lslist_lock);
264         return NULL;
265 }
266
267 static int dlm_scand(void *data)
268 {
269         struct dlm_ls *ls;
270
271         while (!kthread_should_stop()) {
272                 ls = find_ls_to_scan();
273                 if (ls) {
274                         if (dlm_lock_recovery_try(ls)) {
275                                 ls->ls_scan_time = jiffies;
276                                 dlm_scan_rsbs(ls);
277                                 dlm_scan_timeout(ls);
278                                 dlm_scan_waiters(ls);
279                                 dlm_unlock_recovery(ls);
280                         } else {
281                                 ls->ls_scan_time += HZ;
282                         }
283                         continue;
284                 }
285                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
286         }
287         return 0;
288 }
289
290 static int dlm_scand_start(void)
291 {
292         struct task_struct *p;
293         int error = 0;
294
295         p = kthread_run(dlm_scand, NULL, "dlm_scand");
296         if (IS_ERR(p))
297                 error = PTR_ERR(p);
298         else
299                 scand_task = p;
300         return error;
301 }
302
303 static void dlm_scand_stop(void)
304 {
305         kthread_stop(scand_task);
306 }
307
308 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
309 {
310         struct dlm_ls *ls;
311
312         spin_lock(&lslist_lock);
313
314         list_for_each_entry(ls, &lslist, ls_list) {
315                 if (ls->ls_global_id == id) {
316                         atomic_inc(&ls->ls_count);
317                         goto out;
318                 }
319         }
320         ls = NULL;
321  out:
322         spin_unlock(&lslist_lock);
323         return ls;
324 }
325
326 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
327 {
328         struct dlm_ls *ls;
329
330         spin_lock(&lslist_lock);
331         list_for_each_entry(ls, &lslist, ls_list) {
332                 if (ls->ls_local_handle == lockspace) {
333                         atomic_inc(&ls->ls_count);
334                         goto out;
335                 }
336         }
337         ls = NULL;
338  out:
339         spin_unlock(&lslist_lock);
340         return ls;
341 }
342
343 struct dlm_ls *dlm_find_lockspace_device(int minor)
344 {
345         struct dlm_ls *ls;
346
347         spin_lock(&lslist_lock);
348         list_for_each_entry(ls, &lslist, ls_list) {
349                 if (ls->ls_device.minor == minor) {
350                         atomic_inc(&ls->ls_count);
351                         goto out;
352                 }
353         }
354         ls = NULL;
355  out:
356         spin_unlock(&lslist_lock);
357         return ls;
358 }
359
360 void dlm_put_lockspace(struct dlm_ls *ls)
361 {
362         if (atomic_dec_and_test(&ls->ls_count))
363                 wake_up(&ls->ls_count_wait);
364 }
365
366 static void remove_lockspace(struct dlm_ls *ls)
367 {
368 retry:
369         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
370
371         spin_lock(&lslist_lock);
372         if (atomic_read(&ls->ls_count) != 0) {
373                 spin_unlock(&lslist_lock);
374                 goto retry;
375         }
376
377         WARN_ON(ls->ls_create_count != 0);
378         list_del(&ls->ls_list);
379         spin_unlock(&lslist_lock);
380 }
381
/*
 * Start the threads shared by all lockspaces: first dlm_scand, then
 * midcomms.  If midcomms fails to start, dlm_scand is stopped again so
 * that nothing is left running.
 *
 * Returns 0 on success, or the error of whichever start call failed.
 */
static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_midcomms_start();
	if (error) {
		/* Report the layer that was actually started: midcomms. */
		log_print("cannot start dlm midcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}
406
407 static int new_lockspace(const char *name, const char *cluster,
408                          uint32_t flags, int lvblen,
409                          const struct dlm_lockspace_ops *ops, void *ops_arg,
410                          int *ops_result, dlm_lockspace_t **lockspace)
411 {
412         struct dlm_ls *ls;
413         int i, size, error;
414         int do_unreg = 0;
415         int namelen = strlen(name);
416
417         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
418                 return -EINVAL;
419
420         if (!lvblen || (lvblen % 8))
421                 return -EINVAL;
422
423         if (!try_module_get(THIS_MODULE))
424                 return -EINVAL;
425
426         if (!dlm_user_daemon_available()) {
427                 log_print("dlm user daemon not available");
428                 error = -EUNATCH;
429                 goto out;
430         }
431
432         if (ops && ops_result) {
433                 if (!dlm_config.ci_recover_callbacks)
434                         *ops_result = -EOPNOTSUPP;
435                 else
436                         *ops_result = 0;
437         }
438
439         if (!cluster)
440                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
441                           dlm_config.ci_cluster_name);
442
443         if (dlm_config.ci_recover_callbacks && cluster &&
444             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
445                 log_print("dlm cluster name '%s' does not match "
446                           "the application cluster name '%s'",
447                           dlm_config.ci_cluster_name, cluster);
448                 error = -EBADR;
449                 goto out;
450         }
451
452         error = 0;
453
454         spin_lock(&lslist_lock);
455         list_for_each_entry(ls, &lslist, ls_list) {
456                 WARN_ON(ls->ls_create_count <= 0);
457                 if (ls->ls_namelen != namelen)
458                         continue;
459                 if (memcmp(ls->ls_name, name, namelen))
460                         continue;
461                 if (flags & DLM_LSFL_NEWEXCL) {
462                         error = -EEXIST;
463                         break;
464                 }
465                 ls->ls_create_count++;
466                 *lockspace = ls;
467                 error = 1;
468                 break;
469         }
470         spin_unlock(&lslist_lock);
471
472         if (error)
473                 goto out;
474
475         error = -ENOMEM;
476
477         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
478         if (!ls)
479                 goto out;
480         memcpy(ls->ls_name, name, namelen);
481         ls->ls_namelen = namelen;
482         ls->ls_lvblen = lvblen;
483         atomic_set(&ls->ls_count, 0);
484         init_waitqueue_head(&ls->ls_count_wait);
485         ls->ls_flags = 0;
486         ls->ls_scan_time = jiffies;
487
488         if (ops && dlm_config.ci_recover_callbacks) {
489                 ls->ls_ops = ops;
490                 ls->ls_ops_arg = ops_arg;
491         }
492
493         if (flags & DLM_LSFL_TIMEWARN)
494                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
495
496         /* ls_exflags are forced to match among nodes, and we don't
497            need to require all nodes to have some flags set */
498         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
499                                     DLM_LSFL_NEWEXCL));
500
501         size = READ_ONCE(dlm_config.ci_rsbtbl_size);
502         ls->ls_rsbtbl_size = size;
503
504         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
505         if (!ls->ls_rsbtbl)
506                 goto out_lsfree;
507         for (i = 0; i < size; i++) {
508                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
509                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
510                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
511         }
512
513         spin_lock_init(&ls->ls_remove_spin);
514         init_waitqueue_head(&ls->ls_remove_wait);
515
516         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
517                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
518                                                  GFP_KERNEL);
519                 if (!ls->ls_remove_names[i])
520                         goto out_rsbtbl;
521         }
522
523         idr_init(&ls->ls_lkbidr);
524         spin_lock_init(&ls->ls_lkbidr_spin);
525
526         INIT_LIST_HEAD(&ls->ls_waiters);
527         mutex_init(&ls->ls_waiters_mutex);
528         INIT_LIST_HEAD(&ls->ls_orphans);
529         mutex_init(&ls->ls_orphans_mutex);
530         INIT_LIST_HEAD(&ls->ls_timeout);
531         mutex_init(&ls->ls_timeout_mutex);
532
533         INIT_LIST_HEAD(&ls->ls_new_rsb);
534         spin_lock_init(&ls->ls_new_rsb_spin);
535
536         INIT_LIST_HEAD(&ls->ls_nodes);
537         INIT_LIST_HEAD(&ls->ls_nodes_gone);
538         ls->ls_num_nodes = 0;
539         ls->ls_low_nodeid = 0;
540         ls->ls_total_weight = 0;
541         ls->ls_node_array = NULL;
542
543         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
544         ls->ls_stub_rsb.res_ls = ls;
545
546         ls->ls_debug_rsb_dentry = NULL;
547         ls->ls_debug_waiters_dentry = NULL;
548
549         init_waitqueue_head(&ls->ls_uevent_wait);
550         ls->ls_uevent_result = 0;
551         init_completion(&ls->ls_members_done);
552         ls->ls_members_result = -1;
553
554         mutex_init(&ls->ls_cb_mutex);
555         INIT_LIST_HEAD(&ls->ls_cb_delay);
556
557         ls->ls_recoverd_task = NULL;
558         mutex_init(&ls->ls_recoverd_active);
559         spin_lock_init(&ls->ls_recover_lock);
560         spin_lock_init(&ls->ls_rcom_spin);
561         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
562         ls->ls_recover_status = 0;
563         ls->ls_recover_seq = 0;
564         ls->ls_recover_args = NULL;
565         init_rwsem(&ls->ls_in_recovery);
566         init_rwsem(&ls->ls_recv_active);
567         INIT_LIST_HEAD(&ls->ls_requestqueue);
568         atomic_set(&ls->ls_requestqueue_cnt, 0);
569         init_waitqueue_head(&ls->ls_requestqueue_wait);
570         mutex_init(&ls->ls_requestqueue_mutex);
571         mutex_init(&ls->ls_clear_proc_locks);
572
573         /* Due backwards compatibility with 3.1 we need to use maximum
574          * possible dlm message size to be sure the message will fit and
575          * not having out of bounds issues. However on sending side 3.2
576          * might send less.
577          */
578         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
579         if (!ls->ls_recover_buf)
580                 goto out_lkbidr;
581
582         ls->ls_slot = 0;
583         ls->ls_num_slots = 0;
584         ls->ls_slots_size = 0;
585         ls->ls_slots = NULL;
586
587         INIT_LIST_HEAD(&ls->ls_recover_list);
588         spin_lock_init(&ls->ls_recover_list_lock);
589         idr_init(&ls->ls_recover_idr);
590         spin_lock_init(&ls->ls_recover_idr_lock);
591         ls->ls_recover_list_count = 0;
592         ls->ls_local_handle = ls;
593         init_waitqueue_head(&ls->ls_wait_general);
594         INIT_LIST_HEAD(&ls->ls_root_list);
595         init_rwsem(&ls->ls_root_sem);
596
597         spin_lock(&lslist_lock);
598         ls->ls_create_count = 1;
599         list_add(&ls->ls_list, &lslist);
600         spin_unlock(&lslist_lock);
601
602         if (flags & DLM_LSFL_FS) {
603                 error = dlm_callback_start(ls);
604                 if (error) {
605                         log_error(ls, "can't start dlm_callback %d", error);
606                         goto out_delist;
607                 }
608         }
609
610         init_waitqueue_head(&ls->ls_recover_lock_wait);
611
612         /*
613          * Once started, dlm_recoverd first looks for ls in lslist, then
614          * initializes ls_in_recovery as locked in "down" mode.  We need
615          * to wait for the wakeup from dlm_recoverd because in_recovery
616          * has to start out in down mode.
617          */
618
619         error = dlm_recoverd_start(ls);
620         if (error) {
621                 log_error(ls, "can't start dlm_recoverd %d", error);
622                 goto out_callback;
623         }
624
625         wait_event(ls->ls_recover_lock_wait,
626                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
627
628         /* let kobject handle freeing of ls if there's an error */
629         do_unreg = 1;
630
631         ls->ls_kobj.kset = dlm_kset;
632         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
633                                      "%s", ls->ls_name);
634         if (error)
635                 goto out_recoverd;
636         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
637
638         /* This uevent triggers dlm_controld in userspace to add us to the
639            group of nodes that are members of this lockspace (managed by the
640            cluster infrastructure.)  Once it's done that, it tells us who the
641            current lockspace members are (via configfs) and then tells the
642            lockspace to start running (via sysfs) in dlm_ls_start(). */
643
644         error = do_uevent(ls, 1);
645         if (error)
646                 goto out_recoverd;
647
648         wait_for_completion(&ls->ls_members_done);
649         error = ls->ls_members_result;
650         if (error)
651                 goto out_members;
652
653         dlm_create_debug_file(ls);
654
655         log_rinfo(ls, "join complete");
656         *lockspace = ls;
657         return 0;
658
659  out_members:
660         do_uevent(ls, 0);
661         dlm_clear_members(ls);
662         kfree(ls->ls_node_array);
663  out_recoverd:
664         dlm_recoverd_stop(ls);
665  out_callback:
666         dlm_callback_stop(ls);
667  out_delist:
668         spin_lock(&lslist_lock);
669         list_del(&ls->ls_list);
670         spin_unlock(&lslist_lock);
671         idr_destroy(&ls->ls_recover_idr);
672         kfree(ls->ls_recover_buf);
673  out_lkbidr:
674         idr_destroy(&ls->ls_lkbidr);
675  out_rsbtbl:
676         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
677                 kfree(ls->ls_remove_names[i]);
678         vfree(ls->ls_rsbtbl);
679  out_lsfree:
680         if (do_unreg)
681                 kobject_put(&ls->ls_kobj);
682         else
683                 kfree(ls);
684  out:
685         module_put(THIS_MODULE);
686         return error;
687 }
688
689 int dlm_new_lockspace(const char *name, const char *cluster,
690                       uint32_t flags, int lvblen,
691                       const struct dlm_lockspace_ops *ops, void *ops_arg,
692                       int *ops_result, dlm_lockspace_t **lockspace)
693 {
694         int error = 0;
695
696         mutex_lock(&ls_lock);
697         if (!ls_count)
698                 error = threads_start();
699         if (error)
700                 goto out;
701
702         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
703                               ops_result, lockspace);
704         if (!error)
705                 ls_count++;
706         if (error > 0)
707                 error = 0;
708         if (!ls_count) {
709                 dlm_scand_stop();
710                 dlm_midcomms_shutdown();
711                 dlm_lowcomms_stop();
712         }
713  out:
714         mutex_unlock(&ls_lock);
715         return error;
716 }
717
718 static int lkb_idr_is_local(int id, void *p, void *data)
719 {
720         struct dlm_lkb *lkb = p;
721
722         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
723 }
724
/*
 * idr_for_each() callback that reports a hit for every entry: always
 * returns 1, whatever the arguments.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}
729
730 static int lkb_idr_free(int id, void *p, void *data)
731 {
732         struct dlm_lkb *lkb = p;
733
734         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735                 dlm_free_lvb(lkb->lkb_lvbptr);
736
737         dlm_free_lkb(lkb);
738         return 0;
739 }
740
741 /* NOTE: We check the lkbidr here rather than the resource table.
742    This is because there may be LKBs queued as ASTs that have been unlinked
743    from their RSBs and are pending deletion once the AST has been delivered */
744
745 static int lockspace_busy(struct dlm_ls *ls, int force)
746 {
747         int rv;
748
749         spin_lock(&ls->ls_lkbidr_spin);
750         if (force == 0) {
751                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752         } else if (force == 1) {
753                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754         } else {
755                 rv = 0;
756         }
757         spin_unlock(&ls->ls_lkbidr_spin);
758         return rv;
759 }
760
761 static int release_lockspace(struct dlm_ls *ls, int force)
762 {
763         struct dlm_rsb *rsb;
764         struct rb_node *n;
765         int i, busy, rv;
766
767         busy = lockspace_busy(ls, force);
768
769         spin_lock(&lslist_lock);
770         if (ls->ls_create_count == 1) {
771                 if (busy) {
772                         rv = -EBUSY;
773                 } else {
774                         /* remove_lockspace takes ls off lslist */
775                         ls->ls_create_count = 0;
776                         rv = 0;
777                 }
778         } else if (ls->ls_create_count > 1) {
779                 rv = --ls->ls_create_count;
780         } else {
781                 rv = -EINVAL;
782         }
783         spin_unlock(&lslist_lock);
784
785         if (rv) {
786                 log_debug(ls, "release_lockspace no remove %d", rv);
787                 return rv;
788         }
789
790         dlm_device_deregister(ls);
791
792         if (force < 3 && dlm_user_daemon_available())
793                 do_uevent(ls, 0);
794
795         dlm_recoverd_stop(ls);
796
797         if (ls_count == 1) {
798                 dlm_scand_stop();
799                 dlm_clear_members(ls);
800                 dlm_midcomms_shutdown();
801         }
802
803         dlm_callback_stop(ls);
804
805         remove_lockspace(ls);
806
807         dlm_delete_debug_file(ls);
808
809         idr_destroy(&ls->ls_recover_idr);
810         kfree(ls->ls_recover_buf);
811
812         /*
813          * Free all lkb's in idr
814          */
815
816         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
817         idr_destroy(&ls->ls_lkbidr);
818
819         /*
820          * Free all rsb's on rsbtbl[] lists
821          */
822
823         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
824                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
825                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
826                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
827                         dlm_free_rsb(rsb);
828                 }
829
830                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
831                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
832                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
833                         dlm_free_rsb(rsb);
834                 }
835         }
836
837         vfree(ls->ls_rsbtbl);
838
839         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
840                 kfree(ls->ls_remove_names[i]);
841
842         while (!list_empty(&ls->ls_new_rsb)) {
843                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
844                                        res_hashchain);
845                 list_del(&rsb->res_hashchain);
846                 dlm_free_rsb(rsb);
847         }
848
849         /*
850          * Free structures on any other lists
851          */
852
853         dlm_purge_requestqueue(ls);
854         kfree(ls->ls_recover_args);
855         dlm_clear_members(ls);
856         dlm_clear_members_gone(ls);
857         kfree(ls->ls_node_array);
858         log_rinfo(ls, "release_lockspace final free");
859         kobject_put(&ls->ls_kobj);
860         /* The ls structure will be freed when the kobject is done with */
861
862         module_put(THIS_MODULE);
863         return 0;
864 }
865
866 /*
867  * Called when a system has released all its locks and is not going to use the
868  * lockspace any longer.  We free everything we're managing for this lockspace.
869  * Remaining nodes will go through the recovery process as if we'd died.  The
870  * lockspace must continue to function as usual, participating in recoveries,
871  * until this returns.
872  *
873  * Force has 4 possible values:
874  * 0 - don't destroy lockspace if it has any LKBs
875  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
876  * 2 - destroy lockspace regardless of LKBs
877  * 3 - destroy lockspace as part of a forced shutdown
878  */
879
880 int dlm_release_lockspace(void *lockspace, int force)
881 {
882         struct dlm_ls *ls;
883         int error;
884
885         ls = dlm_find_lockspace_local(lockspace);
886         if (!ls)
887                 return -EINVAL;
888         dlm_put_lockspace(ls);
889
890         mutex_lock(&ls_lock);
891         error = release_lockspace(ls, force);
892         if (!error)
893                 ls_count--;
894         if (!ls_count)
895                 dlm_lowcomms_stop();
896         mutex_unlock(&ls_lock);
897
898         return error;
899 }
900
901 void dlm_stop_lockspaces(void)
902 {
903         struct dlm_ls *ls;
904         int count;
905
906  restart:
907         count = 0;
908         spin_lock(&lslist_lock);
909         list_for_each_entry(ls, &lslist, ls_list) {
910                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
911                         count++;
912                         continue;
913                 }
914                 spin_unlock(&lslist_lock);
915                 log_error(ls, "no userland control daemon, stopping lockspace");
916                 dlm_ls_stop(ls);
917                 goto restart;
918         }
919         spin_unlock(&lslist_lock);
920
921         if (count)
922                 log_print("dlm user daemon left %d lockspaces", count);
923 }
924