/*
 * ramster.c
 *
 * Copyright (c) 2010-2012, Dan Magenheimer, Oracle Corp.
 *
 * RAMster implements peer-to-peer transcendent memory, allowing a "cluster" of
 * kernels to dynamically pool their RAM so that a RAM-hungry workload on one
 * machine can temporarily and transparently utilize RAM on another machine
 * which is presumably idle or running a non-RAM-hungry workload.
 *
 * RAMster combines a clustering and messaging foundation based on the ocfs2
 * cluster layer with the in-kernel compression implementation of zcache, and
 * adds code to glue them together.  When a page is "put" to RAMster, it is
 * compressed and stored locally.  Periodically, a thread will "remotify" these
 * pages by sending them via messages to a remote machine.  When the page is
 * later needed as indicated by a page fault, a "get" is issued.  If the data
 * is local, it is uncompressed and the fault is resolved.  If the data is
 * remote, a message is sent to fetch the data and the faulting thread sleeps;
 * when the data arrives, the thread awakens, the data is decompressed and
 * the fault is resolved.
 *
 * As of V5, clusters up to eight nodes are supported; each node can remotify
 * pages to one specified node, so clusters can be configured as clients to
 * a "memory server".  Some simple policy is in place that will need to be
 * refined over time.  Larger clusters and fault-resistant protocols can also
 * be added over time.
 */

#include <linux/module.h>
#include <linux/cpu.h>
#include <linux/highmem.h>
#include <linux/list.h>
#include <linux/lzo.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/frontswap.h>
#include "../tmem.h"
#include "../zcache.h"
#include "../zbud.h"
#include "ramster.h"
#include "ramster_nodemanager.h"
#include "tcp.h"

#define RAMSTER_TESTING

#ifndef CONFIG_SYSFS
#error "ramster needs sysfs to define cluster nodes to use"
#endif

static bool use_cleancache __read_mostly;
static bool use_frontswap __read_mostly;
static bool use_frontswap_exclusive_gets __read_mostly;

/* These must be sysfs not debugfs as they are checked/used by userland!! */
static unsigned long ramster_interface_revision __read_mostly =
	R2NM_API_VERSION; /* interface revision must match userspace! */
static unsigned long ramster_pers_remotify_enable __read_mostly;
static unsigned long ramster_eph_remotify_enable __read_mostly;
static atomic_t ramster_remote_pers_pages = ATOMIC_INIT(0);
#define MANUAL_NODES 8
static bool ramster_nodes_manual_up[MANUAL_NODES] __read_mostly;
static int ramster_remote_target_nodenum __read_mostly = -1;

/* these counters are made available via debugfs */
static long ramster_flnodes;
static atomic_t ramster_flnodes_atomic = ATOMIC_INIT(0);
static unsigned long ramster_flnodes_max;
static ssize_t ramster_foreign_eph_pages;
static atomic_t ramster_foreign_eph_pages_atomic = ATOMIC_INIT(0);
static ssize_t ramster_foreign_eph_pages_max;
static ssize_t ramster_foreign_pers_pages;
static atomic_t ramster_foreign_pers_pages_atomic = ATOMIC_INIT(0);
static ssize_t ramster_foreign_pers_pages_max;
static ssize_t ramster_eph_pages_remoted;
static ssize_t ramster_pers_pages_remoted;
static ssize_t ramster_eph_pages_remote_failed;
static ssize_t ramster_pers_pages_remote_failed;
static ssize_t ramster_remote_eph_pages_succ_get;
static ssize_t ramster_remote_pers_pages_succ_get;
static ssize_t ramster_remote_eph_pages_unsucc_get;
static ssize_t ramster_remote_pers_pages_unsucc_get;
static ssize_t ramster_pers_pages_remote_nomem;
static ssize_t ramster_remote_objects_flushed;
static ssize_t ramster_remote_object_flushes_failed;
static ssize_t ramster_remote_pages_flushed;
static ssize_t ramster_remote_page_flushes_failed;
/* FIXME frontswap selfshrinking knobs in debugfs? */

#ifdef CONFIG_DEBUG_FS
#include <linux/debugfs.h>
#define zdfs	debugfs_create_size_t
#define zdfs64	debugfs_create_u64
static int __init ramster_debugfs_init(void)
{
	struct dentry *root = debugfs_create_dir("ramster", NULL);

	if (root == NULL)
		return -ENXIO;

	zdfs("eph_pages_remoted", S_IRUGO, root, &ramster_eph_pages_remoted);
	zdfs("pers_pages_remoted", S_IRUGO, root, &ramster_pers_pages_remoted);
	zdfs("eph_pages_remote_failed", S_IRUGO, root,
			&ramster_eph_pages_remote_failed);
	zdfs("pers_pages_remote_failed", S_IRUGO, root,
			&ramster_pers_pages_remote_failed);
	zdfs("remote_eph_pages_succ_get", S_IRUGO, root,
			&ramster_remote_eph_pages_succ_get);
	zdfs("remote_pers_pages_succ_get", S_IRUGO, root,
			&ramster_remote_pers_pages_succ_get);
	zdfs("remote_eph_pages_unsucc_get", S_IRUGO, root,
			&ramster_remote_eph_pages_unsucc_get);
	zdfs("remote_pers_pages_unsucc_get", S_IRUGO, root,
			&ramster_remote_pers_pages_unsucc_get);
	zdfs("pers_pages_remote_nomem", S_IRUGO, root,
			&ramster_pers_pages_remote_nomem);
	zdfs("remote_objects_flushed", S_IRUGO, root,
			&ramster_remote_objects_flushed);
	zdfs("remote_pages_flushed", S_IRUGO, root,
			&ramster_remote_pages_flushed);
	zdfs("remote_object_flushes_failed", S_IRUGO, root,
			&ramster_remote_object_flushes_failed);
	zdfs("remote_page_flushes_failed", S_IRUGO, root,
			&ramster_remote_page_flushes_failed);
	zdfs("foreign_eph_pages", S_IRUGO, root,
			&ramster_foreign_eph_pages);
	zdfs("foreign_eph_pages_max", S_IRUGO, root,
			&ramster_foreign_eph_pages_max);
	zdfs("foreign_pers_pages", S_IRUGO, root,
			&ramster_foreign_pers_pages);
	zdfs("foreign_pers_pages_max", S_IRUGO, root,
			&ramster_foreign_pers_pages_max);
	return 0;
}
#undef	zdfs
#undef	zdfs64
#else
/* stub so ramster_init() still builds when debugfs is not configured */
static inline int ramster_debugfs_init(void)
{
	return 0;
}
#endif

static LIST_HEAD(ramster_rem_op_list);
static DEFINE_SPINLOCK(ramster_rem_op_list_lock);
static DEFINE_PER_CPU(struct ramster_preload, ramster_preloads);

static DEFINE_PER_CPU(unsigned char *, ramster_remoteputmem1);
static DEFINE_PER_CPU(unsigned char *, ramster_remoteputmem2);

static struct kmem_cache *ramster_flnode_cache __read_mostly;

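/*
 * Consume the flushlist_node that ramster_do_preload_flnode() preallocated
 * for this cpu (BUG if the preload was missed) and update the debugfs
 * counters tracking how many flush-list nodes are outstanding.
 */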
static struct flushlist_node *ramster_flnode_alloc(struct tmem_pool *pool)
{
	struct flushlist_node *flnode = NULL;
	struct ramster_preload *kp;

	kp = &__get_cpu_var(ramster_preloads);
	flnode = kp->flnode;
	BUG_ON(flnode == NULL);
	kp->flnode = NULL;
	ramster_flnodes = atomic_inc_return(&ramster_flnodes_atomic);
	if (ramster_flnodes > ramster_flnodes_max)
		ramster_flnodes_max = ramster_flnodes;
	return flnode;
}

/* the "flush list" asynchronously collects pages to remotely flush */
#define FLUSH_ENTIRE_OBJECT ((uint32_t)-1)
static void ramster_flnode_free(struct flushlist_node *flnode,
				struct tmem_pool *pool)
{
	int flnodes;

	flnodes = atomic_dec_return(&ramster_flnodes_atomic);
	BUG_ON(flnodes < 0);
	kmem_cache_free(ramster_flnode_cache, flnode);
}

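/*
 * Guarantee that this cpu has a flushlist_node standing by before a tmem
 * operation that might later need to queue a remote flush.  Called with
 * interrupts disabled, so the allocation must be GFP_ATOMIC; if a node is
 * already preloaded, the fresh allocation is returned to the cache.
 */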
int ramster_do_preload_flnode(struct tmem_pool *pool)
{
	struct ramster_preload *kp;
	struct flushlist_node *flnode;
	int ret = -ENOMEM;

	BUG_ON(!irqs_disabled());
	if (unlikely(ramster_flnode_cache == NULL))
		BUG();
	kp = &__get_cpu_var(ramster_preloads);
	flnode = kmem_cache_alloc(ramster_flnode_cache, GFP_ATOMIC);
	if (unlikely(flnode == NULL) && kp->flnode == NULL)
		BUG();	/* FIXME handle more gracefully, but how??? */
	else if (kp->flnode == NULL)
		kp->flnode = flnode;
	else
		kmem_cache_free(ramster_flnode_cache, flnode);
	return ret;
}

/*
 * Called by the message handler after a (still compressed) page has been
 * fetched from the remote machine in response to an "is_remote" tmem_get
 * or persistent tmem_localify.  For a tmem_get, "extra" is the address of
 * the page that is to be filled to successfully resolve the tmem_get; for
 * a (persistent) tmem_localify, "extra" is NULL (as the data is placed only
 * in the local zcache).  "data" points to "size" bytes of (compressed) data
 * passed in the message.  In the case of a persistent remote get, if
 * pre-allocation was successful (see ramster_pampd_repatriate_preload), the
 * page is placed into both local zcache and at "extra".
 */
int ramster_localify(int pool_id, struct tmem_oid *oidp, uint32_t index,
			char *data, unsigned int size, void *extra)
{
	int ret = -ENOENT;
	unsigned long flags;
	struct tmem_pool *pool;
	bool eph, delete = false;
	void *pampd, *saved_hb;
	struct tmem_obj *obj;

	pool = zcache_get_pool_by_id(LOCAL_CLIENT, pool_id);
	if (unlikely(pool == NULL))
		/* pool doesn't exist anymore */
		goto out;
	eph = is_ephemeral(pool);
	local_irq_save(flags);  /* FIXME: maybe only disable softirqs? */
	pampd = tmem_localify_get_pampd(pool, oidp, index, &obj, &saved_hb);
	if (pampd == NULL) {
		/* hmmm... must have been a flush while waiting */
#ifdef RAMSTER_TESTING
		pr_err("UNTESTED pampd==NULL in ramster_localify\n");
#endif
		if (eph)
			ramster_remote_eph_pages_unsucc_get++;
		else
			ramster_remote_pers_pages_unsucc_get++;
		obj = NULL;
		goto finish;
	} else if (unlikely(!pampd_is_remote(pampd))) {
		/* hmmm... must have been a dup put while waiting */
#ifdef RAMSTER_TESTING
		pr_err("UNTESTED dup while waiting in ramster_localify\n");
#endif
		if (eph)
			ramster_remote_eph_pages_unsucc_get++;
		else
			ramster_remote_pers_pages_unsucc_get++;
		obj = NULL;
		pampd = NULL;
		ret = -EEXIST;
		goto finish;
	} else if (size == 0) {
		/* no remote data, delete the local is_remote pampd */
		pampd = NULL;
		if (eph)
			ramster_remote_eph_pages_unsucc_get++;
		else
			BUG();
		delete = true;
		goto finish;
	}
	if (pampd_is_intransit(pampd)) {
		/*
		 *  a pampd is marked intransit if it is remote and space has
		 *  been allocated for it locally (note, only happens for
		 *  persistent pages, in which case the remote copy is freed)
		 */
		BUG_ON(eph);
		pampd = pampd_mask_intransit_and_remote(pampd);
		zbud_copy_to_zbud(pampd, data, size);
	} else {
		/*
		 * setting pampd to NULL tells tmem_localify_finish to leave
		 * pampd alone... meaning it is left pointing to the
		 * remote copy
		 */
		pampd = NULL;
		obj = NULL;
	}
	/*
	 * but in all cases, we decompress direct-to-memory to complete
	 * the remotify and return success
	 */
	BUG_ON(extra == NULL);
	zcache_decompress_to_page(data, size, (struct page *)extra);
	if (eph)
		ramster_remote_eph_pages_succ_get++;
	else
		ramster_remote_pers_pages_succ_get++;
	ret = 0;
finish:
	tmem_localify_finish(obj, index, pampd, saved_hb, delete);
	zcache_put_pool(pool);
	local_irq_restore(flags);
out:
	return ret;
}

void ramster_pampd_new_obj(struct tmem_obj *obj)
{
	obj->extra = NULL;
}

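/*
 * Called when a tmem object is being destroyed.  If the object ever held
 * remote pages (obj->extra records the remote pampd), queue an asynchronous
 * "flush object" op so the remote node drops its copies; remote ephemeral
 * data is simply abandoned on pool destroy.
 */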
void ramster_pampd_free_obj(struct tmem_pool *pool, struct tmem_obj *obj,
				bool pool_destroy)
{
	struct flushlist_node *flnode;

	BUG_ON(preemptible());
	if (obj->extra == NULL)
		return;
	if (pool_destroy && is_ephemeral(pool))
		/* FIXME don't bother with remote eph data for now */
		return;
	BUG_ON(!pampd_is_remote(obj->extra));
	flnode = ramster_flnode_alloc(pool);
	flnode->xh.client_id = pampd_remote_node(obj->extra);
	flnode->xh.pool_id = pool->pool_id;
	flnode->xh.oid = obj->oid;
	flnode->xh.index = FLUSH_ENTIRE_OBJECT;
	flnode->rem_op.op = RAMSTER_REMOTIFY_FLUSH_OBJ;
	spin_lock(&ramster_rem_op_list_lock);
	list_add(&flnode->rem_op.list, &ramster_rem_op_list);
	spin_unlock(&ramster_rem_op_list_lock);
}

/*
 * Called on a remote persistent tmem_get to attempt to preallocate
 * local storage for the data contained in the remote persistent page.
 * If successfully preallocated, returns the pampd, marked as remote and
 * in_transit.  Else returns NULL.  Note that the appropriate tmem data
 * structure must be locked.
 */
void *ramster_pampd_repatriate_preload(void *pampd, struct tmem_pool *pool,
					struct tmem_oid *oidp, uint32_t index,
					bool *intransit)
{
	int clen = pampd_remote_size(pampd), c;
	void *ret_pampd = NULL;
	unsigned long flags;
	struct tmem_handle th;

	BUG_ON(!pampd_is_remote(pampd));
	BUG_ON(is_ephemeral(pool));
	if (use_frontswap_exclusive_gets)
		/* don't need local storage */
		goto out;
	if (pampd_is_intransit(pampd)) {
		/*
		 * to avoid multiple allocations (and maybe a memory leak)
		 * don't preallocate if already in the process of being
		 * repatriated
		 */
		*intransit = true;
		goto out;
	}
	*intransit = false;
	local_irq_save(flags);
	th.client_id = pampd_remote_node(pampd);
	th.pool_id = pool->pool_id;
	th.oid = *oidp;
	th.index = index;
	ret_pampd = zcache_pampd_create(NULL, clen, true, false, &th);
	if (ret_pampd != NULL) {
		/*
		 *  a pampd is marked intransit if it is remote and space has
		 *  been allocated for it locally (note, only happens for
		 *  persistent pages, in which case the remote copy is freed)
		 */
		ret_pampd = pampd_mark_intransit(ret_pampd);
		c = atomic_dec_return(&ramster_remote_pers_pages);
		WARN_ON_ONCE(c < 0);
	} else {
		ramster_pers_pages_remote_nomem++;
	}
	local_irq_restore(flags);
out:
	return ret_pampd;
}

/*
 * Called on a remote tmem_get to invoke a message to fetch the page.
 * Might sleep so no tmem locks can be held.  "extra" is passed
 * all the way through the round-trip messaging to ramster_localify.
 */
int ramster_pampd_repatriate(void *fake_pampd, void *real_pampd,
				struct tmem_pool *pool,
				struct tmem_oid *oid, uint32_t index,
				bool free, void *extra)
{
	struct tmem_xhandle xh;
	int ret;

	if (pampd_is_intransit(real_pampd))
		/* have local space pre-reserved, so free remote copy */
		free = true;
	xh = tmem_xhandle_fill(LOCAL_CLIENT, pool, oid, index);
	/* unreliable request/response for now */
	ret = r2net_remote_async_get(&xh, free,
					pampd_remote_node(fake_pampd),
					pampd_remote_size(fake_pampd),
					pampd_remote_cksum(fake_pampd),
					extra);
	return ret;
}

bool ramster_pampd_is_remote(void *pampd)
{
	return pampd_is_remote(pampd);
}

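/*
 * Called when tmem replaces a local pampd with its remote marker (see
 * ramster_remotify_pageframe()): remember the remote pampd in obj->extra
 * and enforce the invariant that every remote page belonging to one object
 * lives on the same node.
 */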
int ramster_pampd_replace_in_obj(void *new_pampd, struct tmem_obj *obj)
{
	int ret = -1;

	if (new_pampd != NULL) {
		if (obj->extra == NULL)
			obj->extra = new_pampd;
		/* enforce that all remote pages in an object reside
		 * in the same node! */
		else if (pampd_remote_node(new_pampd) !=
				pampd_remote_node((void *)(obj->extra)))
			BUG();
		ret = 0;
	}
	return ret;
}

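/*
 * Free the local bookkeeping for a pampd that points at a remote page.
 * A NULL oid means the remote side is being cleaned up elsewhere; an
 * in-transit persistent pampd is unwrapped and handed back so the
 * preallocated local space can be freed; ephemeral remote copies are left
 * to age out on the remote node (see FIXME); otherwise a remote page
 * flush is queued on the rem_op list.
 */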
void *ramster_pampd_free(void *pampd, struct tmem_pool *pool,
			      struct tmem_oid *oid, uint32_t index, bool acct)
{
	bool eph = is_ephemeral(pool);
	void *local_pampd = NULL;
	int c;

	BUG_ON(preemptible());
	BUG_ON(!pampd_is_remote(pampd));
	WARN_ON(acct == false);
	if (oid == NULL) {
		/*
		 * a NULL oid means to ignore this pampd free
		 * as the remote freeing will be handled elsewhere
		 */
	} else if (eph) {
		/* FIXME remote flush optional but probably good idea */
	} else if (pampd_is_intransit(pampd)) {
		/* did a pers remote get_and_free, so just free local */
		local_pampd = pampd_mask_intransit_and_remote(pampd);
	} else {
		struct flushlist_node *flnode =
			ramster_flnode_alloc(pool);

		flnode->xh.client_id = pampd_remote_node(pampd);
		flnode->xh.pool_id = pool->pool_id;
		flnode->xh.oid = *oid;
		flnode->xh.index = index;
		flnode->rem_op.op = RAMSTER_REMOTIFY_FLUSH_PAGE;
		spin_lock(&ramster_rem_op_list_lock);
		list_add(&flnode->rem_op.list, &ramster_rem_op_list);
		spin_unlock(&ramster_rem_op_list_lock);
		c = atomic_dec_return(&ramster_remote_pers_pages);
		WARN_ON_ONCE(c < 0);
	}
	return local_pampd;
}

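/*
 * Track "foreign" pages, i.e. pages this node is holding on behalf of a
 * remote node.  count must be +1 or -1; current and high-water values are
 * exported via debugfs for both ephemeral and persistent pages.
 */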
void ramster_count_foreign_pages(bool eph, int count)
{
	int c;

	BUG_ON(count != 1 && count != -1);
	if (eph) {
		if (count > 0) {
			c = atomic_inc_return(
					&ramster_foreign_eph_pages_atomic);
			if (c > ramster_foreign_eph_pages_max)
				ramster_foreign_eph_pages_max = c;
		} else {
			c = atomic_dec_return(&ramster_foreign_eph_pages_atomic);
			WARN_ON_ONCE(c < 0);
		}
		ramster_foreign_eph_pages = c;
	} else {
		if (count > 0) {
			c = atomic_inc_return(
					&ramster_foreign_pers_pages_atomic);
			if (c > ramster_foreign_pers_pages_max)
				ramster_foreign_pers_pages_max = c;
		} else {
			c = atomic_dec_return(
					&ramster_foreign_pers_pages_atomic);
			WARN_ON_ONCE(c < 0);
		}
		ramster_foreign_pers_pages = c;
	}
}

/*
 * For now, just push over a few pages every few seconds to
 * ensure that it basically works
 */
static struct workqueue_struct *ramster_remotify_workqueue;
static void ramster_remotify_process(struct work_struct *work);
static DECLARE_DELAYED_WORK(ramster_remotify_worker,
		ramster_remotify_process);

static void ramster_remotify_queue_delayed_work(unsigned long delay)
{
	if (!queue_delayed_work(ramster_remotify_workqueue,
				&ramster_remotify_worker, delay))
		pr_err("ramster_remotify: bad workqueue\n");
}

static void ramster_remote_flush_page(struct flushlist_node *flnode)
{
	struct tmem_xhandle *xh;
	int remotenode, ret;

	preempt_disable();
	xh = &flnode->xh;
	remotenode = flnode->xh.client_id;
	ret = r2net_remote_flush(xh, remotenode);
	if (ret >= 0)
		ramster_remote_pages_flushed++;
	else
		ramster_remote_page_flushes_failed++;
	preempt_enable_no_resched();
	ramster_flnode_free(flnode, NULL);
}

static void ramster_remote_flush_object(struct flushlist_node *flnode)
{
	struct tmem_xhandle *xh;
	int remotenode, ret;

	preempt_disable();
	xh = &flnode->xh;
	remotenode = flnode->xh.client_id;
	ret = r2net_remote_flush_object(xh, remotenode);
	if (ret >= 0)
		ramster_remote_objects_flushed++;
	else
		ramster_remote_object_flushes_failed++;
	preempt_enable_no_resched();
	ramster_flnode_free(flnode, NULL);
}

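/*
 * Remotify one pageframe's worth of compressed data: pull up to two zbud
 * "zombie" buddies off the LRU into this cpu's staging buffers, checksum
 * each, push them to the current remotification target with
 * r2net_remote_put(), and on success swap the local pampd for a remote
 * marker via tmem_replace().  Returns the number of buddies processed.
 */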
int ramster_remotify_pageframe(bool eph)
{
	struct tmem_xhandle xh;
	unsigned int size;
	int remotenode, ret, zbuds;
	struct tmem_pool *pool;
	unsigned long flags;
	unsigned char cksum;
	char *p;
	int i, j;
	unsigned char *tmpmem[2];
	struct tmem_handle th[2];
	unsigned int zsize[2];

	tmpmem[0] = __get_cpu_var(ramster_remoteputmem1);
	tmpmem[1] = __get_cpu_var(ramster_remoteputmem2);
	local_bh_disable();
	zbuds = zbud_make_zombie_lru(&th[0], &tmpmem[0], &zsize[0], eph);
	/* now OK to release lock set in caller */
	local_bh_enable();
	if (zbuds == 0)
		goto out;
	BUG_ON(zbuds > 2);
	for (i = 0; i < zbuds; i++) {
		xh.client_id = th[i].client_id;
		xh.pool_id = th[i].pool_id;
		xh.oid = th[i].oid;
		xh.index = th[i].index;
		size = zsize[i];
		BUG_ON(size == 0 || size > zbud_max_buddy_size());
		for (p = tmpmem[i], cksum = 0, j = 0; j < size; j++)
			cksum += *p++;
		ret = r2net_remote_put(&xh, tmpmem[i], size, eph, &remotenode);
		if (ret != 0) {
		/*
		 * This is some form of a memory leak... if the remote put
		 * fails, there will never be another attempt to remotify
		 * this page.  But since we've dropped the zv pointer,
		 * the page may have been freed or the data replaced
		 * so we can't just "put it back" in the remote op list.
		 * Even if we could, not sure where to put it in the list
		 * because there may be flushes that must be strictly
		 * ordered vs the put.  So leave this as a FIXME for now.
		 * But count them so we know if it becomes a problem.
		 */
			if (eph)
				ramster_eph_pages_remote_failed++;
			else
				ramster_pers_pages_remote_failed++;
			break;
		} else {
			if (!eph)
				atomic_inc(&ramster_remote_pers_pages);
		}
		if (eph)
			ramster_eph_pages_remoted++;
		else
			ramster_pers_pages_remoted++;
		/*
		 * data was successfully remoted so change the local version to
		 * point to the remote node where it landed
		 */
		local_bh_disable();
		pool = zcache_get_pool_by_id(LOCAL_CLIENT, xh.pool_id);
		local_irq_save(flags);
		(void)tmem_replace(pool, &xh.oid, xh.index,
				pampd_make_remote(remotenode, size, cksum));
		local_irq_restore(flags);
		zcache_put_pool(pool);
		local_bh_enable();
	}
out:
	return zbuds;
}

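/*
 * Drain the rem_op list, sending the remote page/object flushes that were
 * queued (under lock) by ramster_pampd_free() and ramster_pampd_free_obj().
 */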
static void zcache_do_remotify_flushes(void)
{
	struct ramster_remotify_hdr *rem_op;
	union remotify_list_node *u;

	while (1) {
		spin_lock(&ramster_rem_op_list_lock);
		if (list_empty(&ramster_rem_op_list)) {
			spin_unlock(&ramster_rem_op_list_lock);
			goto out;
		}
		rem_op = list_first_entry(&ramster_rem_op_list,
				struct ramster_remotify_hdr, list);
		list_del_init(&rem_op->list);
		spin_unlock(&ramster_rem_op_list_lock);
		u = (union remotify_list_node *)rem_op;
		switch (rem_op->op) {
		case RAMSTER_REMOTIFY_FLUSH_PAGE:
			ramster_remote_flush_page((struct flushlist_node *)u);
			break;
		case RAMSTER_REMOTIFY_FLUSH_OBJ:
			ramster_remote_flush_object((struct flushlist_node *)u);
			break;
		default:
			BUG();
		}
	}
out:
	return;
}

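/*
 * Delayed-work handler, rearmed every HZ: unless a pass is already running
 * or no remotification target has been chosen, drain pending flushes and
 * remotify up to 100 batches each of ephemeral (cleancache) and persistent
 * (frontswap) pages.
 */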
static void ramster_remotify_process(struct work_struct *work)
{
	static bool remotify_in_progress;
	int i;

	BUG_ON(irqs_disabled());
	if (remotify_in_progress)
		goto requeue;
	if (ramster_remote_target_nodenum == -1)
		goto requeue;
	remotify_in_progress = true;
	if (use_cleancache && ramster_eph_remotify_enable) {
		for (i = 0; i < 100; i++) {
			zcache_do_remotify_flushes();
			(void)ramster_remotify_pageframe(true);
		}
	}
	if (use_frontswap && ramster_pers_remotify_enable) {
		for (i = 0; i < 100; i++) {
			zcache_do_remotify_flushes();
			(void)ramster_remotify_pageframe(false);
		}
	}
	remotify_in_progress = false;
requeue:
	ramster_remotify_queue_delayed_work(HZ);
}

void __init ramster_remotify_init(void)
{
	unsigned long n = 60UL;

	ramster_remotify_workqueue =
		create_singlethread_workqueue("ramster_remotify");
	ramster_remotify_queue_delayed_work(n * HZ);
}

static ssize_t ramster_manual_node_up_show(struct kobject *kobj,
				struct kobj_attribute *attr, char *buf)
{
	int i;
	char *p = buf;

	for (i = 0; i < MANUAL_NODES; i++)
		if (ramster_nodes_manual_up[i])
			p += sprintf(p, "%d ", i);
	p += sprintf(p, "\n");
	return p - buf;
}

static ssize_t ramster_manual_node_up_store(struct kobject *kobj,
		struct kobj_attribute *attr, const char *buf, size_t count)
{
	int err;
	unsigned long node_num;

	err = kstrtoul(buf, 10, &node_num);
	if (err) {
		pr_err("ramster: bad strtoul?\n");
		return -EINVAL;
	}
	if (node_num >= MANUAL_NODES) {
		pr_err("ramster: bad node_num=%lu?\n", node_num);
		return -EINVAL;
	}
	if (ramster_nodes_manual_up[node_num]) {
		pr_err("ramster: node %d already up, ignoring\n",
							(int)node_num);
	} else {
		ramster_nodes_manual_up[node_num] = true;
		r2net_hb_node_up_manual((int)node_num);
	}
	return count;
}

static struct kobj_attribute ramster_manual_node_up_attr = {
	.attr = { .name = "manual_node_up", .mode = 0644 },
	.show = ramster_manual_node_up_show,
	.store = ramster_manual_node_up_store,
};

static ssize_t ramster_remote_target_nodenum_show(struct kobject *kobj,
				struct kobj_attribute *attr, char *buf)
{
	if (ramster_remote_target_nodenum == -1)
		return sprintf(buf, "unset\n");
	else
		return sprintf(buf, "%d\n", ramster_remote_target_nodenum);
}

static ssize_t ramster_remote_target_nodenum_store(struct kobject *kobj,
		struct kobj_attribute *attr, const char *buf, size_t count)
{
	int err;
	unsigned long node_num;

	err = kstrtoul(buf, 10, &node_num);
	if (err) {
		pr_err("ramster: bad strtoul?\n");
		return -EINVAL;
	} else if (node_num == -1UL) {
		pr_err("ramster: disabling all remotification, "
			"data may still reside on remote nodes however\n");
		return -EINVAL;
	} else if (node_num >= MANUAL_NODES) {
		pr_err("ramster: bad node_num=%lu?\n", node_num);
		return -EINVAL;
	} else if (!ramster_nodes_manual_up[node_num]) {
		pr_err("ramster: node %d not up, ignoring setting "
			"of remotification target\n", (int)node_num);
	} else if (r2net_remote_target_node_set((int)node_num) >= 0) {
		pr_info("ramster: node %d set as remotification target\n",
				(int)node_num);
		ramster_remote_target_nodenum = (int)node_num;
	} else {
		pr_err("ramster: bad num to node node_num=%d?\n",
				(int)node_num);
		return -EINVAL;
	}
	return count;
}

static struct kobj_attribute ramster_remote_target_nodenum_attr = {
	.attr = { .name = "remote_target_nodenum", .mode = 0644 },
	.show = ramster_remote_target_nodenum_show,
	.store = ramster_remote_target_nodenum_store,
};

#define RAMSTER_SYSFS_RO(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
		return sprintf(buf, "%lu\n", ramster_##_name); \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0444 }, \
		.show = ramster_##_name##_show, \
	}

#define RAMSTER_SYSFS_RW(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
		return sprintf(buf, "%lu\n", ramster_##_name); \
	} \
	static ssize_t ramster_##_name##_store(struct kobject *kobj, \
		struct kobj_attribute *attr, const char *buf, size_t count) \
	{ \
		int err; \
		unsigned long enable; \
		err = kstrtoul(buf, 10, &enable); \
		if (err) \
			return -EINVAL; \
		ramster_##_name = enable; \
		return count; \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0644 }, \
		.show = ramster_##_name##_show, \
		.store = ramster_##_name##_store, \
	}

#define RAMSTER_SYSFS_RO_ATOMIC(_name) \
	static ssize_t ramster_##_name##_show(struct kobject *kobj, \
				struct kobj_attribute *attr, char *buf) \
	{ \
		return sprintf(buf, "%d\n", atomic_read(&ramster_##_name)); \
	} \
	static struct kobj_attribute ramster_##_name##_attr = { \
		.attr = { .name = __stringify(_name), .mode = 0444 }, \
		.show = ramster_##_name##_show, \
	}

RAMSTER_SYSFS_RO(interface_revision);
RAMSTER_SYSFS_RO_ATOMIC(remote_pers_pages);
RAMSTER_SYSFS_RW(pers_remotify_enable);
RAMSTER_SYSFS_RW(eph_remotify_enable);

static struct attribute *ramster_attrs[] = {
	&ramster_interface_revision_attr.attr,
	&ramster_remote_pers_pages_attr.attr,
	&ramster_manual_node_up_attr.attr,
	&ramster_remote_target_nodenum_attr.attr,
	&ramster_pers_remotify_enable_attr.attr,
	&ramster_eph_remotify_enable_attr.attr,
	NULL,
};

static struct attribute_group ramster_attr_group = {
	.attrs = ramster_attrs,
	.name = "ramster",
};
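
/*
 * Illustrative example (not part of the original source): with the cluster
 * defined by the ramster userland tools and this attribute group registered
 * under /sys/kernel/mm, a client node could be wired to node 1 as its
 * "memory server" roughly like so:
 *
 *	echo 1 > /sys/kernel/mm/ramster/manual_node_up
 *	echo 1 > /sys/kernel/mm/ramster/remote_target_nodenum
 *	echo 1 > /sys/kernel/mm/ramster/eph_remotify_enable
 *	echo 1 > /sys/kernel/mm/ramster/pers_remotify_enable
 *
 * The exact sequence depends on the userland cluster configuration.
 */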

/*
 * frontswap selfshrinking
 */

/* In HZ, controls frequency of worker invocation. */
static unsigned int selfshrink_interval __read_mostly = 5;
/* Enabled by default; cleared by the "noselfshrink" boot option below. */
static bool frontswap_selfshrinking __read_mostly;

static void selfshrink_process(struct work_struct *work);
static DECLARE_DELAYED_WORK(selfshrink_worker, selfshrink_process);

/* Enable/disable with kernel boot option. */
static bool use_frontswap_selfshrink __initdata = true;

/*
 * The default values for the following parameters were deemed reasonable
 * by experimentation and may be workload-dependent; they are currently
 * compile-time constants (see the FIXME near the top about exposing them
 * as debugfs knobs).
 */

/* Control rate for frontswap shrinking. Higher hysteresis is slower. */
static unsigned int frontswap_hysteresis __read_mostly = 20;

/*
 * Number of selfshrink worker invocations to wait before observing that
 * frontswap selfshrinking should commence. Note that selfshrinking does
 * not use a separate worker thread.
 */
static unsigned int frontswap_inertia __read_mostly = 3;

/* Countdown to next invocation of frontswap_shrink() */
static unsigned long frontswap_inertia_counter;

/*
 * Invoked by the selfshrink worker thread, uses current number of pages
 * in frontswap (frontswap_curr_pages()), previous status, and control
 * values (hysteresis and inertia) to determine if frontswap should be
 * shrunk and what the new frontswap size should be.  Note that
 * frontswap_shrink is essentially a partial swapoff that immediately
 * transfers pages from the "swap device" (frontswap) back into kernel
 * RAM; despite the name, frontswap "shrinking" is very different from
 * the "shrinker" interface used by the kernel MM subsystem to reclaim
 * memory.
 */
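/*
 * Example with the default frontswap_hysteresis of 20: if 10000 pages are
 * currently in frontswap and the count has not been growing, the target
 * becomes 10000 - 10000/20 = 9500 pages, i.e. about 5% is shrunk per
 * invocation once the inertia countdown has expired.
 */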
static void frontswap_selfshrink(void)
{
	static unsigned long cur_frontswap_pages;
	static unsigned long last_frontswap_pages;
	static unsigned long tgt_frontswap_pages;

	last_frontswap_pages = cur_frontswap_pages;
	cur_frontswap_pages = frontswap_curr_pages();
	if (!cur_frontswap_pages ||
			(cur_frontswap_pages > last_frontswap_pages)) {
		frontswap_inertia_counter = frontswap_inertia;
		return;
	}
	if (frontswap_inertia_counter && --frontswap_inertia_counter)
		return;
	if (cur_frontswap_pages <= frontswap_hysteresis)
		tgt_frontswap_pages = 0;
	else
		tgt_frontswap_pages = cur_frontswap_pages -
			(cur_frontswap_pages / frontswap_hysteresis);
	frontswap_shrink(tgt_frontswap_pages);
}

static int __init ramster_nofrontswap_selfshrink_setup(char *s)
{
	use_frontswap_selfshrink = false;
	return 1;
}

__setup("noselfshrink", ramster_nofrontswap_selfshrink_setup);

static void selfshrink_process(struct work_struct *work)
{
	if (frontswap_selfshrinking && frontswap_enabled) {
		frontswap_selfshrink();
		schedule_delayed_work(&selfshrink_worker,
			selfshrink_interval * HZ);
	}
}

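/*
 * Per-cpu setup/teardown: allocate (and on cpu-down free) the two staging
 * buffers that ramster_remotify_pageframe() uses for zbud data being
 * pushed remotely, and release any flushlist_node still preloaded for the
 * departing cpu.
 */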
void ramster_cpu_up(int cpu)
{
	unsigned char *p1 = kzalloc(PAGE_SIZE, GFP_KERNEL | __GFP_REPEAT);
	unsigned char *p2 = kzalloc(PAGE_SIZE, GFP_KERNEL | __GFP_REPEAT);

	BUG_ON(!p1 || !p2);
	per_cpu(ramster_remoteputmem1, cpu) = p1;
	per_cpu(ramster_remoteputmem2, cpu) = p2;
}

void ramster_cpu_down(int cpu)
{
	struct ramster_preload *kp;

	kfree(per_cpu(ramster_remoteputmem1, cpu));
	per_cpu(ramster_remoteputmem1, cpu) = NULL;
	kfree(per_cpu(ramster_remoteputmem2, cpu));
	per_cpu(ramster_remoteputmem2, cpu) = NULL;
	kp = &per_cpu(ramster_preloads, cpu);
	if (kp->flnode) {
		kmem_cache_free(ramster_flnode_cache, kp->flnode);
		kp->flnode = NULL;
	}
}

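/*
 * Hook RAMster's remote-aware handlers into the tmem_pamops supplied by
 * the caller, so tmem can recognize remote pampds and repatriate their data.
 */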
void ramster_register_pamops(struct tmem_pamops *pamops)
{
	pamops->free_obj = ramster_pampd_free_obj;
	pamops->new_obj = ramster_pampd_new_obj;
	pamops->replace_in_obj = ramster_pampd_replace_in_obj;
	pamops->is_remote = ramster_pampd_is_remote;
	pamops->repatriate = ramster_pampd_repatriate;
	pamops->repatriate_preload = ramster_pampd_repatriate_preload;
}

void __init ramster_init(bool cleancache, bool frontswap,
				bool frontswap_exclusive_gets)
{
	int ret = 0;

	if (cleancache)
		use_cleancache = true;
	if (frontswap)
		use_frontswap = true;
	if (frontswap_exclusive_gets)
		use_frontswap_exclusive_gets = true;
	ramster_debugfs_init();
	ret = sysfs_create_group(mm_kobj, &ramster_attr_group);
	if (ret)
		pr_err("ramster: can't create sysfs for ramster\n");
	(void)r2net_register_handlers();
	INIT_LIST_HEAD(&ramster_rem_op_list);
	ramster_flnode_cache = kmem_cache_create("ramster_flnode",
				sizeof(struct flushlist_node), 0, 0, NULL);
	frontswap_selfshrinking = use_frontswap_selfshrink;
	if (frontswap_selfshrinking) {
		pr_info("ramster: Initializing frontswap selfshrink driver.\n");
		schedule_delayed_work(&selfshrink_worker,
					selfshrink_interval * HZ);
	}
	ramster_remotify_init();
}