unsigned int cl_protocol;
unsigned int cl_timewarn_cs;
unsigned int cl_waitwarn_us;
+ unsigned int cl_new_rsb_count;
};
enum {
CLUSTER_ATTR_PROTOCOL,
CLUSTER_ATTR_TIMEWARN_CS,
CLUSTER_ATTR_WAITWARN_US,
+ CLUSTER_ATTR_NEW_RSB_COUNT,
};
struct cluster_attribute {
CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);
+CLUSTER_ATTR(new_rsb_count, 0);
static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
+ [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
NULL,
};
cl->cl_protocol = dlm_config.ci_protocol;
cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
+ cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
space_list = &sps->ss_group;
comm_list = &cms->cs_group;
#define DEFAULT_PROTOCOL 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
+#define DEFAULT_NEW_RSB_COUNT 128
struct dlm_config_info dlm_config = {
.ci_tcp_port = DEFAULT_TCP_PORT,
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_protocol = DEFAULT_PROTOCOL,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
- .ci_waitwarn_us = DEFAULT_WAITWARN_US
+ .ci_waitwarn_us = DEFAULT_WAITWARN_US,
+ .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
};
* Basic operations on rsb's and lkb's
*/
-static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
+static int pre_rsb_struct(struct dlm_ls *ls)
+{
+ struct dlm_rsb *r1, *r2;
+ int count = 0;
+
+ spin_lock(&ls->ls_new_rsb_spin);
+ if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
+ spin_unlock(&ls->ls_new_rsb_spin);
+ return 0;
+ }
+ spin_unlock(&ls->ls_new_rsb_spin);
+
+ r1 = dlm_allocate_rsb(ls);
+ r2 = dlm_allocate_rsb(ls);
+
+ spin_lock(&ls->ls_new_rsb_spin);
+ if (r1) {
+ list_add(&r1->res_hashchain, &ls->ls_new_rsb);
+ ls->ls_new_rsb_count++;
+ }
+ if (r2) {
+ list_add(&r2->res_hashchain, &ls->ls_new_rsb);
+ ls->ls_new_rsb_count++;
+ }
+ count = ls->ls_new_rsb_count;
+ spin_unlock(&ls->ls_new_rsb_spin);
+
+ if (!count)
+ return -ENOMEM;
+ return 0;
+}
+
+/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
+ unlock any spinlocks, go back and call pre_rsb_struct again.
+ Otherwise, take an rsb off the list and return it. */
+
+static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
+ struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
+ int count;
- r = dlm_allocate_rsb(ls, len);
- if (!r)
- return NULL;
+ spin_lock(&ls->ls_new_rsb_spin);
+ if (list_empty(&ls->ls_new_rsb)) {
+ count = ls->ls_new_rsb_count;
+ spin_unlock(&ls->ls_new_rsb_spin);
+ log_debug(ls, "find_rsb retry %d %d %s",
+ count, dlm_config.ci_new_rsb_count, name);
+ return -EAGAIN;
+ }
+
+ r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
+ list_del(&r->res_hashchain);
+ ls->ls_new_rsb_count--;
+ spin_unlock(&ls->ls_new_rsb_spin);
r->res_ls = ls;
r->res_length = len;
memcpy(r->res_name, name, len);
mutex_init(&r->res_mutex);
+ INIT_LIST_HEAD(&r->res_hashchain);
INIT_LIST_HEAD(&r->res_lookup);
INIT_LIST_HEAD(&r->res_grantqueue);
INIT_LIST_HEAD(&r->res_convertqueue);
INIT_LIST_HEAD(&r->res_root_list);
INIT_LIST_HEAD(&r->res_recover_list);
- return r;
+ *r_ret = r;
+ return 0;
}
static int search_rsb_list(struct list_head *head, char *name, int len,
return error;
}
-static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
- unsigned int flags, struct dlm_rsb **r_ret)
-{
- int error;
- spin_lock(&ls->ls_rsbtbl[b].lock);
- error = _search_rsb(ls, name, len, b, flags, r_ret);
- spin_unlock(&ls->ls_rsbtbl[b].lock);
- return error;
-}
-
/*
* Find rsb in rsbtbl and potentially create/add one
*
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
unsigned int flags, struct dlm_rsb **r_ret)
{
- struct dlm_rsb *r = NULL, *tmp;
+ struct dlm_rsb *r = NULL;
uint32_t hash, bucket;
- int error = -EINVAL;
+ int error;
- if (namelen > DLM_RESNAME_MAXLEN)
+ if (namelen > DLM_RESNAME_MAXLEN) {
+ error = -EINVAL;
goto out;
+ }
if (dlm_no_directory(ls))
flags |= R_CREATE;
- error = 0;
hash = jhash(name, namelen, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
- error = search_rsb(ls, name, namelen, bucket, flags, &r);
+ retry:
+ if (flags & R_CREATE) {
+ error = pre_rsb_struct(ls);
+ if (error < 0)
+ goto out;
+ }
+
+ spin_lock(&ls->ls_rsbtbl[bucket].lock);
+
+ error = _search_rsb(ls, name, namelen, bucket, flags, &r);
if (!error)
- goto out;
+ goto out_unlock;
if (error == -EBADR && !(flags & R_CREATE))
- goto out;
+ goto out_unlock;
/* the rsb was found but wasn't a master copy */
if (error == -ENOTBLK)
- goto out;
+ goto out_unlock;
- error = -ENOMEM;
- r = create_rsb(ls, name, namelen);
- if (!r)
- goto out;
+ error = get_rsb_struct(ls, name, namelen, &r);
+ if (error == -EAGAIN) {
+ spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+ goto retry;
+ }
+ if (error)
+ goto out_unlock;
r->res_hash = hash;
r->res_bucket = bucket;
nodeid = 0;
r->res_nodeid = nodeid;
}
-
- spin_lock(&ls->ls_rsbtbl[bucket].lock);
- error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
- if (!error) {
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
- dlm_free_rsb(r);
- r = tmp;
- goto out;
- }
list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
- spin_unlock(&ls->ls_rsbtbl[bucket].lock);
error = 0;
+ out_unlock:
+ spin_unlock(&ls->ls_rsbtbl[bucket].lock);
out:
*r_ret = r;
return error;
#include "memory.h"
static struct kmem_cache *lkb_cache;
+static struct kmem_cache *rsb_cache;
int __init dlm_memory_init(void)
__alignof__(struct dlm_lkb), 0, NULL);
if (!lkb_cache)
ret = -ENOMEM;
+
+ rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
+ __alignof__(struct dlm_rsb), 0, NULL);
+ if (!rsb_cache) {
+ kmem_cache_destroy(lkb_cache);
+ ret = -ENOMEM;
+ }
+
return ret;
}
{
if (lkb_cache)
kmem_cache_destroy(lkb_cache);
+ if (rsb_cache)
+ kmem_cache_destroy(rsb_cache);
}
char *dlm_allocate_lvb(struct dlm_ls *ls)
kfree(p);
}
-/* FIXME: have some minimal space built-in to rsb for the name and
- kmalloc a separate name if needed, like dentries are done */
-
-struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls, int namelen)
+struct dlm_rsb *dlm_allocate_rsb(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- DLM_ASSERT(namelen <= DLM_RESNAME_MAXLEN,);
-
- r = kzalloc(sizeof(*r) + namelen, GFP_NOFS);
+ r = kmem_cache_zalloc(rsb_cache, GFP_NOFS);
return r;
}
{
if (r->res_lvbptr)
dlm_free_lvb(r->res_lvbptr);
- kfree(r);
+ kmem_cache_free(rsb_cache, r);
}
struct dlm_lkb *dlm_allocate_lkb(struct dlm_ls *ls)