drivers/staging/lustre/lustre/osc/osc_cache.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * osc cache management.
34  *
35  * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include "osc_cl_internal.h"
41 #include "osc_internal.h"
42
43 static int extent_debug; /* set it to be true for more debug */
44
45 static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
46 static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
47                            int state);
48 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
49                               struct osc_async_page *oap, int sent, int rc);
50 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
51                           int cmd);
52 static int osc_refresh_count(const struct lu_env *env,
53                              struct osc_async_page *oap, int cmd);
54 static int osc_io_unplug_async(const struct lu_env *env,
55                                struct client_obd *cli, struct osc_object *osc);
56 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
57                            unsigned int lost_grant);
58
59 static void osc_extent_tree_dump0(int level, struct osc_object *obj,
60                                   const char *func, int line);
61 #define osc_extent_tree_dump(lvl, obj) \
62         osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)
63
64 /** \addtogroup osc
65  *  @{
66  */
67
68 /* ------------------ osc extent ------------------ */
69 static inline char *ext_flags(struct osc_extent *ext, char *flags)
70 {
71         char *buf = flags;
72         *buf++ = ext->oe_rw ? 'r' : 'w';
73         if (ext->oe_intree)
74                 *buf++ = 'i';
75         if (ext->oe_sync)
76                 *buf++ = 'S';
77         if (ext->oe_srvlock)
78                 *buf++ = 's';
79         if (ext->oe_hp)
80                 *buf++ = 'h';
81         if (ext->oe_urgent)
82                 *buf++ = 'u';
83         if (ext->oe_memalloc)
84                 *buf++ = 'm';
85         if (ext->oe_trunc_pending)
86                 *buf++ = 't';
87         if (ext->oe_fsync_wait)
88                 *buf++ = 'Y';
89         *buf = 0;
90         return flags;
91 }
92
93 static inline char list_empty_marker(struct list_head *list)
94 {
95         return list_empty(list) ? '-' : '+';
96 }
97
98 #define EXTSTR       "[%lu -> %lu/%lu]"
99 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
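/* Human-readable extent state names; OSC_EXTENT_DUMP() indexes this array by
 * ->oe_state, so the order must match the osc_extent state values
 * (OES_INV ... OES_TRUNC). */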
100 static const char *oes_strings[] = {
101         "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
102
103 #define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                           \
104         struct osc_extent *__ext = (extent);                                  \
105         char __buf[16];                                                       \
106                                                                               \
107         CDEBUG(lvl,                                                           \
108                 "extent %p@{" EXTSTR ", "                                     \
109                 "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
110                 /* ----- extent part 0 ----- */                               \
111                 __ext, EXTPARA(__ext),                                        \
112                 /* ----- part 1 ----- */                                      \
113                 atomic_read(&__ext->oe_refc),                                 \
114                 atomic_read(&__ext->oe_users),                                \
115                 list_empty_marker(&__ext->oe_link),                           \
116                 oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
117                 __ext->oe_obj,                                                \
118                 /* ----- part 2 ----- */                                      \
119                 __ext->oe_grants, __ext->oe_nr_pages,                         \
120                 list_empty_marker(&__ext->oe_pages),                          \
121                 waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
122                 __ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner,           \
123                 /* ----- part 3 ----- */                                      \
124                 ## __VA_ARGS__);                                              \
125         if (lvl == D_ERROR && __ext->oe_dlmlock)                              \
126                 LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext);           \
127         else                                                                  \
128                 LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext);           \
129 } while (0)
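/* The two bracketed groups printed above are, in order:
 * [refc|users|link|state|flags|obj] and
 * [grants|nr_pages|pages|waitq|dlmlock|mppr|owner]. */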
130
131 #undef EASSERTF
132 #define EASSERTF(expr, ext, fmt, args...) do {                          \
133         if (!(expr)) {                                                  \
134                 OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
135                 osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
136                 LASSERT(expr);                                          \
137         }                                                               \
138 } while (0)
139
140 #undef EASSERT
141 #define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")
142
143 static inline struct osc_extent *rb_extent(struct rb_node *n)
144 {
145         if (!n)
146                 return NULL;
147
148         return container_of(n, struct osc_extent, oe_node);
149 }
150
151 static inline struct osc_extent *next_extent(struct osc_extent *ext)
152 {
153         if (!ext)
154                 return NULL;
155
156         LASSERT(ext->oe_intree);
157         return rb_extent(rb_next(&ext->oe_node));
158 }
159
160 static inline struct osc_extent *prev_extent(struct osc_extent *ext)
161 {
162         if (!ext)
163                 return NULL;
164
165         LASSERT(ext->oe_intree);
166         return rb_extent(rb_prev(&ext->oe_node));
167 }
168
169 static inline struct osc_extent *first_extent(struct osc_object *obj)
170 {
171         return rb_extent(rb_first(&obj->oo_root));
172 }
173
174 /* object must be locked by caller. */
175 static int osc_extent_sanity_check0(struct osc_extent *ext,
176                                     const char *func, const int line)
177 {
178         struct osc_object *obj = ext->oe_obj;
179         struct osc_async_page *oap;
180         int page_count;
181         int rc = 0;
182
183         if (!osc_object_is_locked(obj)) {
184                 rc = 9;
185                 goto out;
186         }
187
188         if (ext->oe_state >= OES_STATE_MAX) {
189                 rc = 10;
190                 goto out;
191         }
192
193         if (atomic_read(&ext->oe_refc) <= 0) {
194                 rc = 20;
195                 goto out;
196         }
197
198         if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users)) {
199                 rc = 30;
200                 goto out;
201         }
202
203         switch (ext->oe_state) {
204         case OES_INV:
205                 if (ext->oe_nr_pages > 0 || !list_empty(&ext->oe_pages))
206                         rc = 35;
207                 else
208                         rc = 0;
209                 goto out;
210         case OES_ACTIVE:
211                 if (atomic_read(&ext->oe_users) == 0) {
212                         rc = 40;
213                         goto out;
214                 }
215                 if (ext->oe_hp) {
216                         rc = 50;
217                         goto out;
218                 }
219                 if (ext->oe_fsync_wait && !ext->oe_urgent) {
220                         rc = 55;
221                         goto out;
222                 }
223                 break;
224         case OES_CACHE:
225                 if (ext->oe_grants == 0) {
226                         rc = 60;
227                         goto out;
228                 }
229                 if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) {
230                         rc = 65;
231                         goto out;
232                 }
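                /* fall through: a cached extent must also pass the default
                 * check below (no active users). */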
233         default:
234                 if (atomic_read(&ext->oe_users) > 0) {
235                         rc = 70;
236                         goto out;
237                 }
238         }
239
240         if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start) {
241                 rc = 80;
242                 goto out;
243         }
244
245         if (ext->oe_sync && ext->oe_grants > 0) {
246                 rc = 90;
247                 goto out;
248         }
249
250         if (ext->oe_dlmlock) {
251                 struct ldlm_extent *extent;
252
253                 extent = &ext->oe_dlmlock->l_policy_data.l_extent;
254                 if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
255                       extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end))) {
256                         rc = 100;
257                         goto out;
258                 }
259
260                 if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))) {
261                         rc = 102;
262                         goto out;
263                 }
264         }
265
266         if (ext->oe_nr_pages > ext->oe_mppr) {
267                 rc = 105;
268                 goto out;
269         }
270
271         /* Do not verify page list if extent is in RPC. This is because an
272          * in-RPC extent is supposed to be exclusively accessible w/o lock.
273          */
274         if (ext->oe_state > OES_CACHE) {
275                 rc = 0;
276                 goto out;
277         }
278
279         if (!extent_debug) {
280                 rc = 0;
281                 goto out;
282         }
283
284         page_count = 0;
285         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
286                 pgoff_t index = osc_index(oap2osc(oap));
287                 ++page_count;
288                 if (index > ext->oe_end || index < ext->oe_start) {
289                         rc = 110;
290                         goto out;
291                 }
292         }
293         if (page_count != ext->oe_nr_pages) {
294                 rc = 120;
295                 goto out;
296         }
297
298 out:
299         if (rc != 0)
300                 OSC_EXTENT_DUMP(D_ERROR, ext,
301                                 "%s:%d sanity check %p failed with rc = %d\n",
302                                 func, line, ext, rc);
303         return rc;
304 }
305
306 #define sanity_check_nolock(ext) \
307         osc_extent_sanity_check0(ext, __func__, __LINE__)
308
309 #define sanity_check(ext) ({                                            \
310         int __res;                                                      \
311         osc_object_lock((ext)->oe_obj);                                 \
312         __res = sanity_check_nolock(ext);                               \
313         osc_object_unlock((ext)->oe_obj);                               \
314         __res;                                                          \
315 })
316
317 /**
318  * Sanity check - make sure there are no overlapping extents in the tree.
319  */
320 static int osc_extent_is_overlapped(struct osc_object *obj,
321                                     struct osc_extent *ext)
322 {
323         struct osc_extent *tmp;
324
325         LASSERT(osc_object_is_locked(obj));
326
327         if (!extent_debug)
328                 return 0;
329
330         for (tmp = first_extent(obj); tmp; tmp = next_extent(tmp)) {
331                 if (tmp == ext)
332                         continue;
333                 if (tmp->oe_end >= ext->oe_start &&
334                     tmp->oe_start <= ext->oe_end)
335                         return 1;
336         }
337         return 0;
338 }
339
340 static void osc_extent_state_set(struct osc_extent *ext, int state)
341 {
342         LASSERT(osc_object_is_locked(ext->oe_obj));
343         LASSERT(state >= OES_INV && state < OES_STATE_MAX);
344
345         /* Never try to sanity check a state changing extent :-) */
346         /* LASSERT(sanity_check_nolock(ext) == 0); */
347
348         /* TODO: validate the state machine */
349         ext->oe_state = state;
350         wake_up_all(&ext->oe_waitq);
351 }
352
353 static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
354 {
355         struct osc_extent *ext;
356
357         ext = kmem_cache_zalloc(osc_extent_kmem, GFP_NOFS);
358         if (!ext)
359                 return NULL;
360
361         RB_CLEAR_NODE(&ext->oe_node);
362         ext->oe_obj = obj;
363         atomic_set(&ext->oe_refc, 1);
364         atomic_set(&ext->oe_users, 0);
365         INIT_LIST_HEAD(&ext->oe_link);
366         ext->oe_state = OES_INV;
367         INIT_LIST_HEAD(&ext->oe_pages);
368         init_waitqueue_head(&ext->oe_waitq);
369         ext->oe_dlmlock = NULL;
370
371         return ext;
372 }
373
374 static void osc_extent_free(struct osc_extent *ext)
375 {
376         kmem_cache_free(osc_extent_kmem, ext);
377 }
378
379 static struct osc_extent *osc_extent_get(struct osc_extent *ext)
380 {
381         LASSERT(atomic_read(&ext->oe_refc) >= 0);
382         atomic_inc(&ext->oe_refc);
383         return ext;
384 }
385
386 static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
387 {
388         LASSERT(atomic_read(&ext->oe_refc) > 0);
389         if (atomic_dec_and_test(&ext->oe_refc)) {
390                 LASSERT(list_empty(&ext->oe_link));
391                 LASSERT(atomic_read(&ext->oe_users) == 0);
392                 LASSERT(ext->oe_state == OES_INV);
393                 LASSERT(!ext->oe_intree);
394
395                 if (ext->oe_dlmlock) {
396                         lu_ref_add(&ext->oe_dlmlock->l_reference,
397                                    "osc_extent", ext);
398                         LDLM_LOCK_PUT(ext->oe_dlmlock);
399                         ext->oe_dlmlock = NULL;
400                 }
401                 osc_extent_free(ext);
402         }
403 }
404
405 /**
406  * osc_extent_put_trust() is a special version of osc_extent_put() for use
407  * when it's known that the caller is not the last user. This is to address
408  * the problem of not having a lu_env available ;-).
409  */
410 static void osc_extent_put_trust(struct osc_extent *ext)
411 {
412         LASSERT(atomic_read(&ext->oe_refc) > 1);
413         LASSERT(osc_object_is_locked(ext->oe_obj));
414         atomic_dec(&ext->oe_refc);
415 }
416
417 /**
418  * Return the extent which includes pgoff @index, or return the greatest
419  * previous extent in the tree.
420  */
421 static struct osc_extent *osc_extent_search(struct osc_object *obj,
422                                             pgoff_t index)
423 {
424         struct rb_node *n = obj->oo_root.rb_node;
425         struct osc_extent *tmp, *p = NULL;
426
427         LASSERT(osc_object_is_locked(obj));
428         while (n) {
429                 tmp = rb_extent(n);
430                 if (index < tmp->oe_start) {
431                         n = n->rb_left;
432                 } else if (index > tmp->oe_end) {
433                         p = rb_extent(n);
434                         n = n->rb_right;
435                 } else {
436                         return tmp;
437                 }
438         }
439         return p;
440 }
441
442 /*
443  * Return the extent covering @index, otherwise return NULL.
444  * caller must have held object lock.
445  */
446 static struct osc_extent *osc_extent_lookup(struct osc_object *obj,
447                                             pgoff_t index)
448 {
449         struct osc_extent *ext;
450
451         ext = osc_extent_search(obj, index);
452         if (ext && ext->oe_start <= index && index <= ext->oe_end)
453                 return osc_extent_get(ext);
454         return NULL;
455 }
456
457 /* caller must have held object lock. */
458 static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
459 {
460         struct rb_node **n = &obj->oo_root.rb_node;
461         struct rb_node *parent = NULL;
462         struct osc_extent *tmp;
463
464         LASSERT(ext->oe_intree == 0);
465         LASSERT(ext->oe_obj == obj);
466         LASSERT(osc_object_is_locked(obj));
467         while (*n) {
468                 tmp = rb_extent(*n);
469                 parent = *n;
470
471                 if (ext->oe_end < tmp->oe_start)
472                         n = &(*n)->rb_left;
473                 else if (ext->oe_start > tmp->oe_end)
474                         n = &(*n)->rb_right;
475                 else
476                         EASSERTF(0, tmp, EXTSTR"\n", EXTPARA(ext));
477         }
478         rb_link_node(&ext->oe_node, parent, n);
479         rb_insert_color(&ext->oe_node, &obj->oo_root);
480         osc_extent_get(ext);
481         ext->oe_intree = 1;
482 }
483
484 /* caller must have held object lock. */
485 static void osc_extent_erase(struct osc_extent *ext)
486 {
487         struct osc_object *obj = ext->oe_obj;
488
489         LASSERT(osc_object_is_locked(obj));
490         if (ext->oe_intree) {
491                 rb_erase(&ext->oe_node, &obj->oo_root);
492                 ext->oe_intree = 0;
493                 /* rbtree held a refcount */
494                 osc_extent_put_trust(ext);
495         }
496 }
497
498 static struct osc_extent *osc_extent_hold(struct osc_extent *ext)
499 {
500         struct osc_object *obj = ext->oe_obj;
501
502         LASSERT(osc_object_is_locked(obj));
503         LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
504         if (ext->oe_state == OES_CACHE) {
505                 osc_extent_state_set(ext, OES_ACTIVE);
506                 osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages);
507         }
508         atomic_inc(&ext->oe_users);
509         list_del_init(&ext->oe_link);
510         return osc_extent_get(ext);
511 }
512
513 static void __osc_extent_remove(struct osc_extent *ext)
514 {
515         LASSERT(osc_object_is_locked(ext->oe_obj));
516         LASSERT(list_empty(&ext->oe_pages));
517         osc_extent_erase(ext);
518         list_del_init(&ext->oe_link);
519         osc_extent_state_set(ext, OES_INV);
520         OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n");
521 }
522
523 static void osc_extent_remove(struct osc_extent *ext)
524 {
525         struct osc_object *obj = ext->oe_obj;
526
527         osc_object_lock(obj);
528         __osc_extent_remove(ext);
529         osc_object_unlock(obj);
530 }
531
532 /**
533  * This function is used to merge extents to get better performance. It checks
534  * if @cur and @victim are contiguous at chunk level.
535  */
536 static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
537                             struct osc_extent *victim)
538 {
539         struct osc_object *obj = cur->oe_obj;
540         pgoff_t chunk_start;
541         pgoff_t chunk_end;
542         int ppc_bits;
543
544         LASSERT(cur->oe_state == OES_CACHE);
545         LASSERT(osc_object_is_locked(obj));
546         if (!victim)
547                 return -EINVAL;
548
549         if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
550                 return -EBUSY;
551
552         if (cur->oe_max_end != victim->oe_max_end)
553                 return -ERANGE;
554
555         LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
556         ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
557         chunk_start = cur->oe_start >> ppc_bits;
558         chunk_end = cur->oe_end >> ppc_bits;
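        /* merging is allowed only if @victim is immediately adjacent to @cur at
         * chunk granularity: victim's last chunk right before cur's first one,
         * or cur's last chunk right before victim's first one. */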
559         if (chunk_start != (victim->oe_end >> ppc_bits) + 1 &&
560             chunk_end + 1 != victim->oe_start >> ppc_bits)
561                 return -ERANGE;
562
563         OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);
564
565         cur->oe_start = min(cur->oe_start, victim->oe_start);
566         cur->oe_end = max(cur->oe_end, victim->oe_end);
567         cur->oe_grants += victim->oe_grants;
568         cur->oe_nr_pages += victim->oe_nr_pages;
569         /* only the following bits are needed to merge */
570         cur->oe_urgent |= victim->oe_urgent;
571         cur->oe_memalloc |= victim->oe_memalloc;
572         list_splice_init(&victim->oe_pages, &cur->oe_pages);
573         list_del_init(&victim->oe_link);
574         victim->oe_nr_pages = 0;
575
576         osc_extent_get(victim);
577         __osc_extent_remove(victim);
578         osc_extent_put(env, victim);
579
580         OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim);
581         return 0;
582 }
583
584 /**
585  * Drop user count of osc_extent, and unplug IO asynchronously.
586  */
587 void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
588 {
589         struct osc_object *obj = ext->oe_obj;
590
591         LASSERT(atomic_read(&ext->oe_users) > 0);
592         LASSERT(sanity_check(ext) == 0);
593         LASSERT(ext->oe_grants > 0);
594
595         if (atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) {
596                 LASSERT(ext->oe_state == OES_ACTIVE);
597                 if (ext->oe_trunc_pending) {
598                         /* a truncate process is waiting for this extent.
599                          * This may happen due to a race, check
600                          * osc_cache_truncate_start().
601                          */
602                         osc_extent_state_set(ext, OES_TRUNC);
603                         ext->oe_trunc_pending = 0;
604                 } else {
605                         osc_extent_state_set(ext, OES_CACHE);
606                         osc_update_pending(obj, OBD_BRW_WRITE,
607                                            ext->oe_nr_pages);
608
609                         /* try to merge the previous and next extent. */
610                         osc_extent_merge(env, ext, prev_extent(ext));
611                         osc_extent_merge(env, ext, next_extent(ext));
612
613                         if (ext->oe_urgent)
614                                 list_move_tail(&ext->oe_link,
615                                                &obj->oo_urgent_exts);
616                 }
617                 osc_object_unlock(obj);
618
619                 osc_io_unplug_async(env, osc_cli(obj), obj);
620         }
621         osc_extent_put(env, ext);
622 }
623
624 static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
625 {
626         return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
627 }
628
629 /**
630  * Find or create an extent that includes @index; this is the core function
631  * for managing the extent tree.
632  */
633 static struct osc_extent *osc_extent_find(const struct lu_env *env,
634                                           struct osc_object *obj, pgoff_t index,
635                                           int *grants)
636 {
637         struct client_obd *cli = osc_cli(obj);
638         struct osc_lock   *olck;
639         struct cl_lock_descr *descr;
640         struct osc_extent *cur;
641         struct osc_extent *ext;
642         struct osc_extent *conflict = NULL;
643         struct osc_extent *found = NULL;
644         pgoff_t chunk;
645         pgoff_t max_end;
646         int max_pages; /* max_pages_per_rpc */
647         int chunksize;
648         int ppc_bits; /* pages per chunk bits */
649         int chunk_mask;
650         int rc;
651
652         cur = osc_extent_alloc(obj);
653         if (!cur)
654                 return ERR_PTR(-ENOMEM);
655
656         olck = osc_env_io(env)->oi_write_osclock;
657         LASSERTF(olck, "page %lu is not covered by lock\n", index);
658         LASSERT(olck->ols_state == OLS_GRANTED);
659
660         descr = &olck->ols_cl.cls_lock->cll_descr;
661         LASSERT(descr->cld_mode >= CLM_WRITE);
662
663         LASSERT(cli->cl_chunkbits >= PAGE_SHIFT);
664         ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
665         chunk_mask = ~((1 << ppc_bits) - 1);
666         chunksize = 1 << cli->cl_chunkbits;
667         chunk = index >> ppc_bits;
668
669         /* align end to the RPC edge; the RPC size may not be a power-of-2 value. */
670         max_pages = cli->cl_max_pages_per_rpc;
671         LASSERT((max_pages & ~chunk_mask) == 0);
672         max_end = index - (index % max_pages) + max_pages - 1;
673         max_end = min_t(pgoff_t, max_end, descr->cld_end);
674
675         /* initialize new extent by parameters so far */
676         cur->oe_max_end = max_end;
677         cur->oe_start = index & chunk_mask;
678         cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
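        /* e.g. with 4 KiB pages and 64 KiB chunks (ppc_bits = 4, illustrative
         * values only), index 37 gives oe_start = 32 and oe_end = 47, i.e. the
         * chunk containing @index, before the clamping below. */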
679         if (cur->oe_start < descr->cld_start)
680                 cur->oe_start = descr->cld_start;
681         if (cur->oe_end > max_end)
682                 cur->oe_end = max_end;
683         cur->oe_grants = 0;
684         cur->oe_mppr = max_pages;
685         if (olck->ols_dlmlock) {
686                 LASSERT(olck->ols_hold);
687                 cur->oe_dlmlock = LDLM_LOCK_GET(olck->ols_dlmlock);
688                 lu_ref_add(&olck->ols_dlmlock->l_reference, "osc_extent", cur);
689         }
690
691         /* grants have been allocated by the caller */
692         LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
693                  "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
694         LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n",
695                  EXTPARA(cur));
696
697 restart:
698         osc_object_lock(obj);
699         ext = osc_extent_search(obj, cur->oe_start);
700         if (!ext)
701                 ext = first_extent(obj);
702         while (ext) {
703                 loff_t ext_chk_start = ext->oe_start >> ppc_bits;
704                 loff_t ext_chk_end = ext->oe_end >> ppc_bits;
705
706                 LASSERT(sanity_check_nolock(ext) == 0);
707                 if (chunk > ext_chk_end + 1)
708                         break;
709
710                 /* if covered by different locks, there is no chance to match */
711                 if (olck->ols_dlmlock != ext->oe_dlmlock) {
712                         EASSERTF(!overlapped(ext, cur), ext,
713                                  EXTSTR"\n", EXTPARA(cur));
714
715                         ext = next_extent(ext);
716                         continue;
717                 }
718
719                 /* discontiguous chunks? */
720                 if (chunk + 1 < ext_chk_start) {
721                         ext = next_extent(ext);
722                         continue;
723                 }
724
725                 /* ok, from now on, ext and cur have these attrs:
726                  * 1. covered by the same lock
727                  * 2. contiguous at chunk level or overlapping.
728                  */
729
730                 if (overlapped(ext, cur)) {
731                         /* cur is the minimum unit, so overlapping means
732                          * full containment.
733                          */
734                         EASSERTF((ext->oe_start <= cur->oe_start &&
735                                   ext->oe_end >= cur->oe_end),
736                                  ext, EXTSTR"\n", EXTPARA(cur));
737
738                         if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
739                                 /* for simplicity, we wait for this extent to
740                                  * finish before going forward.
741                                  */
742                                 conflict = osc_extent_get(ext);
743                                 break;
744                         }
745
746                         found = osc_extent_hold(ext);
747                         break;
748                 }
749
750                 /* non-overlapped extent */
751                 if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) {
752                         /* we can't do anything for a non OES_CACHE extent, or
753                          * if there is someone waiting for this extent to be
754                          * flushed, try next one.
755                          */
756                         ext = next_extent(ext);
757                         continue;
758                 }
759
760                 /* check if they belong to the same RPC slot before trying to
761                  * merge. To get here, the extents must be non-overlapping and
762                  * contiguous at chunk level.
763                  */
764                 if (ext->oe_max_end != max_end) {
765                         /* if they don't belong to the same RPC slot or
766                          * max_pages_per_rpc has ever changed, do not merge.
767                          */
768                         ext = next_extent(ext);
769                         continue;
770                 }
771
772                 /* an extent is required to be contiguous at chunk
773                  * level so that we know the whole extent is covered by grant
774                  * (the pages in the extent are NOT required to be contiguous).
775                  * Otherwise, it would be too difficult to know which
776                  * chunks have grants allocated.
777                  */
778
779                 /* try to do front merge - extend ext's start */
780                 if (chunk + 1 == ext_chk_start) {
781                         /* ext must be chunk size aligned */
782                         EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
783
784                         /* pull ext's start back to cover cur */
785                         ext->oe_start = cur->oe_start;
786                         ext->oe_grants += chunksize;
787                         *grants -= chunksize;
788
789                         found = osc_extent_hold(ext);
790                 } else if (chunk == ext_chk_end + 1) {
791                         /* rear merge */
792                         ext->oe_end = cur->oe_end;
793                         ext->oe_grants += chunksize;
794                         *grants -= chunksize;
795
796                         /* try to merge with the next one because we just filled
797                          * in a gap
798                          */
799                         if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
800                                 /* we can save extent tax from next extent */
801                                 *grants += cli->cl_extent_tax;
802
803                         found = osc_extent_hold(ext);
804                 }
805                 if (found)
806                         break;
807
808                 ext = next_extent(ext);
809         }
810
811         osc_extent_tree_dump(D_CACHE, obj);
812         if (found) {
813                 LASSERT(!conflict);
814                 if (!IS_ERR(found)) {
815                         LASSERT(found->oe_dlmlock == cur->oe_dlmlock);
816                         OSC_EXTENT_DUMP(D_CACHE, found,
817                                         "found caching ext for %lu.\n", index);
818                 }
819         } else if (!conflict) {
820                 /* create a new extent */
821                 EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
822                 cur->oe_grants = chunksize + cli->cl_extent_tax;
823                 *grants -= cur->oe_grants;
824                 LASSERT(*grants >= 0);
825
826                 cur->oe_state = OES_CACHE;
827                 found = osc_extent_hold(cur);
828                 osc_extent_insert(obj, cur);
829                 OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
830                                 index, descr->cld_end);
831         }
832         osc_object_unlock(obj);
833
834         if (conflict) {
835                 LASSERT(!found);
836
837                 /* wait for IO to finish. Note that the conflicting extent
838                  * cannot be an OES_TRUNC extent here.
839                  */
840                 rc = osc_extent_wait(env, conflict, OES_INV);
841                 osc_extent_put(env, conflict);
842                 conflict = NULL;
843                 if (rc < 0) {
844                         found = ERR_PTR(rc);
845                         goto out;
846                 }
847
848                 goto restart;
849         }
850
851 out:
852         osc_extent_put(env, cur);
853         LASSERT(*grants >= 0);
854         return found;
855 }
856
857 /**
858  * Called when IO to an extent has finished.
859  */
860 int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
861                       int sent, int rc)
862 {
863         struct client_obd *cli = osc_cli(ext->oe_obj);
864         struct osc_async_page *oap;
865         struct osc_async_page *tmp;
866         int nr_pages = ext->oe_nr_pages;
867         int lost_grant = 0;
868         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
869         __u64 last_off = 0;
870         int last_count = -1;
871
872         OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");
873
874         ext->oe_rc = rc ?: ext->oe_nr_pages;
875         EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
876
877         osc_lru_add_batch(cli, &ext->oe_pages);
878         list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
879                 list_del_init(&oap->oap_rpc_item);
880                 list_del_init(&oap->oap_pending_item);
881                 if (last_off <= oap->oap_obj_off) {
882                         last_off = oap->oap_obj_off;
883                         last_count = oap->oap_count;
884                 }
885
886                 --ext->oe_nr_pages;
887                 osc_ap_completion(env, cli, oap, sent, rc);
888         }
889         EASSERT(ext->oe_nr_pages == 0, ext);
890
891         if (!sent) {
892                 lost_grant = ext->oe_grants;
893         } else if (blocksize < PAGE_SIZE &&
894                    last_count != PAGE_SIZE) {
895                 /* For short writes we shouldn't count parts of pages that
896                  * span a whole chunk on the OST side, or our accounting goes
897                  * wrong.  Should match the code in filter_grant_check.
898                  */
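                /* Illustrative example (assuming PAGE_SIZE = 4096): with a
                 * 1024-byte OST blocksize, a page-aligned 100-byte final write
                 * rounds up to one 1024-byte block, so
                 * lost_grant = 4096 - 1024 = 3072. */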
899                 int offset = last_off & ~PAGE_MASK;
900                 int count = last_count + (offset & (blocksize - 1));
901                 int end = (offset + last_count) & (blocksize - 1);
902                 if (end)
903                         count += blocksize - end;
904
905                 lost_grant = PAGE_SIZE - count;
906         }
907         if (ext->oe_grants > 0)
908                 osc_free_grant(cli, nr_pages, lost_grant);
909
910         osc_extent_remove(ext);
911         /* put the refcount for RPC */
912         osc_extent_put(env, ext);
913         return 0;
914 }
915
916 static int extent_wait_cb(struct osc_extent *ext, int state)
917 {
918         int ret;
919
920         osc_object_lock(ext->oe_obj);
921         ret = ext->oe_state == state;
922         osc_object_unlock(ext->oe_obj);
923
924         return ret;
925 }
926
927 /**
928  * Wait for the extent's state to become @state.
929  */
930 static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
931                            int state)
932 {
933         struct osc_object *obj = ext->oe_obj;
934         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
935                                                   LWI_ON_SIGNAL_NOOP, NULL);
936         int rc = 0;
937
938         osc_object_lock(obj);
939         LASSERT(sanity_check_nolock(ext) == 0);
940         /* `Kick' this extent only if the caller is waiting for it to be
941          * written out.
942          */
943         if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp &&
944             !ext->oe_trunc_pending) {
945                 if (ext->oe_state == OES_ACTIVE) {
946                         ext->oe_urgent = 1;
947                 } else if (ext->oe_state == OES_CACHE) {
948                         ext->oe_urgent = 1;
949                         osc_extent_hold(ext);
950                         rc = 1;
951                 }
952         }
953         osc_object_unlock(obj);
954         if (rc == 1)
955                 osc_extent_release(env, ext);
956
957         /* wait for the extent until its state becomes @state */
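        /* the first wait is bounded by a 600 s timeout so a stuck server gets
         * logged; if that fires, fall back to an interruptible wait without a
         * timeout below. */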
958         rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
959         if (rc == -ETIMEDOUT) {
960                 OSC_EXTENT_DUMP(D_ERROR, ext,
961                         "%s: wait ext to %d timed out, recovery in progress?\n",
962                         osc_export(obj)->exp_obd->obd_name, state);
963
964                 lwi = LWI_INTR(NULL, NULL);
965                 rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
966                                   &lwi);
967         }
968         if (rc == 0 && ext->oe_rc < 0)
969                 rc = ext->oe_rc;
970         return rc;
971 }
972
973 /**
974  * Discard pages with index greater than @trunc_index. If @ext overlaps
975  * @trunc_index, then a partial truncate happens.
976  */
977 static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
978                                bool partial)
979 {
980         struct cl_env_nest nest;
981         struct lu_env *env;
982         struct cl_io *io;
983         struct osc_object *obj = ext->oe_obj;
984         struct client_obd *cli = osc_cli(obj);
985         struct osc_async_page *oap;
986         struct osc_async_page *tmp;
987         int pages_in_chunk = 0;
988         int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
989         __u64 trunc_chunk = trunc_index >> ppc_bits;
990         int grants = 0;
991         int nr_pages = 0;
992         int rc = 0;
993
994         LASSERT(sanity_check(ext) == 0);
995         EASSERT(ext->oe_state == OES_TRUNC, ext);
996         EASSERT(!ext->oe_urgent, ext);
997
998         /* Request new lu_env.
999          * We can't use that env from osc_cache_truncate_start() because
1000          * it's from lov_io_sub and not fully initialized.
1001          */
1002         env = cl_env_nested_get(&nest);
1003         io  = &osc_env_info(env)->oti_io;
1004         io->ci_obj = cl_object_top(osc2cl(obj));
1005         rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1006         if (rc < 0)
1007                 goto out;
1008
1009         /* discard all pages with index greater than trunc_index */
1010         list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
1011                 pgoff_t index = osc_index(oap2osc(oap));
1012                 struct cl_page *page = oap2cl_page(oap);
1013
1014                 LASSERT(list_empty(&oap->oap_rpc_item));
1015
1016                 /* only discard the pages with their index greater than
1017                  * trunc_index, and ...
1018                  */
1019                 if (index < trunc_index ||
1020                     (index == trunc_index && partial)) {
1021                         /* account how many pages remain in the chunk
1022                          * so that we can calculate grants correctly. */
1023                         if (index >> ppc_bits == trunc_chunk)
1024                                 ++pages_in_chunk;
1025                         continue;
1026                 }
1027
1028                 list_del_init(&oap->oap_pending_item);
1029
1030                 cl_page_get(page);
1031                 lu_ref_add(&page->cp_reference, "truncate", current);
1032
1033                 if (cl_page_own(env, io, page) == 0) {
1034                         cl_page_discard(env, io, page);
1035                         cl_page_disown(env, io, page);
1036                 } else {
1037                         LASSERT(page->cp_state == CPS_FREEING);
1038                         LASSERT(0);
1039                 }
1040
1041                 lu_ref_del(&page->cp_reference, "truncate", current);
1042                 cl_page_put(env, page);
1043
1044                 --ext->oe_nr_pages;
1045                 ++nr_pages;
1046         }
1047         EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
1048                       ext->oe_nr_pages == 0),
1049                 ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
1050
1051         osc_object_lock(obj);
1052         if (ext->oe_nr_pages == 0) {
1053                 LASSERT(pages_in_chunk == 0);
1054                 grants = ext->oe_grants;
1055                 ext->oe_grants = 0;
1056         } else { /* calculate how many grants we can free */
1057                 int chunks = (ext->oe_end >> ppc_bits) - trunc_chunk;
1058                 pgoff_t last_index;
1059
1060                 /* if there are no pages in this chunk, we can also free grants
1061                  * for the last chunk
1062                  */
1063                 if (pages_in_chunk == 0) {
1064                         /* if this is the 1st chunk and there are no pages in it,
1065                          * ext->oe_nr_pages must be zero, so we should have taken
1066                          * the other branch above.
1067                          */
1068                         LASSERT(trunc_chunk > 0);
1069                         --trunc_chunk;
1070                         ++chunks;
1071                 }
1072
1073                 /* this is what we can free from this extent */
1074                 grants = chunks << cli->cl_chunkbits;
1075                 ext->oe_grants -= grants;
1076                 last_index = ((trunc_chunk + 1) << ppc_bits) - 1;
1077                 ext->oe_end = min(last_index, ext->oe_max_end);
1078                 LASSERT(ext->oe_end >= ext->oe_start);
1079                 LASSERT(ext->oe_grants > 0);
1080         }
1081         osc_object_unlock(obj);
1082
1083         if (grants > 0 || nr_pages > 0)
1084                 osc_free_grant(cli, nr_pages, grants);
1085
1086 out:
1087         cl_io_fini(env, io);
1088         cl_env_nested_put(&nest, env);
1089         return rc;
1090 }
1091
1092 /**
1093  * This function prepares the extent for transfer.
1094  * A race with page flushing via ll_writepage() has to be handled cautiously.
1095  */
1096 static int osc_extent_make_ready(const struct lu_env *env,
1097                                  struct osc_extent *ext)
1098 {
1099         struct osc_async_page *oap;
1100         struct osc_async_page *last = NULL;
1101         struct osc_object *obj = ext->oe_obj;
1102         int page_count = 0;
1103         int rc;
1104
1105         /* we're going to grab page lock, so object lock must not be taken. */
1106         LASSERT(sanity_check(ext) == 0);
1107         /* in locking state, any process should not touch this extent. */
1108         EASSERT(ext->oe_state == OES_LOCKING, ext);
1109         EASSERT(ext->oe_owner, ext);
1110
1111         OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n");
1112
1113         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1114                 ++page_count;
1115                 if (!last || last->oap_obj_off < oap->oap_obj_off)
1116                         last = oap;
1117
1118                 /* checking ASYNC_READY is race safe */
1119                 if ((oap->oap_async_flags & ASYNC_READY) != 0)
1120                         continue;
1121
1122                 rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
1123                 switch (rc) {
1124                 case 0:
1125                         spin_lock(&oap->oap_lock);
1126                         oap->oap_async_flags |= ASYNC_READY;
1127                         spin_unlock(&oap->oap_lock);
1128                         break;
1129                 case -EALREADY:
1130                         LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
1131                         break;
1132                 default:
1133                         LASSERTF(0, "unknown return code: %d\n", rc);
1134                 }
1135         }
1136
1137         LASSERT(page_count == ext->oe_nr_pages);
1138         LASSERT(last);
1139         /* the last page is the only one whose count needs to be refreshed
1140          * against the file size.
1141          */
1142         if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
1143                 last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
1144                 LASSERT(last->oap_count > 0);
1145                 LASSERT(last->oap_page_off + last->oap_count <= PAGE_SIZE);
1146                 spin_lock(&last->oap_lock);
1147                 last->oap_async_flags |= ASYNC_COUNT_STABLE;
1148                 spin_unlock(&last->oap_lock);
1149         }
1150
1151         /* for the rest of the pages, we don't need to call osc_refresh_count()
1152          * because they are known not to be the last page
1153          */
1154         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1155                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
1156                         oap->oap_count = PAGE_SIZE - oap->oap_page_off;
1157                         spin_lock(&oap->oap_lock);
1158                         oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1159                         spin_unlock(&oap->oap_lock);
1160                 }
1161         }
1162
1163         osc_object_lock(obj);
1164         osc_extent_state_set(ext, OES_RPC);
1165         osc_object_unlock(obj);
1166         /* get a refcount for RPC. */
1167         osc_extent_get(ext);
1168
1169         return 0;
1170 }
1171
1172 /**
1173  * Quick and simple version of osc_extent_find(). This function is frequently
1174  * called to expand the extent for the same IO. To expand the extent, the
1175  * page index must be in the same chunk as ext->oe_end or in the next one.
1176  */
1177 static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
1178 {
1179         struct osc_object *obj = ext->oe_obj;
1180         struct client_obd *cli = osc_cli(obj);
1181         struct osc_extent *next;
1182         int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
1183         pgoff_t chunk = index >> ppc_bits;
1184         pgoff_t end_chunk;
1185         pgoff_t end_index;
1186         int chunksize = 1 << cli->cl_chunkbits;
1187         int rc = 0;
1188
1189         LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
1190         osc_object_lock(obj);
1191         LASSERT(sanity_check_nolock(ext) == 0);
1192         end_chunk = ext->oe_end >> ppc_bits;
1193         if (chunk > end_chunk + 1) {
1194                 rc = -ERANGE;
1195                 goto out;
1196         }
1197
1198         if (end_chunk >= chunk) {
1199                 rc = 0;
1200                 goto out;
1201         }
1202
1203         LASSERT(end_chunk + 1 == chunk);
1204         /* try to expand this extent to cover @index */
1205         end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);
1206
1207         next = next_extent(ext);
1208         if (next && next->oe_start <= end_index) {
1209                 /* complex mode - overlapped with the next extent,
1210                  * this case will be handled by osc_extent_find()
1211                  */
1212                 rc = -EAGAIN;
1213                 goto out;
1214         }
1215
1216         ext->oe_end = end_index;
1217         ext->oe_grants += chunksize;
1218         *grants -= chunksize;
1219         LASSERT(*grants >= 0);
1220         EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
1221                  "overlapped after expanding for %lu.\n", index);
1222
1223 out:
1224         osc_object_unlock(obj);
1225         return rc;
1226 }
1227
1228 static void osc_extent_tree_dump0(int level, struct osc_object *obj,
1229                                   const char *func, int line)
1230 {
1231         struct osc_extent *ext;
1232         int cnt;
1233
1234         CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
1235                obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
1236
1237         /* osc_object_lock(obj); */
1238         cnt = 1;
1239         for (ext = first_extent(obj); ext; ext = next_extent(ext))
1240                 OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++);
1241
1242         cnt = 1;
1243         list_for_each_entry(ext, &obj->oo_hp_exts, oe_link)
1244                 OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++);
1245
1246         cnt = 1;
1247         list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link)
1248                 OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++);
1249
1250         cnt = 1;
1251         list_for_each_entry(ext, &obj->oo_reading_exts, oe_link)
1252                 OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++);
1253         /* osc_object_unlock(obj); */
1254 }
1255
1256 /* ------------------ osc extent end ------------------ */
1257
1258 static inline int osc_is_ready(struct osc_object *osc)
1259 {
1260         return !list_empty(&osc->oo_ready_item) ||
1261                !list_empty(&osc->oo_hp_ready_item);
1262 }
1263
1264 #define OSC_IO_DEBUG(OSC, STR, args...)                                        \
1265         CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR,     \
1266                (OSC), osc_is_ready(OSC),                                       \
1267                list_empty_marker(&(OSC)->oo_hp_ready_item),                    \
1268                list_empty_marker(&(OSC)->oo_ready_item),                       \
1269                atomic_read(&(OSC)->oo_nr_writes),                              \
1270                list_empty_marker(&(OSC)->oo_hp_exts),                          \
1271                list_empty_marker(&(OSC)->oo_urgent_exts),                      \
1272                atomic_read(&(OSC)->oo_nr_reads),                               \
1273                list_empty_marker(&(OSC)->oo_reading_exts),                     \
1274                ##args)
1275
1276 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
1277                           int cmd)
1278 {
1279         struct osc_page *opg = oap2osc_page(oap);
1280         struct cl_page  *page = oap2cl_page(oap);
1281         int result;
1282
1283         LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
1284
1285         result = cl_page_make_ready(env, page, CRT_WRITE);
1286         if (result == 0)
1287                 opg->ops_submit_time = cfs_time_current();
1288         return result;
1289 }
1290
1291 static int osc_refresh_count(const struct lu_env *env,
1292                              struct osc_async_page *oap, int cmd)
1293 {
1294         struct osc_page *opg = oap2osc_page(oap);
1295         pgoff_t index = osc_index(oap2osc(oap));
1296         struct cl_object *obj;
1297         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1298
1299         int result;
1300         loff_t kms;
1301
1302         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
1303         LASSERT(!(cmd & OBD_BRW_READ));
1304         obj = opg->ops_cl.cpl_obj;
1305
1306         cl_object_attr_lock(obj);
1307         result = cl_object_attr_get(env, obj, attr);
1308         cl_object_attr_unlock(obj);
1309         if (result < 0)
1310                 return result;
1311         kms = attr->cat_kms;
1312         if (cl_offset(obj, index) >= kms)
1313                 /* catch race with truncate */
1314                 return 0;
1315         else if (cl_offset(obj, index + 1) > kms)
1316                 /* catch sub-page write at end of file */
1317                 return kms % PAGE_SIZE;
1318         else
1319                 return PAGE_SIZE;
1320 }
1321
1322 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
1323                           int cmd, int rc)
1324 {
1325         struct osc_page *opg = oap2osc_page(oap);
1326         struct cl_page    *page = oap2cl_page(oap);
1327         struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
1328         enum cl_req_type crt;
1329         int srvlock;
1330
1331         cmd &= ~OBD_BRW_NOQUOTA;
1332         LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
1333                  "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
1334         LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
1335                  "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
1336         LASSERT(opg->ops_transfer_pinned);
1337
1338         /*
1339          * page->cp_req can be NULL if io submission failed before
1340          * cl_req was allocated.
1341          */
1342         if (page->cp_req)
1343                 cl_req_page_done(env, page);
1344         LASSERT(!page->cp_req);
1345
1346         crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
1347         /* Clear opg->ops_transfer_pinned before VM lock is released. */
1348         opg->ops_transfer_pinned = 0;
1349
1350         spin_lock(&obj->oo_seatbelt);
1351         LASSERT(opg->ops_submitter);
1352         LASSERT(!list_empty(&opg->ops_inflight));
1353         list_del_init(&opg->ops_inflight);
1354         opg->ops_submitter = NULL;
1355         spin_unlock(&obj->oo_seatbelt);
1356
1357         opg->ops_submit_time = 0;
1358         srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
1359
1360         /* statistic */
1361         if (rc == 0 && srvlock) {
1362                 struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
1363                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
1364                 int bytes = oap->oap_count;
1365
1366                 if (crt == CRT_READ)
1367                         stats->os_lockless_reads += bytes;
1368                 else
1369                         stats->os_lockless_writes += bytes;
1370         }
1371
1372         /*
1373          * This has to be the last operation with the page, as locks are
1374          * released in cl_page_completion() and nothing except for the
1375          * reference counter protects page from concurrent reclaim.
1376          */
1377         lu_ref_del(&page->cp_reference, "transfer", page);
1378
1379         cl_page_completion(env, page, crt, rc);
1380
1381         return 0;
1382 }
1383
1384 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                           \
1385         struct client_obd *__tmp = (cli);                                     \
1386         CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
1387                "dropped: %ld avail: %ld, reserved: %ld, flight: %d }"         \
1388                "lru {in list: %d, left: %d, waiters: %d }" fmt,               \
1389                __tmp->cl_import->imp_obd->obd_name,                           \
1390                __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages,              \
1391                atomic_read(&obd_dirty_pages), obd_max_dirty_pages,            \
1392                __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
1393                __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
1394                atomic_read(&__tmp->cl_lru_in_list),                           \
1395                atomic_read(&__tmp->cl_lru_busy),                              \
1396                atomic_read(&__tmp->cl_lru_shrinkers), ##args);                \
1397 } while (0)
1398
1399 /* caller must hold loi_list_lock */
1400 static void osc_consume_write_grant(struct client_obd *cli,
1401                                     struct brw_page *pga)
1402 {
1403         assert_spin_locked(&cli->cl_loi_list_lock);
1404         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
1405         atomic_inc(&obd_dirty_pages);
1406         cli->cl_dirty_pages++;
1407         pga->flag |= OBD_BRW_FROM_GRANT;
1408         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
1409                PAGE_SIZE, pga, pga->pg);
1410         osc_update_next_shrink(cli);
1411 }
1412
1413 /* the companion to osc_consume_write_grant, called when a brw has completed.
1414  * must be called with the loi lock held.
1415  */
1416 static void osc_release_write_grant(struct client_obd *cli,
1417                                     struct brw_page *pga)
1418 {
1419         assert_spin_locked(&cli->cl_loi_list_lock);
1420         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
1421                 return;
1422         }
1423
1424         pga->flag &= ~OBD_BRW_FROM_GRANT;
1425         atomic_dec(&obd_dirty_pages);
1426         cli->cl_dirty_pages--;
1427         if (pga->flag & OBD_BRW_NOCACHE) {
1428                 pga->flag &= ~OBD_BRW_NOCACHE;
1429                 atomic_dec(&obd_dirty_transit_pages);
1430                 cli->cl_dirty_transit--;
1431         }
1432 }
1433
1434 /**
1435  * To avoid sleeping with the object lock held, it is better to allocate
1436  * enough grant before entering the critical section.
1437  *
1438  * Caller must hold cl_loi_list_lock.
1439  */
1440 static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes)
1441 {
1442         int rc = -EDQUOT;
1443
1444         if (cli->cl_avail_grant >= bytes) {
1445                 cli->cl_avail_grant -= bytes;
1446                 cli->cl_reserved_grant += bytes;
1447                 rc = 0;
1448         }
1449         return rc;
1450 }
1451
1452 static void __osc_unreserve_grant(struct client_obd *cli,
1453                                   unsigned int reserved, unsigned int unused)
1454 {
1455         /* It's quite normal for us to get back more grant than we reserved.
1456          * Consider the case where two extents are merged by adding a new
1457          * chunk: we save one extent tax. If the extent tax is greater than
1458          * one chunk, adding the chunk saves even more grant.
1459          */
1460         cli->cl_reserved_grant -= reserved;
1461         if (unused > reserved) {
1462                 cli->cl_avail_grant += reserved;
1463                 cli->cl_lost_grant  += unused - reserved;
1464         } else {
1465                 cli->cl_avail_grant += unused;
1466         }
1467 }
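/*
 * Illustrative numbers (assuming 64KiB chunks and a 4KiB extent tax): a page
 * reserves 68KiB of grant before taking the object lock.  If adding the page
 * ends up merging two extents, 72KiB comes back unused (the reserved 68KiB
 * plus the 4KiB tax saved on the neighbouring extent), so 68KiB goes back to
 * cl_avail_grant and the extra 4KiB is accounted in cl_lost_grant to be
 * reported back to the OST.
 */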
1468
1469 static void osc_unreserve_grant(struct client_obd *cli,
1470                                 unsigned int reserved, unsigned int unused)
1471 {
1472         spin_lock(&cli->cl_loi_list_lock);
1473         __osc_unreserve_grant(cli, reserved, unused);
1474         if (unused > 0)
1475                 osc_wake_cache_waiters(cli);
1476         spin_unlock(&cli->cl_loi_list_lock);
1477 }
1478
1479 /**
1480  * Free grant after IO is finished or canceled.
1481  *
1482  * @lost_grant is used to remember how many grants we have allocated but not
1483  * used; we should return these grants to the OST. There are two cases where
1484  * grants can be lost:
1485  * 1. truncate;
1486  * 2. the blocksize at the OST is less than PAGE_SIZE and a partial page was
1487  *    written. In this case the OST may use fewer chunks to serve the partial
1488  *    write. OSTs don't actually know the page size on the client side, so
1489  *    clients have to calculate the lost grant from the blocksize on the OST.
1490  *    See filter_grant_check() for details.
1491  */
1492 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
1493                            unsigned int lost_grant)
1494 {
1495         int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
1496
1497         spin_lock(&cli->cl_loi_list_lock);
1498         atomic_sub(nr_pages, &obd_dirty_pages);
1499         cli->cl_dirty_pages -= nr_pages;
1500         cli->cl_lost_grant += lost_grant;
1501         if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
1502                 /* borrow some grant from truncate to avoid the case that
1503                  * truncate uses up all avail grant
1504                  */
1505                 cli->cl_lost_grant -= grant;
1506                 cli->cl_avail_grant += grant;
1507         }
1508         osc_wake_cache_waiters(cli);
1509         spin_unlock(&cli->cl_loi_list_lock);
1510         CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1511                lost_grant, cli->cl_lost_grant,
1512                cli->cl_avail_grant, cli->cl_dirty_pages << PAGE_SHIFT);
1513 }
1514
1515 /**
1516  * The companion to osc_enter_cache(), called when @oap is no longer part of
1517  * the dirty accounting due to error.
1518  */
1519 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
1520 {
1521         spin_lock(&cli->cl_loi_list_lock);
1522         osc_release_write_grant(cli, &oap->oap_brw_page);
1523         spin_unlock(&cli->cl_loi_list_lock);
1524 }
1525
1526 /**
1527  * Non-blocking version of osc_enter_cache() that consumes grant only when it
1528  * is available.
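 *
 * Returns 1 if the grant was consumed and the page accounted as dirty;
 * 0 if there is not enough grant or the dirty limits would be exceeded.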
1529  */
1530 static int osc_enter_cache_try(struct client_obd *cli,
1531                                struct osc_async_page *oap,
1532                                int bytes, int transient)
1533 {
1534         int rc;
1535
1536         OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
1537
1538         rc = osc_reserve_grant(cli, bytes);
1539         if (rc < 0)
1540                 return 0;
1541
1542         if (cli->cl_dirty_pages <= cli->cl_dirty_max_pages &&
1543             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
1544                 osc_consume_write_grant(cli, &oap->oap_brw_page);
1545                 if (transient) {
1546                         cli->cl_dirty_transit++;
1547                         atomic_inc(&obd_dirty_transit_pages);
1548                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
1549                 }
1550                 rc = 1;
1551         } else {
1552                 __osc_unreserve_grant(cli, bytes, bytes);
1553                 rc = 0;
1554         }
1555         return rc;
1556 }
1557
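/*
 * A waiter is "granted" once osc_wake_cache_waiters() has taken it off
 * cl_cache_waiters with list_del_init(), so an empty ocw_entry is the wakeup
 * condition that osc_enter_cache() polls for via l_wait_event().
 */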
1558 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1559 {
1560         int rc;
1561
1562         spin_lock(&cli->cl_loi_list_lock);
1563         rc = list_empty(&ocw->ocw_entry);
1564         spin_unlock(&cli->cl_loi_list_lock);
1565         return rc;
1566 }
1567
1568 /**
1569  * The main entry to reserve dirty page accounting. Usually the grant reserved
1570  * in this function will be freed in bulk by osc_free_grant(); if adding the
1571  * page to the osc cache fails, it is freed by osc_exit_cache() instead.
1572  *
1573  * The process is put to sleep if it has already run out of grant.
1574  */
1575 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
1576                            struct osc_async_page *oap, int bytes)
1577 {
1578         struct osc_object *osc = oap->oap_obj;
1579         struct lov_oinfo *loi = osc->oo_oinfo;
1580         struct osc_cache_waiter ocw;
1581         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
1582                                                   LWI_ON_SIGNAL_NOOP, NULL);
1583         int rc = -EDQUOT;
1584
1585         OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
1586
1587         spin_lock(&cli->cl_loi_list_lock);
1588
1589         /* force the caller to try sync io.  this can jump the list
1590          * of queued writes and create a discontiguous rpc stream
1591          */
1592         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
1593             !cli->cl_dirty_max_pages || cli->cl_ar.ar_force_sync ||
1594             loi->loi_ar.ar_force_sync) {
1595                 rc = -EDQUOT;
1596                 goto out;
1597         }
1598
1599         /* Hopefully normal case - cache space and write credits available */
1600         if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1601                 rc = 0;
1602                 goto out;
1603         }
1604
1605         /* We can get here for two reasons: too many dirty pages in the cache,
1606          * or we have run out of grant. In both cases we should write dirty
1607          * pages out. Adding a cache waiter will trigger urgent write-out no
1608          * matter what the RPC size will be.
1609          * The exit condition is no available grant and no dirty pages in the
1610          * cache; that really means there is no space on the OST.
1611          */
1612         init_waitqueue_head(&ocw.ocw_waitq);
1613         ocw.ocw_oap   = oap;
1614         ocw.ocw_grant = bytes;
1615         while (cli->cl_dirty_pages > 0 || cli->cl_w_in_flight > 0) {
1616                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1617                 ocw.ocw_rc = 0;
1618                 spin_unlock(&cli->cl_loi_list_lock);
1619
1620                 osc_io_unplug_async(env, cli, NULL);
1621
1622                 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
1623                        cli->cl_import->imp_obd->obd_name, &ocw, oap);
1624
1625                 rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1626
1627                 spin_lock(&cli->cl_loi_list_lock);
1628
1629                 /* l_wait_event was interrupted by a signal or timed out */
1630                 if (rc < 0) {
1631                         if (rc == -ETIMEDOUT) {
1632                                 OSC_DUMP_GRANT(D_ERROR, cli,
1633                                                "try to reserve %d.\n", bytes);
1634                                 osc_extent_tree_dump(D_ERROR, osc);
1635                                 rc = -EDQUOT;
1636                         }
1637
1638                         list_del_init(&ocw.ocw_entry);
1639                         goto out;
1640                 }
1641
1642                 LASSERT(list_empty(&ocw.ocw_entry));
1643                 rc = ocw.ocw_rc;
1644
1645                 if (rc != -EDQUOT)
1646                         goto out;
1647                 if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1648                         rc = 0;
1649                         goto out;
1650                 }
1651         }
1652 out:
1653         spin_unlock(&cli->cl_loi_list_lock);
1654         OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
1655         return rc;
1656 }
1657
1658 /* caller must hold loi_list_lock */
1659 void osc_wake_cache_waiters(struct client_obd *cli)
1660 {
1661         struct list_head *l, *tmp;
1662         struct osc_cache_waiter *ocw;
1663
1664         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
1665                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
1666                 list_del_init(&ocw->ocw_entry);
1667
1668                 ocw->ocw_rc = -EDQUOT;
1669                 /* we can't dirty more */
1670                 if ((cli->cl_dirty_pages > cli->cl_dirty_max_pages) ||
1671                     (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
1672                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
1673                                cli->cl_dirty_pages, cli->cl_dirty_max_pages,
1674                                obd_max_dirty_pages);
1675                         goto wakeup;
1676                 }
1677
1678                 ocw->ocw_rc = 0;
1679                 if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
1680                         ocw->ocw_rc = -EDQUOT;
1681
1682 wakeup:
1683                 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
1684                        ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
1685
1686                 wake_up(&ocw->ocw_waitq);
1687         }
1688 }
1689
1690 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
1691 {
1692         int hprpc = !!list_empty(&osc->oo_hp_exts);
1693
1694         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
1695 }
1696
1697 /* This maintains the lists of pending pages to read/write for a given object
1698  * (lop).  This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
1699  * to quickly find objects that are ready to send an RPC.
1700  */
1701 static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
1702                          int cmd)
1703 {
1704         int invalid_import = 0;
1705
1706         /* if we have an invalid import we want to drain the queued pages
1707          * by forcing them through rpcs that immediately fail and complete
1708          * the pages.  recovery relies on this to empty the queued pages
1709          * before canceling the locks and evicting down the llite pages
1710          */
1711         if (!cli->cl_import || cli->cl_import->imp_invalid)
1712                 invalid_import = 1;
1713
1714         if (cmd & OBD_BRW_WRITE) {
1715                 if (atomic_read(&osc->oo_nr_writes) == 0)
1716                         return 0;
1717                 if (invalid_import) {
1718                         CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1719                         return 1;
1720                 }
1721                 if (!list_empty(&osc->oo_hp_exts)) {
1722                         CDEBUG(D_CACHE, "high prio request forcing RPC\n");
1723                         return 1;
1724                 }
1725                 if (!list_empty(&osc->oo_urgent_exts)) {
1726                         CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1727                         return 1;
1728                 }
1729                 /* trigger a write rpc stream as long as there are dirtiers
1730                  * waiting for space.  As they're waiting, they're not going to
1731                  * create more pages to coalesce with what's already waiting.
1732                  */
1733                 if (!list_empty(&cli->cl_cache_waiters)) {
1734                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1735                         return 1;
1736                 }
1737                 if (atomic_read(&osc->oo_nr_writes) >=
1738                     cli->cl_max_pages_per_rpc)
1739                         return 1;
1740         } else {
1741                 if (atomic_read(&osc->oo_nr_reads) == 0)
1742                         return 0;
1743                 if (invalid_import) {
1744                         CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1745                         return 1;
1746                 }
1747                 /* all reads are urgent. */
1748                 if (!list_empty(&osc->oo_reading_exts))
1749                         return 1;
1750         }
1751
1752         return 0;
1753 }
1754
1755 static void osc_update_pending(struct osc_object *obj, int cmd, int delta)
1756 {
1757         struct client_obd *cli = osc_cli(obj);
1758
1759         if (cmd & OBD_BRW_WRITE) {
1760                 atomic_add(delta, &obj->oo_nr_writes);
1761                 atomic_add(delta, &cli->cl_pending_w_pages);
1762                 LASSERT(atomic_read(&obj->oo_nr_writes) >= 0);
1763         } else {
1764                 atomic_add(delta, &obj->oo_nr_reads);
1765                 atomic_add(delta, &cli->cl_pending_r_pages);
1766                 LASSERT(atomic_read(&obj->oo_nr_reads) >= 0);
1767         }
1768         OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
1769 }
1770
1771 static int osc_makes_hprpc(struct osc_object *obj)
1772 {
1773         return !list_empty(&obj->oo_hp_exts);
1774 }
1775
1776 static void on_list(struct list_head *item, struct list_head *list, int should_be_on)
1777 {
1778         if (list_empty(item) && should_be_on)
1779                 list_add_tail(item, list);
1780         else if (!list_empty(item) && !should_be_on)
1781                 list_del_init(item);
1782 }
1783
1784 /* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
1785  * can find pages to build into rpcs quickly
1786  */
1787 static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1788 {
1789         if (osc_makes_hprpc(osc)) {
1790                 /* HP rpc */
1791                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
1792                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1793         } else {
1794                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1795                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
1796                         osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
1797                         osc_makes_rpc(cli, osc, OBD_BRW_READ));
1798         }
1799
1800         on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
1801                 atomic_read(&osc->oo_nr_writes) > 0);
1802
1803         on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
1804                 atomic_read(&osc->oo_nr_reads) > 0);
1805
1806         return osc_is_ready(osc);
1807 }
1808
1809 static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1810 {
1811         int is_ready;
1812
1813         spin_lock(&cli->cl_loi_list_lock);
1814         is_ready = __osc_list_maint(cli, osc);
1815         spin_unlock(&cli->cl_loi_list_lock);
1816
1817         return is_ready;
1818 }
1819
1820 /* this is trying to propagate async writeback errors back up to the
1821  * application.  When an async write fails we record the error code for later if
1822  * the app does an fsync.  As long as errors persist we force future rpcs to be
1823  * sync so that the app can get a sync error and break the cycle of queueing
1824  * pages for which writeback will fail.
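 *
 * For example: when an async write with xid N fails, ar_force_sync is set and
 * ar_min_xid records the next xid to be issued; the first write that later
 * completes successfully with xid >= ar_min_xid clears ar_force_sync again.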
1825  */
1826 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1827                            int rc)
1828 {
1829         if (rc) {
1830                 if (!ar->ar_rc)
1831                         ar->ar_rc = rc;
1832
1833                 ar->ar_force_sync = 1;
1834                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1835                 return;
1836         }
1837
1838         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1839                 ar->ar_force_sync = 0;
1840 }
1841
1842 /* this must be called holding the loi list lock to give coverage to exit_cache,
1843  * async_flag maintenance, and oap_request
1844  */
1845 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
1846                               struct osc_async_page *oap, int sent, int rc)
1847 {
1848         struct osc_object *osc = oap->oap_obj;
1849         struct lov_oinfo *loi = osc->oo_oinfo;
1850         __u64 xid = 0;
1851
1852         if (oap->oap_request) {
1853                 xid = ptlrpc_req_xid(oap->oap_request);
1854                 ptlrpc_req_finished(oap->oap_request);
1855                 oap->oap_request = NULL;
1856         }
1857
1858         /* As the transfer for this page is being done, clear the flags */
1859         spin_lock(&oap->oap_lock);
1860         oap->oap_async_flags = 0;
1861         spin_unlock(&oap->oap_lock);
1862         oap->oap_interrupted = 0;
1863
1864         if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
1865                 spin_lock(&cli->cl_loi_list_lock);
1866                 osc_process_ar(&cli->cl_ar, xid, rc);
1867                 osc_process_ar(&loi->loi_ar, xid, rc);
1868                 spin_unlock(&cli->cl_loi_list_lock);
1869         }
1870
1871         rc = osc_completion(env, oap, oap->oap_cmd, rc);
1872         if (rc)
1873                 CERROR("completion on oap %p obj %p returns %d.\n",
1874                        oap, osc, rc);
1875 }
1876
1877 /**
1878  * Try to add an extent to one RPC. We need to take the following into account:
1879  * - # of pages must not be over max_pages_per_rpc
1880  * - extent must be compatible with previous ones
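 *
 * \return 1 if the extent was moved onto @rpclist and its pages counted in *pc
 * \return 0 if the extent was not added (it would exceed *max_pages or is
 *           incompatible with extents already on the list)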
1881  */
1882 static int try_to_add_extent_for_io(struct client_obd *cli,
1883                                     struct osc_extent *ext, struct list_head *rpclist,
1884                                     int *pc, unsigned int *max_pages)
1885 {
1886         struct osc_extent *tmp;
1887         struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
1888                                                       struct osc_async_page,
1889                                                       oap_pending_item);
1890
1891         EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
1892                 ext);
1893
1894         *max_pages = max(ext->oe_mppr, *max_pages);
1895         if (*pc + ext->oe_nr_pages > *max_pages)
1896                 return 0;
1897
1898         list_for_each_entry(tmp, rpclist, oe_link) {
1899                 struct osc_async_page *oap2;
1900
1901                 oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
1902                                         oap_pending_item);
1903                 EASSERT(tmp->oe_owner == current, tmp);
1904                 if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
1905                         CDEBUG(D_CACHE,
1906                                "Do not permit different types of IO in one RPC\n");
1907                         return 0;
1908                 }
1909
1910                 if (tmp->oe_srvlock != ext->oe_srvlock ||
1911                     !tmp->oe_grants != !ext->oe_grants)
1912                         return 0;
1913
1914                 /* remove break for strict check */
1915                 break;
1916         }
1917
1918         *pc += ext->oe_nr_pages;
1919         list_move_tail(&ext->oe_link, rpclist);
1920         ext->oe_owner = current;
1921         return 1;
1922 }
1923
1924 /**
1925  * In order to prevent multiple ptlrpcd threads from breaking contiguous
1926  * extents, get_write_extents() takes all appropriate extents atomically.
1927  *
1928  * The following policy is used to collect extents for IO:
1929  * 1. Add as many HP extents as possible;
1930  * 2. Add the first urgent extent in urgent extent list and take it out of
1931  *    urgent list;
1932  * 3. Add subsequent extents of this urgent extent;
1933  * 4. If urgent list is not empty, goto 2;
1934  * 5. Traverse the extent tree from the 1st extent;
1935  * 6. Above steps exit if there is no space in this RPC.
1936  */
1937 static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
1938 {
1939         struct client_obd *cli = osc_cli(obj);
1940         struct osc_extent *ext;
1941         struct osc_extent *temp;
1942         int page_count = 0;
1943         unsigned int max_pages = cli->cl_max_pages_per_rpc;
1944
1945         LASSERT(osc_object_is_locked(obj));
1946         list_for_each_entry_safe(ext, temp, &obj->oo_hp_exts, oe_link) {
1947                 LASSERT(ext->oe_state == OES_CACHE);
1948                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1949                                               &max_pages))
1950                         return page_count;
1951                 EASSERT(ext->oe_nr_pages <= max_pages, ext);
1952         }
1953         if (page_count == max_pages)
1954                 return page_count;
1955
1956         while (!list_empty(&obj->oo_urgent_exts)) {
1957                 ext = list_entry(obj->oo_urgent_exts.next,
1958                                  struct osc_extent, oe_link);
1959                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1960                                               &max_pages))
1961                         return page_count;
1962
1963                 if (!ext->oe_intree)
1964                         continue;
1965
1966                 while ((ext = next_extent(ext)) != NULL) {
1967                         if ((ext->oe_state != OES_CACHE) ||
1968                             (!list_empty(&ext->oe_link) &&
1969                              ext->oe_owner))
1970                                 continue;
1971
1972                         if (!try_to_add_extent_for_io(cli, ext, rpclist,
1973                                                       &page_count, &max_pages))
1974                                 return page_count;
1975                 }
1976         }
1977         if (page_count == max_pages)
1978                 return page_count;
1979
1980         ext = first_extent(obj);
1981         while (ext) {
1982                 if ((ext->oe_state != OES_CACHE) ||
1983                     /* this extent may be already in current rpclist */
1984                     (!list_empty(&ext->oe_link) && ext->oe_owner)) {
1985                         ext = next_extent(ext);
1986                         continue;
1987                 }
1988
1989                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
1990                                               &max_pages))
1991                         return page_count;
1992
1993                 ext = next_extent(ext);
1994         }
1995         return page_count;
1996 }
1997
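/*
 * Gather cached write extents of @osc into one list and send them as a single
 * write RPC.  Called, and returns, with the object lock held; the lock is
 * dropped while pages are made ready and the RPC is built.
 */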
1998 static int
1999 osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
2000                    struct osc_object *osc)
2001         __must_hold(osc)
2002 {
2003         LIST_HEAD(rpclist);
2004         struct osc_extent *ext;
2005         struct osc_extent *tmp;
2006         struct osc_extent *first = NULL;
2007         u32 page_count = 0;
2008         int srvlock = 0;
2009         int rc = 0;
2010
2011         LASSERT(osc_object_is_locked(osc));
2012
2013         page_count = get_write_extents(osc, &rpclist);
2014         LASSERT(equi(page_count == 0, list_empty(&rpclist)));
2015
2016         if (list_empty(&rpclist))
2017                 return 0;
2018
2019         osc_update_pending(osc, OBD_BRW_WRITE, -page_count);
2020
2021         list_for_each_entry(ext, &rpclist, oe_link) {
2022                 LASSERT(ext->oe_state == OES_CACHE ||
2023                         ext->oe_state == OES_LOCK_DONE);
2024                 if (ext->oe_state == OES_CACHE)
2025                         osc_extent_state_set(ext, OES_LOCKING);
2026                 else
2027                         osc_extent_state_set(ext, OES_RPC);
2028         }
2029
2030         /* we're going to grab page lock, so release object lock because
2031          * lock order is page lock -> object lock.
2032          */
2033         osc_object_unlock(osc);
2034
2035         list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) {
2036                 if (ext->oe_state == OES_LOCKING) {
2037                         rc = osc_extent_make_ready(env, ext);
2038                         if (unlikely(rc < 0)) {
2039                                 list_del_init(&ext->oe_link);
2040                                 osc_extent_finish(env, ext, 0, rc);
2041                                 continue;
2042                         }
2043                 }
2044                 if (!first) {
2045                         first = ext;
2046                         srvlock = ext->oe_srvlock;
2047                 } else {
2048                         LASSERT(srvlock == ext->oe_srvlock);
2049                 }
2050         }
2051
2052         if (!list_empty(&rpclist)) {
2053                 LASSERT(page_count > 0);
2054                 rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
2055                 LASSERT(list_empty(&rpclist));
2056         }
2057
2058         osc_object_lock(osc);
2059         return rc;
2060 }
2061
2062 /**
2063  * Prepare pages for ASYNC IO and put them in the send queue.
2064  *
2065  * \param cli the client obd through which the read RPC is sent
2066  * \param osc the object whose reading extents are to be sent
2067  *
2068  * \return zero if no page added to send queue.
2069  * \return 1 if pages successfully added to send queue.
2070  * \return negative on errors.
2071  */
2072 static int
2073 osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
2074                   struct osc_object *osc)
2075         __must_hold(osc)
2076 {
2077         struct osc_extent *ext;
2078         struct osc_extent *next;
2079         LIST_HEAD(rpclist);
2080         int page_count = 0;
2081         unsigned int max_pages = cli->cl_max_pages_per_rpc;
2082         int rc = 0;
2083
2084         LASSERT(osc_object_is_locked(osc));
2085         list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
2086                 EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
2087                 if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
2088                                               &max_pages))
2089                         break;
2090                 osc_extent_state_set(ext, OES_RPC);
2091                 EASSERT(ext->oe_nr_pages <= max_pages, ext);
2092         }
2093         LASSERT(page_count <= max_pages);
2094
2095         osc_update_pending(osc, OBD_BRW_READ, -page_count);
2096
2097         if (!list_empty(&rpclist)) {
2098                 osc_object_unlock(osc);
2099
2100                 LASSERT(page_count > 0);
2101                 rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
2102                 LASSERT(list_empty(&rpclist));
2103
2104                 osc_object_lock(osc);
2105         }
2106         return rc;
2107 }
2108
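/*
 * Detach and return the first osc_object on @list; list_del_init() means the
 * caller is responsible for putting the object back on a list if needed.
 */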
2109 #define list_to_obj(list, item) ({                                            \
2110         struct list_head *__tmp = (list)->next;                               \
2111         list_del_init(__tmp);                                                 \
2112         list_entry(__tmp, struct osc_object, oo_##item);                      \
2113 })
2114
2115 /* This is called by osc_check_rpcs() to find which objects have pages that
2116  * we could be sending.  These lists are maintained by osc_makes_rpc().
2117  */
2118 static struct osc_object *osc_next_obj(struct client_obd *cli)
2119 {
2120         /* First return objects that have blocked locks so that they
2121          * will be flushed quickly and other clients can get the lock,
2122          * then objects which have pages ready to be stuffed into RPCs
2123          */
2124         if (!list_empty(&cli->cl_loi_hp_ready_list))
2125                 return list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item);
2126         if (!list_empty(&cli->cl_loi_ready_list))
2127                 return list_to_obj(&cli->cl_loi_ready_list, ready_item);
2128
2129         /* then if we have cache waiters, return all objects with queued
2130          * writes.  This is especially important when many small files
2131          * have filled up the cache and not been fired into rpcs because
2132          * they don't pass the nr_pending/object threshold
2133          */
2134         if (!list_empty(&cli->cl_cache_waiters) &&
2135             !list_empty(&cli->cl_loi_write_list))
2136                 return list_to_obj(&cli->cl_loi_write_list, write_item);
2137
2138         /* then return all queued objects when we have an invalid import
2139          * so that they get flushed
2140          */
2141         if (!cli->cl_import || cli->cl_import->imp_invalid) {
2142                 if (!list_empty(&cli->cl_loi_write_list))
2143                         return list_to_obj(&cli->cl_loi_write_list, write_item);
2144                 if (!list_empty(&cli->cl_loi_read_list))
2145                         return list_to_obj(&cli->cl_loi_read_list, read_item);
2146         }
2147         return NULL;
2148 }
2149
2150 /* called with the loi list lock held */
2151 static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2152         __must_hold(&cli->cl_loi_list_lock)
2153 {
2154         struct osc_object *osc;
2155         int rc = 0;
2156
2157         while ((osc = osc_next_obj(cli)) != NULL) {
2158                 struct cl_object *obj = osc2cl(osc);
2159                 struct lu_ref_link link;
2160
2161                 OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
2162
2163                 if (osc_max_rpc_in_flight(cli, osc)) {
2164                         __osc_list_maint(cli, osc);
2165                         break;
2166                 }
2167
2168                 cl_object_get(obj);
2169                 spin_unlock(&cli->cl_loi_list_lock);
2170                 lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
2171
2172                 /* attempt some read/write balancing by alternating between
2173                  * reads and writes in an object.  The makes_rpc checks here
2174                  * would be redundant if we were getting read/write work items
2175                  * instead of objects.  We don't want send_oap_rpc to drain a
2176                  * partial read pending queue when we're given this object to
2177                  * do IO on writes while there are cache waiters.
2178                  */
2179                 osc_object_lock(osc);
2180                 if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
2181                         rc = osc_send_write_rpc(env, cli, osc);
2182                         if (rc < 0) {
2183                                 CERROR("Write request failed with %d\n", rc);
2184
2185                                 /* osc_send_write_rpc failed, mostly because of
2186                                  * memory pressure.
2187                                  *
2188                                  * We can't break out here, because if:
2189                                  *  - a page was submitted by osc_io_submit,
2190                                  *    so the page is locked;
2191                                  *  - no request is in flight;
2192                                  *  - no subsequent request is queued;
2193                                  * then the system ends up in a live-lock,
2194                                  * because there is no further chance to call
2195                                  * osc_io_unplug() or osc_check_rpcs().
2196                                  * pdflush can't help either, because it may
2197                                  * be blocked grabbing the page lock as
2198                                  * mentioned above.
2199                                  *
2200                                  * Anyway, continue to drain pages.
2201                                  */
2202                                 /* break; */
2203                         }
2204                 }
2205                 if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
2206                         rc = osc_send_read_rpc(env, cli, osc);
2207                         if (rc < 0)
2208                                 CERROR("Read request failed with %d\n", rc);
2209                 }
2210                 osc_object_unlock(osc);
2211
2212                 osc_list_maint(cli, osc);
2213                 lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
2214                 cl_object_put(env, obj);
2215
2216                 spin_lock(&cli->cl_loi_list_lock);
2217         }
2218 }
2219
2220 static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
2221                           struct osc_object *osc, int async)
2222 {
2223         int rc = 0;
2224
2225         if (osc && osc_list_maint(cli, osc) == 0)
2226                 return 0;
2227
2228         if (!async) {
2229                 /* disable osc_lru_shrink() temporarily to avoid
2230                  * potential stack overrun problem. LU-2859
2231                  */
2232                 atomic_inc(&cli->cl_lru_shrinkers);
2233                 spin_lock(&cli->cl_loi_list_lock);
2234                 osc_check_rpcs(env, cli);
2235                 spin_unlock(&cli->cl_loi_list_lock);
2236                 atomic_dec(&cli->cl_lru_shrinkers);
2237         } else {
2238                 CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
2239                 LASSERT(cli->cl_writeback_work);
2240                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
2241         }
2242         return rc;
2243 }
2244
2245 static int osc_io_unplug_async(const struct lu_env *env,
2246                                struct client_obd *cli, struct osc_object *osc)
2247 {
2248         return osc_io_unplug0(env, cli, osc, 1);
2249 }
2250
2251 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
2252                    struct osc_object *osc)
2253 {
2254         (void)osc_io_unplug0(env, cli, osc, 0);
2255 }
2256
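/*
 * Initialize the osc_async_page embedded in @ops for @page at byte @offset
 * within the object.  Calling with a NULL @page only reports the (rounded)
 * size of the structure, see the early return below.
 */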
2257 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
2258                         struct page *page, loff_t offset)
2259 {
2260         struct obd_export *exp = osc_export(osc);
2261         struct osc_async_page *oap = &ops->ops_oap;
2262
2263         if (!page)
2264                 return cfs_size_round(sizeof(*oap));
2265
2266         oap->oap_magic = OAP_MAGIC;
2267         oap->oap_cli = &exp->exp_obd->u.cli;
2268         oap->oap_obj = osc;
2269
2270         oap->oap_page = page;
2271         oap->oap_obj_off = offset;
2272         LASSERT(!(offset & ~PAGE_MASK));
2273
2274         if (capable(CFS_CAP_SYS_RESOURCE))
2275                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2276
2277         INIT_LIST_HEAD(&oap->oap_pending_item);
2278         INIT_LIST_HEAD(&oap->oap_rpc_item);
2279
2280         spin_lock_init(&oap->oap_lock);
2281         CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
2282                oap, page, oap->oap_obj_off);
2283         return 0;
2284 }
2285
2286 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
2287                        struct osc_page *ops)
2288 {
2289         struct osc_io *oio = osc_env_io(env);
2290         struct osc_extent *ext = NULL;
2291         struct osc_async_page *oap = &ops->ops_oap;
2292         struct client_obd *cli = oap->oap_cli;
2293         struct osc_object *osc = oap->oap_obj;
2294         pgoff_t index;
2295         int grants = 0;
2296         int brw_flags = OBD_BRW_ASYNC;
2297         int cmd = OBD_BRW_WRITE;
2298         int need_release = 0;
2299         int rc = 0;
2300
2301         if (oap->oap_magic != OAP_MAGIC)
2302                 return -EINVAL;
2303
2304         if (!cli->cl_import || cli->cl_import->imp_invalid)
2305                 return -EIO;
2306
2307         if (!list_empty(&oap->oap_pending_item) ||
2308             !list_empty(&oap->oap_rpc_item))
2309                 return -EBUSY;
2310
2311         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
2312         brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
2313         if (capable(CFS_CAP_SYS_RESOURCE)) {
2314                 brw_flags |= OBD_BRW_NOQUOTA;
2315                 cmd |= OBD_BRW_NOQUOTA;
2316         }
2317
2318         /* check if the file's owner/group is over quota */
2319         if (!(cmd & OBD_BRW_NOQUOTA)) {
2320                 struct cl_object *obj;
2321                 struct cl_attr *attr;
2322                 unsigned int qid[MAXQUOTAS];
2323
2324                 obj = cl_object_top(&osc->oo_cl);
2325                 attr = &osc_env_info(env)->oti_attr;
2326
2327                 cl_object_attr_lock(obj);
2328                 rc = cl_object_attr_get(env, obj, attr);
2329                 cl_object_attr_unlock(obj);
2330
2331                 qid[USRQUOTA] = attr->cat_uid;
2332                 qid[GRPQUOTA] = attr->cat_gid;
2333                 if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
2334                         rc = -EDQUOT;
2335                 if (rc)
2336                         return rc;
2337         }
2338
2339         oap->oap_cmd = cmd;
2340         oap->oap_page_off = ops->ops_from;
2341         oap->oap_count = ops->ops_to - ops->ops_from;
2342         /*
2343          * No need to hold a lock here,
2344          * since this page is not in any list yet.
2345          */
2346         oap->oap_async_flags = 0;
2347         oap->oap_brw_flags = brw_flags;
2348
2349         OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
2350                      oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
2351
2352         index = osc_index(oap2osc(oap));
2353
2354         /* Add this page to an extent by the following steps:
2355          * 1. if there is an active extent for this IO, the page can usually
2356          *    be added to it, though sometimes we need to expand the extent
2357          *    to accommodate the page;
2358          * 2. otherwise, a new extent is allocated.
2359          */
2360
2361         ext = oio->oi_active;
2362         if (ext && ext->oe_start <= index && ext->oe_max_end >= index) {
2363                 /* one chunk plus extent overhead must be enough to write this
2364                  * page
2365                  */
2366                 grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2367                 if (ext->oe_end >= index)
2368                         grants = 0;
2369
2370                 /* it doesn't need any grant to dirty this page */
2371                 spin_lock(&cli->cl_loi_list_lock);
2372                 rc = osc_enter_cache_try(cli, oap, grants, 0);
2373                 spin_unlock(&cli->cl_loi_list_lock);
2374                 if (rc == 0) { /* try failed */
2375                         grants = 0;
2376                         need_release = 1;
2377                 } else if (ext->oe_end < index) {
2378                         int tmp = grants;
2379                         /* try to expand this extent */
2380                         rc = osc_extent_expand(ext, index, &tmp);
2381                         if (rc < 0) {
2382                                 need_release = 1;
2383                                 /* don't free reserved grant */
2384                         } else {
2385                                 OSC_EXTENT_DUMP(D_CACHE, ext,
2386                                                 "expanded for %lu.\n", index);
2387                                 osc_unreserve_grant(cli, grants, tmp);
2388                                 grants = 0;
2389                         }
2390                 }
2391                 rc = 0;
2392         } else if (ext) {
2393                 /* index is located outside of active extent */
2394                 need_release = 1;
2395         }
2396         if (need_release) {
2397                 osc_extent_release(env, ext);
2398                 oio->oi_active = NULL;
2399                 ext = NULL;
2400         }
2401
2402         if (!ext) {
2403                 int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2404
2405                 /* try to find new extent to cover this page */
2406                 LASSERT(!oio->oi_active);
2407                 /* we may have allocated grant for this page if we failed
2408                  * to expand the previous active extent.
2409                  */
2410                 LASSERT(ergo(grants > 0, grants >= tmp));
2411
2412                 rc = 0;
2413                 if (grants == 0) {
2414                         /* we haven't allocated grant for this page. */
2415                         rc = osc_enter_cache(env, cli, oap, tmp);
2416                         if (rc == 0)
2417                                 grants = tmp;
2418                 }
2419
2420                 tmp = grants;
2421                 if (rc == 0) {
2422                         ext = osc_extent_find(env, osc, index, &tmp);
2423                         if (IS_ERR(ext)) {
2424                                 LASSERT(tmp == grants);
2425                                 osc_exit_cache(cli, oap);
2426                                 rc = PTR_ERR(ext);
2427                                 ext = NULL;
2428                         } else {
2429                                 oio->oi_active = ext;
2430                         }
2431                 }
2432                 if (grants > 0)
2433                         osc_unreserve_grant(cli, grants, tmp);
2434         }
2435
2436         LASSERT(ergo(rc == 0, ext));
2437         if (ext) {
2438                 EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
2439                          ext, "index = %lu.\n", index);
2440                 LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);
2441
2442                 osc_object_lock(osc);
2443                 if (ext->oe_nr_pages == 0)
2444                         ext->oe_srvlock = ops->ops_srvlock;
2445                 else
2446                         LASSERT(ext->oe_srvlock == ops->ops_srvlock);
2447                 ++ext->oe_nr_pages;
2448                 list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
2449                 osc_object_unlock(osc);
2450         }
2451         return rc;
2452 }
2453
2454 int osc_teardown_async_page(const struct lu_env *env,
2455                             struct osc_object *obj, struct osc_page *ops)
2456 {
2457         struct osc_async_page *oap = &ops->ops_oap;
2458         struct osc_extent *ext = NULL;
2459         int rc = 0;
2460
2461         LASSERT(oap->oap_magic == OAP_MAGIC);
2462
2463         CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
2464                oap, ops, osc_index(oap2osc(oap)));
2465
2466         osc_object_lock(obj);
2467         if (!list_empty(&oap->oap_rpc_item)) {
2468                 CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
2469                 rc = -EBUSY;
2470         } else if (!list_empty(&oap->oap_pending_item)) {
2471                 ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
2472                 /* only truncated pages are allowed to be taken out.
2473                  * See osc_extent_truncate() and osc_cache_truncate_start()
2474                  * for details.
2475                  */
2476                 if (ext && ext->oe_state != OES_TRUNC) {
2477                         OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
2478                                         osc_index(oap2osc(oap)));
2479                         rc = -EBUSY;
2480                 }
2481         }
2482         osc_object_unlock(obj);
2483         if (ext)
2484                 osc_extent_put(env, ext);
2485         return rc;
2486 }
2487
2488 /**
2489  * This is called when a page is picked up by kernel to write out.
2490  *
2491  * We should find out the corresponding extent and add the whole extent
2492  * into the urgent list. The extent may be being truncated or in use; handle it
2493  * carefully.
2494  */
2495 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
2496                          struct osc_page *ops)
2497 {
2498         struct osc_extent *ext = NULL;
2499         struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
2500         struct cl_page *cp = ops->ops_cl.cpl_page;
2501         pgoff_t            index = osc_index(ops);
2502         struct osc_async_page *oap = &ops->ops_oap;
2503         bool unplug = false;
2504         int rc = 0;
2505
2506         osc_object_lock(obj);
2507         ext = osc_extent_lookup(obj, index);
2508         if (!ext) {
2509                 osc_extent_tree_dump(D_ERROR, obj);
2510                 LASSERTF(0, "page index %lu is NOT covered.\n", index);
2511         }
2512
2513         switch (ext->oe_state) {
2514         case OES_RPC:
2515         case OES_LOCK_DONE:
2516                 CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
2517                 LASSERT(0);
2518                 break;
2519         case OES_LOCKING:
2520                 /* If we know this extent is being written out, we should abort
2521                  * so that the writer can make this page ready. Otherwise, there
2522                  * exists a deadlock problem because another process can wait for
2523                  * the page writeback bit while holding the page lock; meanwhile,
2524                  * in vvp_page_make_ready(), we need to grab the page lock before
2525                  * really sending the RPC.
2526                  */
2527         case OES_TRUNC:
2528                 /* race with truncate, page will be redirtied */
2529         case OES_ACTIVE:
2530                 /* The extent is active so we need to abort and let the caller
2531                  * re-dirty the page. If we continued on here, and we were the
2532                  * one making the extent active, we could deadlock waiting for
2533                  * the page writeback to clear but it won't because the extent
2534                  * is active and won't be written out.
2535                  */
2536                 rc = -EAGAIN;
2537                 goto out;
2538         default:
2539                 break;
2540         }
2541
2542         rc = cl_page_prep(env, io, cp, CRT_WRITE);
2543         if (rc)
2544                 goto out;
2545
2546         spin_lock(&oap->oap_lock);
2547         oap->oap_async_flags |= ASYNC_READY | ASYNC_URGENT;
2548         spin_unlock(&oap->oap_lock);
2549
2550         if (memory_pressure_get())
2551                 ext->oe_memalloc = 1;
2552
2553         ext->oe_urgent = 1;
2554         if (ext->oe_state == OES_CACHE) {
2555                 OSC_EXTENT_DUMP(D_CACHE, ext,
2556                                 "flush page %p make it urgent.\n", oap);
2557                 if (list_empty(&ext->oe_link))
2558                         list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2559                 unplug = true;
2560         }
2561         rc = 0;
2562
2563 out:
2564         osc_object_unlock(obj);
2565         osc_extent_put(env, ext);
2566         if (unplug)
2567                 osc_io_unplug_async(env, osc_cli(obj), obj);
2568         return rc;
2569 }
2570
2571 /**
2572  * this is called when a sync waiter receives an interruption.  Its job is to
2573  * get the caller woken as soon as possible.  If its page hasn't been put in an
2574  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2575  * desiring interruption which will forcefully complete the rpc once the rpc
2576  * has timed out.
2577  */
2578 int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
2579 {
2580         struct osc_async_page *oap = &ops->ops_oap;
2581         struct osc_object *obj = oap->oap_obj;
2582         struct client_obd *cli = osc_cli(obj);
2583         struct osc_extent *ext;
2584         struct osc_extent *found = NULL;
2585         struct list_head *plist;
2586         pgoff_t index = osc_index(ops);
2587         int rc = -EBUSY;
2588         int cmd;
2589
2590         LASSERT(!oap->oap_interrupted);
2591         oap->oap_interrupted = 1;
2592
2593         /* Find out the caching extent */
2594         osc_object_lock(obj);
2595         if (oap->oap_cmd & OBD_BRW_WRITE) {
2596                 plist = &obj->oo_urgent_exts;
2597                 cmd = OBD_BRW_WRITE;
2598         } else {
2599                 plist = &obj->oo_reading_exts;
2600                 cmd = OBD_BRW_READ;
2601         }
2602         list_for_each_entry(ext, plist, oe_link) {
2603                 if (ext->oe_start <= index && ext->oe_end >= index) {
2604                         LASSERT(ext->oe_state == OES_LOCK_DONE);
2605                         /* For OES_LOCK_DONE state extent, it has already held
2606                          * a refcount for RPC.
2607                          */
2608                         found = osc_extent_get(ext);
2609                         break;
2610                 }
2611         }
2612         if (found) {
2613                 list_del_init(&found->oe_link);
2614                 osc_update_pending(obj, cmd, -found->oe_nr_pages);
2615                 osc_object_unlock(obj);
2616
2617                 osc_extent_finish(env, found, 0, -EINTR);
2618                 osc_extent_put(env, found);
2619                 rc = 0;
2620         } else {
2621                 osc_object_unlock(obj);
2622                 /* ok, it's been put in an rpc. only one oap gets a request
2623                  * reference
2624                  */
2625                 if (oap->oap_request) {
2626                         ptlrpc_mark_interrupted(oap->oap_request);
2627                         ptlrpcd_wake(oap->oap_request);
2628                         ptlrpc_req_finished(oap->oap_request);
2629                         oap->oap_request = NULL;
2630                 }
2631         }
2632
2633         osc_list_maint(cli, obj);
2634         return rc;
2635 }
2636
2637 int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
2638                          struct list_head *list, int cmd, int brw_flags)
2639 {
2640         struct client_obd *cli = osc_cli(obj);
2641         struct osc_extent *ext;
2642         struct osc_async_page *oap, *tmp;
2643         int page_count = 0;
2644         int mppr = cli->cl_max_pages_per_rpc;
2645         pgoff_t start = CL_PAGE_EOF;
2646         pgoff_t end = 0;
2647
2648         list_for_each_entry(oap, list, oap_pending_item) {
2649                 pgoff_t index = osc_index(oap2osc(oap));
2650
2651                 if (index > end)
2652                         end = index;
2653                 if (index < start)
2654                         start = index;
2655                 ++page_count;
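                /* grow mppr by powers of two until it covers page_count */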
2656                 mppr <<= (page_count > mppr);
2657         }
2658
2659         ext = osc_extent_alloc(obj);
2660         if (!ext) {
2661                 list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
2662                         list_del_init(&oap->oap_pending_item);
2663                         osc_ap_completion(env, cli, oap, 0, -ENOMEM);
2664                 }
2665                 return -ENOMEM;
2666         }
2667
2668         ext->oe_rw = !!(cmd & OBD_BRW_READ);
2669         ext->oe_sync = 1;
2670         ext->oe_urgent = 1;
2671         ext->oe_start = start;
2672         ext->oe_end = end;
2673         ext->oe_max_end = end;
2674         ext->oe_obj = obj;
2675         ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
2676         ext->oe_nr_pages = page_count;
2677         ext->oe_mppr = mppr;
2678         list_splice_init(list, &ext->oe_pages);
2679
2680         osc_object_lock(obj);
2681         /* Reuse the initial refcount for RPC, don't drop it */
2682         osc_extent_state_set(ext, OES_LOCK_DONE);
2683         if (cmd & OBD_BRW_WRITE) {
2684                 list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2685                 osc_update_pending(obj, OBD_BRW_WRITE, page_count);
2686         } else {
2687                 list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
2688                 osc_update_pending(obj, OBD_BRW_READ, page_count);
2689         }
2690         osc_object_unlock(obj);
2691
2692         osc_io_unplug_async(env, cli, obj);
2693         return 0;
2694 }
2695
2696 /**
2697  * Called by osc_io_setattr_start() to freeze and destroy covering extents.
2698  */
2699 int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
2700                              struct osc_object *obj, __u64 size)
2701 {
2702         struct client_obd *cli = osc_cli(obj);
2703         struct osc_extent *ext;
2704         struct osc_extent *temp;
2705         struct osc_extent *waiting = NULL;
2706         pgoff_t index;
2707         LIST_HEAD(list);
2708         int result = 0;
2709         bool partial;
2710
2711         /* pages with an index greater than or equal to index will be truncated. */
2712         index = cl_index(osc2cl(obj), size);
2713         partial = size > cl_offset(osc2cl(obj), index);
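        /* the boundary page keeps its head when size is not page-aligned,
         * i.e. it is only partially truncated
         */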
2714
2715 again:
2716         osc_object_lock(obj);
2717         ext = osc_extent_search(obj, index);
2718         if (!ext)
2719                 ext = first_extent(obj);
2720         else if (ext->oe_end < index)
2721                 ext = next_extent(ext);
2722         while (ext) {
2723                 EASSERT(ext->oe_state != OES_TRUNC, ext);
2724
2725                 if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
2726                         /* if ext is in urgent state, it means there must exist
2727                          * a page that has already been flushed by write_page().
2728                          * We have to wait for this extent because we can't
2729                          * truncate that page.
2730                          */
2731                         LASSERT(!ext->oe_hp);
2732                         OSC_EXTENT_DUMP(D_CACHE, ext,
2733                                         "waiting for busy extent\n");
2734                         waiting = osc_extent_get(ext);
2735                         break;
2736                 }
2737
2738                 OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
2739
2740                 osc_extent_get(ext);
2741                 if (ext->oe_state == OES_ACTIVE) {
2742                         /* Though we grab the inode mutex for the write path, we
2743                          * release it before releasing the extent (in osc_io_end()),
2744                          * so there is a race window in which an extent is still
2745                          * in OES_ACTIVE when truncate starts.
2746                          */
2747                         LASSERT(!ext->oe_trunc_pending);
2748                         ext->oe_trunc_pending = 1;
2749                 } else {
2750                         EASSERT(ext->oe_state == OES_CACHE, ext);
2751                         osc_extent_state_set(ext, OES_TRUNC);
2752                         osc_update_pending(obj, OBD_BRW_WRITE,
2753                                            -ext->oe_nr_pages);
2754                 }
2755                 EASSERT(list_empty(&ext->oe_link), ext);
2756                 list_add_tail(&ext->oe_link, &list);
2757
2758                 ext = next_extent(ext);
2759         }
2760         osc_object_unlock(obj);
2761
2762         osc_list_maint(cli, obj);
2763
2764         list_for_each_entry_safe(ext, temp, &list, oe_link) {
2765                 int rc;
2766
2767                 list_del_init(&ext->oe_link);
2768
2769                 /* The extent may be in OES_ACTIVE state because the inode
2770                  * mutex is released before osc_io_end() in the file write case.
2771                  */
2772                 if (ext->oe_state != OES_TRUNC)
2773                         osc_extent_wait(env, ext, OES_TRUNC);
2774
2775                 rc = osc_extent_truncate(ext, index, partial);
2776                 if (rc < 0) {
2777                         if (result == 0)
2778                                 result = rc;
2779
2780                         OSC_EXTENT_DUMP(D_ERROR, ext,
2781                                         "truncate error %d\n", rc);
2782                 } else if (ext->oe_nr_pages == 0) {
2783                         osc_extent_remove(ext);
2784                 } else {
2785                         /* This must be an overlapped extent, which means
2786                          * only part of the pages in it have been truncated.
2787                          */
2788                         EASSERTF(ext->oe_start <= index, ext,
2789                                  "trunc index = %lu/%d.\n", index, partial);
2790                         /* fix index to skip this partially truncated extent */
2791                         index = ext->oe_end + 1;
2792                         partial = false;
2793
2794                         /* we need to hold this extent in OES_TRUNC state so
2795                          * that no writeback will happen. This is to avoid
2796                          * BUG 17397.
2797                          */
2798                         LASSERT(!oio->oi_trunc);
2799                         oio->oi_trunc = osc_extent_get(ext);
2800                         OSC_EXTENT_DUMP(D_CACHE, ext,
2801                                         "trunc at %llu\n", size);
2802                 }
2803                 osc_extent_put(env, ext);
2804         }
2805         if (waiting) {
2806                 int rc;
2807
2808                 /* Ignore the result of osc_extent_wait; the write
2809                  * initiator should take care of it.
2810                  */
2811                 rc = osc_extent_wait(env, waiting, OES_INV);
2812                 if (rc < 0)
2813                         OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
2814
2815                 osc_extent_put(env, waiting);
2816                 waiting = NULL;
2817                 goto again;
2818         }
2819         return result;
2820 }
2821
2822 /**
2823  * Called after osc_io_setattr_end() to add oio->oi_trunc back to the cache.
2824  */
2825 void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
2826                             struct osc_object *obj)
2827 {
2828         struct osc_extent *ext = oio->oi_trunc;
2829
2830         oio->oi_trunc = NULL;
2831         if (ext) {
2832                 bool unplug = false;
2833
2834                 EASSERT(ext->oe_nr_pages > 0, ext);
2835                 EASSERT(ext->oe_state == OES_TRUNC, ext);
2836                 EASSERT(!ext->oe_urgent, ext);
2837
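                /* Return the extent held across truncate back to the cache and
                 * restore the pending-write accounting; if an fsync was waiting
                 * on it, requeue it as urgent and restart IO below.
                 */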
2838                 OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
2839                 osc_object_lock(obj);
2840                 osc_extent_state_set(ext, OES_CACHE);
2841                 if (ext->oe_fsync_wait && !ext->oe_urgent) {
2842                         ext->oe_urgent = 1;
2843                         list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
2844                         unplug = true;
2845                 }
2846                 osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
2847                 osc_object_unlock(obj);
2848                 osc_extent_put(env, ext);
2849
2850                 if (unplug)
2851                         osc_io_unplug_async(env, osc_cli(obj), obj);
2852         }
2853 }
2854
2855 /**
2856  * Wait for extents in a specific range to be written out.
2857  * The caller must have called osc_cache_writeback_range() to issue the IO
2858  * first; otherwise it will take a long time for this function to finish.
2859  *
2860  * The caller must hold the inode mutex, or cancel the exclusive DLM lock,
2861  * so that nobody else can dirty this range of the file while we're waiting
2862  * for the extents to be written.
2863  */
2864 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
2865                          pgoff_t start, pgoff_t end)
2866 {
2867         struct osc_extent *ext;
2868         pgoff_t index = start;
2869         int result = 0;
2870
2871 again:
2872         osc_object_lock(obj);
2873         ext = osc_extent_search(obj, index);
2874         if (!ext)
2875                 ext = first_extent(obj);
2876         else if (ext->oe_end < index)
2877                 ext = next_extent(ext);
2878         while (ext) {
2879                 int rc;
2880
2881                 if (ext->oe_start > end)
2882                         break;
2883
2884                 if (!ext->oe_fsync_wait) {
2885                         ext = next_extent(ext);
2886                         continue;
2887                 }
2888
2889                 EASSERT(ergo(ext->oe_state == OES_CACHE,
2890                              ext->oe_hp || ext->oe_urgent), ext);
2891                 EASSERT(ergo(ext->oe_state == OES_ACTIVE,
2892                              !ext->oe_hp && ext->oe_urgent), ext);
2893
2894                 index = ext->oe_end + 1;
2895                 osc_extent_get(ext);
2896                 osc_object_unlock(obj);
2897
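                /* The object lock is dropped while waiting, so the extent tree
                 * may change underneath us; restart the lookup from the updated
                 * @index after the wait.
                 */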
2898                 rc = osc_extent_wait(env, ext, OES_INV);
2899                 if (result == 0)
2900                         result = rc;
2901                 osc_extent_put(env, ext);
2902                 goto again;
2903         }
2904         osc_object_unlock(obj);
2905
2906         OSC_IO_DEBUG(obj, "sync file range.\n");
2907         return result;
2908 }
2909
2910 /**
2911  * Called to write out a range of an osc object.
2912  *
2913  * @hp     : should be set if this is caused by lock cancellation;
2914  * @discard: is set if dirty pages should be dropped - the file will be
2915  *         deleted or truncated, which implies no extent is partially discarded.
2916  *
2917  * Returns how many pages will be issued, or an error code if an error occurred.
2918  */
2919 int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
2920                               pgoff_t start, pgoff_t end, int hp, int discard)
2921 {
2922         struct osc_extent *ext;
2923         LIST_HEAD(discard_list);
2924         bool unplug = false;
2925         int result = 0;
2926
2927         osc_object_lock(obj);
2928         ext = osc_extent_search(obj, start);
2929         if (!ext)
2930                 ext = first_extent(obj);
2931         else if (ext->oe_end < start)
2932                 ext = next_extent(ext);
2933         while (ext) {
2934                 if (ext->oe_start > end)
2935                         break;
2936
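                /* Mark every covered extent so that a later call to
                 * osc_cache_wait_range() will wait for it.
                 */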
2937                 ext->oe_fsync_wait = 1;
2938                 switch (ext->oe_state) {
2939                 case OES_CACHE:
2940                         result += ext->oe_nr_pages;
2941                         if (!discard) {
2942                                 struct list_head *list = NULL;
2943
2944                                 if (hp) {
2945                                         EASSERT(!ext->oe_hp, ext);
2946                                         ext->oe_hp = 1;
2947                                         list = &obj->oo_hp_exts;
2948                                 } else if (!ext->oe_urgent) {
2949                                         ext->oe_urgent = 1;
2950                                         list = &obj->oo_urgent_exts;
2951                                 }
2952                                 if (list)
2953                                         list_move_tail(&ext->oe_link, list);
2954                                 unplug = true;
2955                         } else {
2956                                 /* The only discarder is lock cancellation,
2957                                  * so [start, end] must contain this extent.
2958                                  */
2959                                 EASSERT(ext->oe_start >= start &&
2960                                         ext->oe_max_end <= end, ext);
2961                                 osc_extent_state_set(ext, OES_LOCKING);
2962                                 ext->oe_owner = current;
2963                                 list_move_tail(&ext->oe_link, &discard_list);
2964                                 osc_update_pending(obj, OBD_BRW_WRITE,
2965                                                    -ext->oe_nr_pages);
2966                         }
2967                         break;
2968                 case OES_ACTIVE:
2969                         /* It's pretty bad to wait for ACTIVE extents: we
2970                          * don't know how long they will take to be flushed
2971                          * since they may be blocked waiting for more grants.
2972                          * We do this for the correctness of fsync.
2973                          */
2974                         LASSERT(hp == 0 && discard == 0);
2975                         ext->oe_urgent = 1;
2976                         break;
2977                 case OES_TRUNC:
2978                         /* This extent is being truncated, so nothing can be
2979                          * done for it now. It will be set to urgent after the
2980                          * truncate finishes in osc_cache_truncate_end().
2981                          */
2982                 default:
2983                         break;
2984                 }
2985                 ext = next_extent(ext);
2986         }
2987         osc_object_unlock(obj);
2988
2989         LASSERT(ergo(!discard, list_empty(&discard_list)));
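        /* Extents on @discard_list were switched to OES_LOCKING above; they are
         * completed below without actually sending an RPC, dropping their pages.
         */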
2990         if (!list_empty(&discard_list)) {
2991                 struct osc_extent *tmp;
2992                 int rc;
2993
2994                 osc_list_maint(osc_cli(obj), obj);
2995                 list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
2996                         list_del_init(&ext->oe_link);
2997                         EASSERT(ext->oe_state == OES_LOCKING, ext);
2998
2999                         /* Discard caching pages. We don't actually write this
3000                          * extent out but we complete it as if we did.
3001                          */
3002                         rc = osc_extent_make_ready(env, ext);
3003                         if (unlikely(rc < 0)) {
3004                                 OSC_EXTENT_DUMP(D_ERROR, ext,
3005                                                 "make_ready returned %d\n", rc);
3006                                 if (result >= 0)
3007                                         result = rc;
3008                         }
3009
3010                         /* finish the extent as if the pages were sent */
3011                         osc_extent_finish(env, ext, 0, 0);
3012                 }
3013         }
3014
3015         if (unplug)
3016                 osc_io_unplug(env, osc_cli(obj), obj);
3017
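        /* For lock cancellation (hp) or discard, the covered range must be
         * clean before we return, so wait for the affected extents to drain.
         */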
3018         if (hp || discard) {
3019                 int rc;
3020
3021                 rc = osc_cache_wait_range(env, obj, start, end);
3022                 if (result >= 0 && rc < 0)
3023                         result = rc;
3024         }
3025
3026         OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
3027         return result;
3028 }
3029
3030 /**
3031  * Calls \a cb on every page of \a obj in the range [start, end].
3032  *
3033  * To avoid hogging the CPU for too long, the lookup gives up and returns
3034  * CLP_GANG_RESCHED when rescheduling is needed; in that case the caller
3035  * should implement retry logic (see osc_lock_discard_pages()).
3036  *
3037  * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
3038  * crucial in the face of [offset, EOF] locks.
3039  *
3040  * Passes at least one page to \a cb unless there is no covered page.
3041  */
3042 int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
3043                          struct osc_object *osc, pgoff_t start, pgoff_t end,
3044                          osc_page_gang_cbt cb, void *cbdata)
3045 {
3046         struct osc_page *ops;
3047         void            **pvec;
3048         pgoff_t         idx;
3049         unsigned int    nr;
3050         unsigned int    i;
3051         unsigned int    j;
3052         int             res = CLP_GANG_OKAY;
3053         bool            tree_lock = true;
3054
3055         idx = start;
3056         pvec = osc_env_info(env)->oti_pvec;
3057         spin_lock(&osc->oo_tree_lock);
3058         while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
3059                                             idx, OTI_PVEC_SIZE)) > 0) {
3060                 struct cl_page *page;
3061                 bool end_of_region = false;
3062
3063                 for (i = 0, j = 0; i < nr; ++i) {
3064                         ops = pvec[i];
3065                         pvec[i] = NULL;
3066
3067                         idx = osc_index(ops);
3068                         if (idx > end) {
3069                                 end_of_region = true;
3070                                 break;
3071                         }
3072
3073                         page = ops->ops_cl.cpl_page;
3074                         LASSERT(page->cp_type == CPT_CACHEABLE);
3075                         if (page->cp_state == CPS_FREEING)
3076                                 continue;
3077
3078                         cl_page_get(page);
3079                         lu_ref_add_atomic(&page->cp_reference,
3080                                           "gang_lookup", current);
3081                         pvec[j++] = ops;
3082                 }
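                /* Advance @idx past the last page examined so the next gang
                 * lookup resumes from there.
                 */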
3083                 ++idx;
3084
3085                 /*
3086                  * Here a delicate locking dance is performed. The current
3087                  * thread holds a reference to a page, but has to own it
3088                  * before it can be placed into the queue. Owning implies
3089                  * waiting, so the radix-tree lock is to be released. After
3090                  * a wait one has to check that pages weren't truncated
3091                  * (cl_page_own() returns an error in that case).
3092                  */
3093                 spin_unlock(&osc->oo_tree_lock);
3094                 tree_lock = false;
3095
3096                 for (i = 0; i < j; ++i) {
3097                         ops = pvec[i];
3098                         if (res == CLP_GANG_OKAY)
3099                                 res = (*cb)(env, io, ops, cbdata);
3100
3101                         page = ops->ops_cl.cpl_page;
3102                         lu_ref_del(&page->cp_reference, "gang_lookup", current);
3103                         cl_page_put(env, page);
3104                 }
3105                 if (nr < OTI_PVEC_SIZE || end_of_region)
3106                         break;
3107
3108                 if (res == CLP_GANG_OKAY && need_resched())
3109                         res = CLP_GANG_RESCHED;
3110                 if (res != CLP_GANG_OKAY)
3111                         break;
3112
3113                 spin_lock(&osc->oo_tree_lock);
3114                 tree_lock = true;
3115         }
3116         if (tree_lock)
3117                 spin_unlock(&osc->oo_tree_lock);
3118         return res;
3119 }
3120
3121 /**
3122  * Check if the page (\a ops) is covered by another lock; if not, discard it.
3123  */
3124 static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
3125                                 struct osc_page *ops, void *cbdata)
3126 {
3127         struct osc_thread_info *info = osc_env_info(env);
3128         struct osc_object *osc = cbdata;
3129         pgoff_t index;
3130
3131         index = osc_index(ops);
3132         if (index >= info->oti_fn_index) {
3133                 struct ldlm_lock *tmp;
3134                 struct cl_page *page = ops->ops_cl.cpl_page;
3135
3136                 /* refresh non-overlapped index */
3137                 tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
3138                 if (tmp) {
3139                         __u64 end = tmp->l_policy_data.l_extent.end;
3140                         /* Cache the first non-overlapped index so as to skip
3141                          * all pages within [index, oti_fn_index). This is safe
3142                          * because if the tmp lock is canceled, it will discard
3143                          * these pages.
3144                          */
3145                         info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
3146                         if (end == OBD_OBJECT_EOF)
3147                                 info->oti_fn_index = CL_PAGE_EOF;
3148                         LDLM_LOCK_PUT(tmp);
3149                 } else if (cl_page_own(env, io, page) == 0) {
3150                         /* discard the page */
3151                         cl_page_discard(env, io, page);
3152                         cl_page_disown(env, io, page);
3153                 } else {
3154                         LASSERT(page->cp_state == CPS_FREEING);
3155                 }
3156         }
3157
3158         info->oti_next_index = index + 1;
3159         return CLP_GANG_OKAY;
3160 }
3161
3162 static int discard_cb(const struct lu_env *env, struct cl_io *io,
3163                       struct osc_page *ops, void *cbdata)
3164 {
3165         struct osc_thread_info *info = osc_env_info(env);
3166         struct cl_page *page = ops->ops_cl.cpl_page;
3167
3168         /* page is top page. */
3169         info->oti_next_index = osc_index(ops) + 1;
3170         if (cl_page_own(env, io, page) == 0) {
3171                 KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
3172                               !PageDirty(cl_page_vmpage(page))));
3173
3174                 /* discard the page */
3175                 cl_page_discard(env, io, page);
3176                 cl_page_disown(env, io, page);
3177         } else {
3178                 LASSERT(page->cp_state == CPS_FREEING);
3179         }
3180
3181         return CLP_GANG_OKAY;
3182 }
3183
3184 /**
3185  * Discard pages protected by the given lock. This function traverses the
3186  * radix tree to find all covering pages and discards them. If a page is
3187  * covered by another lock, it should remain in the cache.
3188  *
3189  * If an error happens at any step, the process continues anyway (the
3190  * reasoning being that lock cancellation cannot be delayed indefinitely).
3191  */
3192 int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
3193                            pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
3194 {
3195         struct osc_thread_info *info = osc_env_info(env);
3196         struct cl_io *io = &info->oti_io;
3197         osc_page_gang_cbt cb;
3198         int res;
3199         int result;
3200
3201         io->ci_obj = cl_object_top(osc2cl(osc));
3202         io->ci_ignore_layout = 1;
3203         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
3204         if (result != 0)
3205                 goto out;
3206
3207         cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
3208         info->oti_fn_index = start;
3209         info->oti_next_index = start;
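        /* Process the range in batches: each gang lookup advances
         * oti_next_index via the callback, and CLP_GANG_RESCHED means yield
         * the CPU and retry.
         */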
3210         do {
3211                 res = osc_page_gang_lookup(env, io, osc,
3212                                            info->oti_next_index, end, cb, osc);
3213                 if (info->oti_next_index > end)
3214                         break;
3215
3216                 if (res == CLP_GANG_RESCHED)
3217                         cond_resched();
3218         } while (res != CLP_GANG_OKAY);
3219 out:
3220         cl_io_fini(env, io);
3221         return result;
3222 }
3223
3224 /** @} osc */